pub mod request; pub mod model { pub use giterated_models::model::*; } pub mod messages { pub use giterated_models::messages::*; } use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use std::{convert::Infallible, ops::Deref}; use deadpool::managed::{BuildError, Manager, Object, Pool, RecycleError, RecycleResult}; use futures_util::{SinkExt, StreamExt}; use giterated_models::{ messages::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake}, model::instance::Instance, }; use semver::Version; use serde::Serialize; use tokio::net::TcpStream; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; type Socket = WebSocketStream>; #[macro_use] extern crate tracing; pub struct GiteratedApiBuilder { our_instance: Instance, our_private_key: Option, our_public_key: Option, target_instance: Option, } pub trait AsInstance { type Error: std::error::Error + Send + Sync + 'static; fn into_instance(self) -> Result; } impl AsInstance for &str { type Error = ::Err; fn into_instance(self) -> Result { Instance::from_str(self) } } impl AsInstance for Instance { type Error = Infallible; fn into_instance(self) -> Result { Ok(self) } } impl GiteratedApiBuilder { pub fn from_local(instance: impl AsInstance) -> Result { Ok(Self { our_instance: instance.into_instance()?, our_private_key: None, our_public_key: None, target_instance: None, }) } pub fn from_local_for_other( instance: impl AsInstance, other: impl AsInstance, ) -> Result { Ok(Self { our_instance: instance.into_instance()?, our_private_key: None, our_public_key: None, target_instance: Some(other.into_instance()?), }) } pub fn private_key(&mut self, key: impl ToString) -> &mut Self { self.our_private_key = Some(key.to_string()); self } pub fn public_key(&mut self, key: impl ToString) -> &mut Self { self.our_public_key = Some(key.to_string()); self } pub async fn build(&mut self) -> Result { Ok(GiteratedApi { configuration: Arc::new(GiteratedApiConfiguration { our_private_key: self.our_private_key.take().unwrap(), our_public_key: self.our_public_key.take().unwrap(), target_instance: self.target_instance.take(), // todo target_public_key: None, }), }) } } pub struct GiteratedConnectionPool { target_instance: Instance, socket_addr: Option, } #[async_trait::async_trait] impl Manager for GiteratedConnectionPool { type Type = Socket; type Error = anyhow::Error; async fn create(&self) -> Result { info!("Creating new Daemon connection"); let mut connection = GiteratedApi::connect_to(&self.target_instance, &self.socket_addr).await?; // Handshake first! GiteratedApi::handle_handshake(&mut connection, &self.target_instance).await?; Ok(connection) } async fn recycle(&self, socket: &mut Socket) -> RecycleResult { match socket.send(Message::Ping(vec![])).await { Ok(_) => Ok(()), Err(err) => { info!("Socket died!"); Err(RecycleError::Backend(err.into())) } } } } pub struct GiteratedApiConfiguration { pub our_private_key: String, pub our_public_key: String, pub target_instance: Option, pub target_public_key: Option, } #[derive(Clone)] pub struct DaemonConnectionPool(Pool); impl DaemonConnectionPool { pub fn instance(&self) -> &Instance { &self.0.manager().target_instance } } impl Deref for DaemonConnectionPool { type Target = Pool; fn deref(&self) -> &Self::Target { &self.0 } } impl DaemonConnectionPool { pub fn connect( instance: impl ToOwned, ) -> Result> { let instance = instance.to_owned(); Ok(Self( Pool::builder(GiteratedConnectionPool { socket_addr: None, target_instance: instance.to_owned(), }) .build()?, )) } pub fn connect_other( instance_identity: impl ToOwned, connection_addr: SocketAddr, ) -> Result> { Ok(Self( Pool::builder(GiteratedConnectionPool { target_instance: instance_identity.to_owned(), socket_addr: Some(connection_addr), }) .build()?, )) } } // Keep this private // seriousyl. #[derive(Clone)] struct GiteratedApi { configuration: Arc, } impl GiteratedApi { // pub async fn public_key(&self) -> String { // if let Some(public_key) = &self.configuration.target_public_key { // public_key.clone() // } else { // assert!(self.configuration.target_instance.is_none()); // self.configuration.our_public_key.clone() // } // } // /// Register on an [`Instance`]. // /// // /// # Authorization // /// - Must be made by the same instance its being sent to // pub async fn register( // &self, // username: String, // email: Option, // password: String, // pool: &DaemonConnectionPool, // ) -> Result { // let mut connection = pool.0.get().await.unwrap(); // let message = InstanceAuthenticated::new( // RegisterAccountRequest { // username, // email, // password, // }, // pool.0.manager().target_instance.clone(), // self.configuration.our_private_key.clone(), // ) // .unwrap(); // Self::send_message( // &MessageKind::Authentication(AuthenticationMessage::Request( // AuthenticationRequest::RegisterAccount(message), // )), // &mut connection, // ) // .await?; // while let Ok(payload) = self.next_payload(&mut connection).await { // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( // AuthenticationResponse::RegisterAccount(response), // ))) = serde_json::from_slice(&payload) // { // return Ok(response); // } // } // connection.close(None).await?; // Err(SocketClosedError.into()) // } // /// Create repository on the target instance. // pub async fn create_repository( // &self, // user_token: String, // name: String, // description: Option, // visibility: RepositoryVisibility, // default_branch: String, // owner: User, // pool: &DaemonConnectionPool, // ) -> Result { // let mut connection = pool.0.get().await.unwrap(); // let target_respository = Repository { // owner: owner.clone(), // name: name.clone(), // instance: self // .configuration // .target_instance // .as_ref() // .unwrap_or(&pool.0.manager().target_instance) // .clone(), // }; // let request = CreateRepositoryRequest { // name, // description, // visibility, // default_branch, // owner, // }; // let message = UnvalidatedUserAuthenticated::new( // request, // user_token, // self.configuration.our_private_key.clone(), // ) // .unwrap(); // Self::send_message( // &MessageKind::Repository(RepositoryMessage { // target: target_respository, // command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository( // message, // )), // }), // &mut connection, // ) // .await?; // while let Ok(payload) = self.next_payload(&mut connection).await { // if let Ok(MessageKind::Repository(RepositoryMessage { // command: // RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)), // .. // })) = serde_json::from_slice(&payload) // { // return Ok(true); // } // } // connection.close(None).await?; // Err(SocketClosedError.into()) // } // pub async fn repository_info( // &self, // token: &str, // request: &RepositoryInfoRequest, // pool: &DaemonConnectionPool, // ) -> Result { // let mut connection = pool.0.get().await.unwrap(); // let message = UnvalidatedUserAuthenticated::new( // request.clone(), // token.to_string(), // self.configuration.our_private_key.clone(), // ) // .unwrap(); // Self::send_message( // &MessageKind::Repository(RepositoryMessage { // target: request.repository.clone(), // command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)), // }), // &mut connection, // ) // .await?; // loop { // // while let Ok(payload) = Self::next_payload(&mut socket).await { // let payload = match self.next_payload(&mut connection).await { // Ok(payload) => payload, // Err(err) => { // error!("Error while fetching next payload: {:?}", err); // continue; // } // }; // if let Ok(MessageKind::Repository(RepositoryMessage { // command: // RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)), // .. // })) = serde_json::from_slice(&payload) // { // return Ok(response); // } // } // } // /// Requests an authentication token for the given login. // /// // /// # Authorization // /// This request can only be sent to the same instance from which // /// it is issued. // pub async fn authentication_token( // &self, // secret_key: String, // username: String, // password: String, // pool: &DaemonConnectionPool, // ) -> Result { // let mut connection = pool.0.get().await.unwrap(); // let request = InstanceAuthenticated::new( // AuthenticationTokenRequest { // username, // password, // }, // pool.0.manager().target_instance.clone(), // include_str!("example_keys/giterated.key").to_string(), // ) // .unwrap(); // Self::send_message( // &MessageKind::Authentication(AuthenticationMessage::Request( // AuthenticationRequest::AuthenticationToken(request), // )), // &mut connection, // ) // .await?; // loop { // // while let Ok(payload) = Self::next_payload(&mut socket).await { // let payload = match self.next_payload(&mut connection).await { // Ok(payload) => payload, // Err(err) => { // error!("Error while fetching next payload: {:?}", err); // continue; // } // }; // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( // AuthenticationResponse::AuthenticationToken(response), // ))) = serde_json::from_slice(&payload) // { // return Ok(response.token); // } // } // } // /// Requests a new token for the given login. // /// // /// # Authorization // /// This request can only be sent to the same instance from which // /// it is issued. // pub async fn extend_token( // &self, // secret_key: String, // token: String, // pool: &DaemonConnectionPool, // ) -> Result, Error> { // let mut connection = pool.0.get().await.unwrap(); // let request = InstanceAuthenticated::new( // TokenExtensionRequest { token }, // pool.0.manager().target_instance.clone(), // self.configuration.our_private_key.clone(), // ) // .unwrap(); // Self::send_message( // &MessageKind::Authentication(AuthenticationMessage::Request( // AuthenticationRequest::TokenExtension(request), // )), // &mut connection, // ) // .await?; // while let Ok(payload) = self.next_payload(&mut connection).await { // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( // AuthenticationResponse::TokenExtension(response), // ))) = serde_json::from_slice(&payload) // { // return Ok(response.new_token); // } // } // todo!() // } // pub async fn user_display_name( // &self, // user: impl ToOwned, // pool: &DaemonConnectionPool, // ) -> Result, Error> { // let mut connection = pool.0.get().await.unwrap(); // let request = UserDisplayNameRequest { // user: user.to_owned(), // }; // Self::send_message( // &MessageKind::User(UserMessage { // instance: self // .configuration // .target_instance // .as_ref() // .unwrap_or(&pool.0.manager().target_instance) // .clone(), // message: UserMessageKind::Request(UserMessageRequest::DisplayName(request)), // }), // &mut connection, // ) // .await?; // while let Ok(payload) = self.next_payload(&mut connection).await { // if let Ok(MessageKind::User(UserMessage { // message: UserMessageKind::Response(UserMessageResponse::DisplayName(response)), // .. // })) = serde_json::from_slice(&payload) // { // return Ok(response.display_name); // } // } // connection.close(None).await?; // Err(SocketClosedError.into()) // } // pub async fn user_display_image( // &self, // user: impl ToOwned, // pool: &DaemonConnectionPool, // ) -> Result, Error> { // let mut connection = pool.0.get().await.unwrap(); // let request = UserDisplayImageRequest { // user: user.to_owned(), // }; // Self::send_message( // &MessageKind::User(UserMessage { // instance: self // .configuration // .target_instance // .as_ref() // .unwrap_or(&pool.0.manager().target_instance) // .clone(), // message: UserMessageKind::Request(UserMessageRequest::DisplayImage(request)), // }), // &mut connection, // ) // .await?; // while let Ok(payload) = self.next_payload(&mut connection).await { // if let Ok(MessageKind::User(UserMessage { // message: UserMessageKind::Response(UserMessageResponse::DisplayImage(response)), // .. // })) = serde_json::from_slice(&payload) // { // return Ok(response.image_url); // } // } // connection.close(None).await?; // Err(SocketClosedError.into()) // } // pub async fn user_bio( // &self, // user: impl ToOwned, // pool: &DaemonConnectionPool, // ) -> Result, Error> { // let mut connection = pool.0.get().await.unwrap(); // let request = UserBioRequest { // user: user.to_owned(), // }; // Self::send_message( // &MessageKind::User(UserMessage { // instance: self // .configuration // .target_instance // .as_ref() // .unwrap_or(&pool.0.manager().target_instance) // .clone(), // message: UserMessageKind::Request(UserMessageRequest::Bio(request)), // }), // &mut connection, // ) // .await?; // while let Ok(payload) = self.next_payload(&mut connection).await { // if let Ok(MessageKind::User(UserMessage { // message: UserMessageKind::Response(UserMessageResponse::Bio(response)), // .. // })) = serde_json::from_slice(&payload) // { // return Ok(response.bio); // } // } // connection.close(None).await?; // Err(SocketClosedError.into()) // } // pub async fn user_repositories( // &self, // user: impl ToOwned, // pool: &DaemonConnectionPool, // ) -> Result, Error> { // let mut connection = pool.0.get().await.unwrap(); // let request = UserRepositoriesRequest { // user: user.to_owned(), // }; // Self::send_message( // &MessageKind::User(UserMessage { // instance: self // .configuration // .target_instance // .as_ref() // .unwrap_or(&pool.0.manager().target_instance) // .clone(), // message: UserMessageKind::Request(UserMessageRequest::Repositories(request)), // }), // &mut connection, // ) // .await?; // while let Ok(payload) = self.next_payload(&mut connection).await { // if let Ok(MessageKind::User(UserMessage { // message: UserMessageKind::Response(UserMessageResponse::Repositories(response)), // .. // })) = serde_json::from_slice(&payload) // { // return Ok(response.repositories); // } // } // connection.close(None).await?; // Err(SocketClosedError.into()) // } async fn connect_to( instance: &Instance, socket_addr: &Option, ) -> Result { if let Some(addr) = socket_addr { info!( "Connecting to {}", format!("ws://{}/.giterated/daemon/", addr) ); let (websocket, _response) = connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?; info!("Connection established with {}", addr); Ok(websocket) } else { info!( "Connecting to {}", format!("wss://{}/.giterated/daemon/", instance.url) ); let (websocket, _response) = connect_async(&format!("wss://{}/.giterated/daemon/", instance.url)).await?; info!("Connection established with {}", instance.url); Ok(websocket) } } async fn handle_handshake( socket: &mut Socket, _instance: &Instance, ) -> Result<(), anyhow::Error> { // Send handshake initiation Self::send_message( &InitiateHandshake { version: Version::from_str("0.0.0").unwrap(), }, socket, ) .await?; while let Some(message) = socket.next().await { let message = match message { Ok(message) => message, Err(err) => { error!("Error reading message: {:?}", err); continue; } }; let payload = match message { Message::Text(text) => text.into_bytes(), Message::Binary(bytes) => bytes, Message::Ping(_) => continue, Message::Pong(_) => continue, Message::Close(_) => { socket.close(None).await?; return Err(SocketClosedError.into()); } _ => unreachable!(), }; info!("Read payload: {}", std::str::from_utf8(&payload).unwrap()); // We try deserializing the finalize response first as it is smaller and just as common if let Ok(finalize) = serde_json::from_slice::(&payload) { if finalize.success { info!("Handshake success!"); return Ok(()); } else { panic!() } } else { match serde_json::from_slice::(&payload) { Ok(response) => { // let message = if !validate_version(&response.version) { // error!( // "Version compatibility failure! Our Version: {}, Their Version: {}", // version(), // response.version // ); // HandshakeFinalize { success: false } // } else { // info!("Connected with a compatible version"); // HandshakeFinalize { success: true } // }; warn!("Version compatibility has been no-op'd"); let message = HandshakeFinalize { success: true }; // Send [`HandshakeFinalize`] to indicate if we're compatible or not Self::send_message(&message, socket).await?; } Err(err) => { error!("Error deserializing message: {:?}", err); continue; } } } } Ok(()) } async fn send_message( message: &T, socket: &mut Socket, ) -> Result<(), anyhow::Error> { socket .send(Message::Binary(serde_json::to_vec(&message).unwrap())) .await?; Ok(()) } // async fn next_payload(&self, socket: &mut Socket) -> Result, Error> { // while let Some(message) = socket.next().await { // let message = message?; // match message { // Message::Text(text) => return Ok(text.into_bytes()), // Message::Binary(bytes) => return Ok(bytes), // Message::Ping(_) => continue, // Message::Pong(_) => continue, // Message::Close(_) => { // socket.close(None).await?; // return Err(SocketClosedError.into()); // } // _ => unreachable!(), // } // } // socket.close(None).await?; // return Err(SocketClosedError.into()); // } } #[derive(Debug, thiserror::Error)] #[error("closed socket")] struct SocketClosedError;