use std::convert::Infallible; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use anyhow::Error; use deadpool::managed::{BuildError, Manager, Pool, RecycleResult}; use futures_util::{SinkExt, StreamExt}; use giterated_daemon::messages::authentication::{RegisterAccountRequest, RegisterAccountResponse}; use giterated_daemon::messages::UnvalidatedUserAuthenticated; use giterated_daemon::model::repository::RepositoryVisibility; use giterated_daemon::model::user::User; use giterated_daemon::{ handshake::{HandshakeFinalize, HandshakeMessage, InitiateHandshake}, messages::{ authentication::{ AuthenticationMessage, AuthenticationRequest, AuthenticationResponse, AuthenticationTokenRequest, TokenExtensionRequest, }, repository::{ CreateRepositoryRequest, RepositoryInfoRequest, RepositoryMessage, RepositoryMessageKind, RepositoryRequest, RepositoryResponse, }, InstanceAuthenticated, MessageKind, }, model::{ instance::Instance, repository::{Repository, RepositoryView}, }, }; use giterated_daemon::{validate_version, version}; use serde::Serialize; use tokio::net::TcpStream; use tokio::sync::broadcast::{Receiver, Sender}; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; type Socket = WebSocketStream>; #[macro_use] extern crate tracing; pub struct GiteratedApiBuilder { our_instance: Instance, our_private_key: Option, our_public_key: Option, target_instance: Option, } pub trait AsInstance { type Error: std::error::Error + Send + Sync + 'static; fn into_instance(self) -> Result; } impl AsInstance for &str { type Error = ::Err; fn into_instance(self) -> Result { Instance::from_str(self) } } impl AsInstance for Instance { type Error = Infallible; fn into_instance(self) -> Result { Ok(self) } } impl GiteratedApiBuilder { pub fn from_local(instance: impl AsInstance) -> Result { Ok(Self { our_instance: instance.into_instance()?, our_private_key: None, our_public_key: None, target_instance: None, }) } pub fn from_local_for_other( instance: impl AsInstance, other: impl AsInstance, ) -> Result { Ok(Self { our_instance: instance.into_instance()?, our_private_key: None, our_public_key: None, target_instance: Some(other.into_instance()?), }) } pub fn private_key(&mut self, key: impl ToString) -> &mut Self { self.our_private_key = Some(key.to_string()); self } pub fn public_key(&mut self, key: impl ToString) -> &mut Self { self.our_public_key = Some(key.to_string()); self } pub async fn build(&mut self) -> Result { Ok(GiteratedApi { configuration: Arc::new(GiteratedApiConfiguration { our_private_key: self.our_private_key.take().unwrap(), our_public_key: self.our_public_key.take().unwrap(), target_instance: self.target_instance.take(), // todo target_public_key: None, }), }) } } struct GiteratedConnectionPool { target_instance: Instance, } #[async_trait::async_trait] impl Manager for GiteratedConnectionPool { type Type = Socket; type Error = anyhow::Error; async fn create(&self) -> Result { info!("Creating new Daemon connection"); let mut connection = GiteratedApi::connect_to(self.target_instance.clone()).await?; // Handshake first! GiteratedApi::handle_handshake(&mut connection, &self.target_instance).await?; Ok(connection) } async fn recycle(&self, _: &mut Socket) -> RecycleResult { Ok(()) } } pub struct GiteratedApiConfiguration { pub our_private_key: String, pub our_public_key: String, pub target_instance: Option, pub target_public_key: Option, } #[derive(Clone)] pub struct DaemonConnectionPool(Pool); impl DaemonConnectionPool { pub fn connect( instance: impl ToOwned, ) -> Result> { Ok(Self( Pool::builder(GiteratedConnectionPool { target_instance: instance.to_owned(), }) .build()?, )) } } #[derive(Clone)] pub struct GiteratedApi { configuration: Arc, } impl GiteratedApi { pub async fn public_key(&mut self) -> String { if let Some(public_key) = &self.configuration.target_public_key { public_key.clone() } else { assert!(self.configuration.target_instance.is_none()); self.configuration.our_public_key.clone() } } /// Register on an [`Instance`]. /// /// # Authorization /// - Must be made by the same instance its being sent to pub async fn register( &self, username: String, email: Option, password: String, pool: &DaemonConnectionPool, ) -> Result { let mut connection = pool.0.get().await.unwrap(); let message = InstanceAuthenticated::new( RegisterAccountRequest { username, email, password, }, pool.0.manager().target_instance.clone(), self.configuration.our_private_key.clone(), ) .unwrap(); Self::send_message( &MessageKind::Authentication(AuthenticationMessage::Request( AuthenticationRequest::RegisterAccount(message), )), &mut connection, ) .await?; while let Ok(payload) = self.next_payload(&mut connection).await { if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( AuthenticationResponse::RegisterAccount(response), ))) = serde_json::from_slice(&payload) { return Ok(response); } } unreachable!() } /// Create repository on the target instance. pub async fn create_repository( &self, user_token: String, name: String, description: Option, visibility: RepositoryVisibility, default_branch: String, owner: User, pool: &DaemonConnectionPool, ) -> Result { let mut connection = pool.0.get().await.unwrap(); let target_respository = Repository { owner: owner.clone(), name: name.clone(), instance: self .configuration .target_instance .as_ref() .unwrap_or(&pool.0.manager().target_instance) .clone(), }; let request = CreateRepositoryRequest { name, description, visibility, default_branch, owner, }; let message = UnvalidatedUserAuthenticated::new( request, user_token, self.configuration.our_private_key.clone(), ) .unwrap(); Self::send_message( &MessageKind::Repository(RepositoryMessage { target: target_respository, command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository( message, )), }), &mut connection, ) .await?; while let Ok(payload) = self.next_payload(&mut connection).await { if let Ok(MessageKind::Repository(RepositoryMessage { command: RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)), .. })) = serde_json::from_slice(&payload) { return Ok(true); } } unreachable!() } pub async fn repository_info( &mut self, token: &str, repository: Repository, pool: &DaemonConnectionPool, ) -> Result { let mut connection = pool.0.get().await.unwrap(); let message = UnvalidatedUserAuthenticated::new( RepositoryInfoRequest { repository: repository.clone(), extra_metadata: true, rev: None, path: None, }, token.to_string(), self.configuration.our_private_key.clone(), ) .unwrap(); Self::send_message( &MessageKind::Repository(RepositoryMessage { target: repository.clone(), command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)), }), &mut connection, ) .await?; loop { // while let Ok(payload) = Self::next_payload(&mut socket).await { let payload = match self.next_payload(&mut connection).await { Ok(payload) => payload, Err(err) => { error!("Error while fetching next payload: {:?}", err); continue; } }; if let Ok(MessageKind::Repository(RepositoryMessage { command: RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)), .. })) = serde_json::from_slice(&payload) { return Ok(response); } } } /// Requests an authentication token for the given login. /// /// # Authorization /// This request can only be sent to the same instance from which /// it is issued. pub async fn authentication_token( &mut self, secret_key: String, username: String, password: String, pool: &DaemonConnectionPool, ) -> Result { let mut connection = pool.0.get().await.unwrap(); let request = InstanceAuthenticated::new( AuthenticationTokenRequest { secret_key, username, password, }, pool.0.manager().target_instance.clone(), include_str!("example_keys/giterated.key").to_string(), ) .unwrap(); Self::send_message( &MessageKind::Authentication(AuthenticationMessage::Request( AuthenticationRequest::AuthenticationToken(request), )), &mut connection, ) .await?; loop { // while let Ok(payload) = Self::next_payload(&mut socket).await { let payload = match self.next_payload(&mut connection).await { Ok(payload) => payload, Err(err) => { error!("Error while fetching next payload: {:?}", err); continue; } }; if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( AuthenticationResponse::AuthenticationToken(response), ))) = serde_json::from_slice(&payload) { return Ok(response.token); } } } /// Requests a new token for the given login. /// /// # Authorization /// This request can only be sent to the same instance from which /// it is issued. pub async fn extend_token( &mut self, secret_key: String, token: String, pool: &DaemonConnectionPool, ) -> Result, Error> { let mut connection = pool.0.get().await.unwrap(); let request = InstanceAuthenticated::new( TokenExtensionRequest { secret_key, token }, pool.0.manager().target_instance.clone(), self.configuration.our_private_key.clone(), ) .unwrap(); Self::send_message( &MessageKind::Authentication(AuthenticationMessage::Request( AuthenticationRequest::TokenExtension(request), )), &mut connection, ) .await?; while let Ok(payload) = self.next_payload(&mut connection).await { if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( AuthenticationResponse::TokenExtension(response), ))) = serde_json::from_slice(&payload) { return Ok(response.new_token); } } todo!() } async fn connect_to(instance: Instance) -> Result { let url = &instance.url; info!( "Connecting to {}", format!("wss://{}/.giterated/daemon/", url) ); let (websocket, _response) = connect_async(&format!("wss://{}/.giterated/daemon/", url)).await?; info!("Connection established with {}", url); Ok(websocket) } async fn handle_handshake( socket: &mut Socket, instance: &Instance, ) -> Result<(), anyhow::Error> { // Send handshake initiation Self::send_message( &MessageKind::Handshake(HandshakeMessage::Initiate(InitiateHandshake { identity: instance.clone(), version: version(), })), socket, ) .await?; while let Some(message) = socket.next().await { let message = match message { Ok(message) => message, Err(err) => { error!("Error reading message: {:?}", err); continue; } }; let payload = match message { Message::Text(text) => text.into_bytes(), Message::Binary(bytes) => bytes, Message::Ping(_) => continue, Message::Pong(_) => continue, Message::Close(_) => { panic!() } _ => unreachable!(), }; info!("Read payload: {}", std::str::from_utf8(&payload).unwrap()); let message = match serde_json::from_slice::(&payload) { Ok(message) => message, Err(err) => { error!("Error deserializing message: {:?}", err); continue; } }; if let MessageKind::Handshake(handshake) = message { match handshake { HandshakeMessage::Initiate(_) => unimplemented!(), HandshakeMessage::Response(response) => { let message = if !validate_version(&response.version) { error!( "Version compatibility failure! Our Version: {}, Their Version: {}", version(), response.version ); HandshakeFinalize { success: false } } else { info!("Connected with a compatible version"); HandshakeFinalize { success: true } }; // Send HandshakeMessage::Finalize Self::send_message( &MessageKind::Handshake(HandshakeMessage::Finalize(message)), socket, ) .await?; } HandshakeMessage::Finalize(finalize) => { if finalize.success { return Ok(()); } else { panic!() } } } } } Ok(()) } async fn send_message( message: &T, socket: &mut Socket, ) -> Result<(), anyhow::Error> { socket .send(Message::Binary(serde_json::to_vec(&message).unwrap())) .await?; Ok(()) } async fn next_payload(&self, socket: &mut Socket) -> Result, Error> { while let Some(message) = socket.next().await { let message = message?; match message { Message::Text(text) => return Ok(text.into_bytes()), Message::Binary(bytes) => return Ok(bytes), Message::Ping(_) => continue, Message::Pong(_) => continue, Message::Close(_) => { panic!() } _ => unreachable!(), } } unreachable!() } }