diff --git a/old/lib.rs_old b/old/lib.rs_old new file mode 100644 index 0000000..732b753 --- /dev/null +++ b/old/lib.rs_old @@ -0,0 +1,764 @@ +pub mod request; + +pub mod model { + pub use giterated_models::model::*; +} + +pub mod messages { + pub use giterated_models::messages::*; +} + +use std::net::SocketAddr; +use std::str::FromStr; +use std::sync::Arc; +use std::{convert::Infallible, ops::Deref}; + +use deadpool::managed::{BuildError, Manager, Object, Pool, RecycleError, RecycleResult}; + +use futures_util::{SinkExt, StreamExt}; + +use giterated_models::{ + messages::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake}, + model::instance::Instance, +}; +use semver::Version; +use serde::Serialize; +use tokio::net::TcpStream; +use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; + +type Socket = WebSocketStream>; + +#[macro_use] +extern crate tracing; + +pub struct GiteratedApiBuilder { + our_instance: Instance, + our_private_key: Option, + our_public_key: Option, + target_instance: Option, +} + +pub trait AsInstance { + type Error: std::error::Error + Send + Sync + 'static; + + fn into_instance(self) -> Result; +} + +impl AsInstance for &str { + type Error = ::Err; + + fn into_instance(self) -> Result { + Instance::from_str(self) + } +} + +impl AsInstance for Instance { + type Error = Infallible; + + fn into_instance(self) -> Result { + Ok(self) + } +} + +impl GiteratedApiBuilder { + pub fn from_local(instance: impl AsInstance) -> Result { + Ok(Self { + our_instance: instance.into_instance()?, + our_private_key: None, + our_public_key: None, + target_instance: None, + }) + } + + pub fn from_local_for_other( + instance: impl AsInstance, + other: impl AsInstance, + ) -> Result { + Ok(Self { + our_instance: instance.into_instance()?, + our_private_key: None, + our_public_key: None, + target_instance: Some(other.into_instance()?), + }) + } + + pub fn private_key(&mut self, key: impl ToString) -> &mut Self { + self.our_private_key = Some(key.to_string()); + + self + } + + pub fn public_key(&mut self, key: impl ToString) -> &mut Self { + self.our_public_key = Some(key.to_string()); + + self + } + + pub async fn build(&mut self) -> Result { + Ok(GiteratedApi { + configuration: Arc::new(GiteratedApiConfiguration { + our_private_key: self.our_private_key.take().unwrap(), + our_public_key: self.our_public_key.take().unwrap(), + target_instance: self.target_instance.take(), + // todo + target_public_key: None, + }), + }) + } +} + +pub struct GiteratedConnectionPool { + target_instance: Instance, + socket_addr: Option, +} + +#[async_trait::async_trait] +impl Manager for GiteratedConnectionPool { + type Type = Socket; + type Error = anyhow::Error; + + async fn create(&self) -> Result { + info!("Creating new Daemon connection"); + let mut connection = + GiteratedApi::connect_to(&self.target_instance, &self.socket_addr).await?; + + // Handshake first! + GiteratedApi::handle_handshake(&mut connection, &self.target_instance).await?; + + Ok(connection) + } + + async fn recycle(&self, socket: &mut Socket) -> RecycleResult { + match socket.send(Message::Ping(vec![])).await { + Ok(_) => Ok(()), + Err(err) => { + info!("Socket died!"); + Err(RecycleError::Backend(err.into())) + } + } + } +} + +pub struct GiteratedApiConfiguration { + pub our_private_key: String, + pub our_public_key: String, + pub target_instance: Option, + pub target_public_key: Option, +} + +#[derive(Clone)] +pub struct DaemonConnectionPool(Pool); + +impl DaemonConnectionPool { + pub fn instance(&self) -> &Instance { + &self.0.manager().target_instance + } +} + +impl Deref for DaemonConnectionPool { + type Target = Pool; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DaemonConnectionPool { + pub fn connect( + instance: impl ToOwned, + ) -> Result> { + let instance = instance.to_owned(); + Ok(Self( + Pool::builder(GiteratedConnectionPool { + socket_addr: None, + target_instance: instance.to_owned(), + }) + .build()?, + )) + } + + pub fn connect_other( + instance_identity: impl ToOwned, + connection_addr: SocketAddr, + ) -> Result> { + Ok(Self( + Pool::builder(GiteratedConnectionPool { + target_instance: instance_identity.to_owned(), + socket_addr: Some(connection_addr), + }) + .build()?, + )) + } +} + +// Keep this private +// seriousyl. +#[derive(Clone)] +struct GiteratedApi { + configuration: Arc, +} + +impl GiteratedApi { + // pub async fn public_key(&self) -> String { + // if let Some(public_key) = &self.configuration.target_public_key { + // public_key.clone() + // } else { + // assert!(self.configuration.target_instance.is_none()); + + // self.configuration.our_public_key.clone() + // } + // } + + // /// Register on an [`Instance`]. + // /// + // /// # Authorization + // /// - Must be made by the same instance its being sent to + // pub async fn register( + // &self, + // username: String, + // email: Option, + // password: String, + // pool: &DaemonConnectionPool, + // ) -> Result { + // let mut connection = pool.0.get().await.unwrap(); + + // let message = InstanceAuthenticated::new( + // RegisterAccountRequest { + // username, + // email, + // password, + // }, + // pool.0.manager().target_instance.clone(), + // self.configuration.our_private_key.clone(), + // ) + // .unwrap(); + + // Self::send_message( + // &MessageKind::Authentication(AuthenticationMessage::Request( + // AuthenticationRequest::RegisterAccount(message), + // )), + // &mut connection, + // ) + // .await?; + + // while let Ok(payload) = self.next_payload(&mut connection).await { + // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( + // AuthenticationResponse::RegisterAccount(response), + // ))) = serde_json::from_slice(&payload) + // { + // return Ok(response); + // } + // } + + // connection.close(None).await?; + + // Err(SocketClosedError.into()) + // } + + // /// Create repository on the target instance. + // pub async fn create_repository( + // &self, + // user_token: String, + // name: String, + // description: Option, + // visibility: RepositoryVisibility, + // default_branch: String, + // owner: User, + // pool: &DaemonConnectionPool, + // ) -> Result { + // let mut connection = pool.0.get().await.unwrap(); + + // let target_respository = Repository { + // owner: owner.clone(), + // name: name.clone(), + // instance: self + // .configuration + // .target_instance + // .as_ref() + // .unwrap_or(&pool.0.manager().target_instance) + // .clone(), + // }; + + // let request = CreateRepositoryRequest { + // name, + // description, + // visibility, + // default_branch, + // owner, + // }; + + // let message = UnvalidatedUserAuthenticated::new( + // request, + // user_token, + // self.configuration.our_private_key.clone(), + // ) + // .unwrap(); + + // Self::send_message( + // &MessageKind::Repository(RepositoryMessage { + // target: target_respository, + // command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository( + // message, + // )), + // }), + // &mut connection, + // ) + // .await?; + + // while let Ok(payload) = self.next_payload(&mut connection).await { + // if let Ok(MessageKind::Repository(RepositoryMessage { + // command: + // RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)), + // .. + // })) = serde_json::from_slice(&payload) + // { + // return Ok(true); + // } + // } + + // connection.close(None).await?; + + // Err(SocketClosedError.into()) + // } + + // pub async fn repository_info( + // &self, + // token: &str, + // request: &RepositoryInfoRequest, + // pool: &DaemonConnectionPool, + // ) -> Result { + // let mut connection = pool.0.get().await.unwrap(); + + // let message = UnvalidatedUserAuthenticated::new( + // request.clone(), + // token.to_string(), + // self.configuration.our_private_key.clone(), + // ) + // .unwrap(); + + // Self::send_message( + // &MessageKind::Repository(RepositoryMessage { + // target: request.repository.clone(), + // command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)), + // }), + // &mut connection, + // ) + // .await?; + + // loop { + // // while let Ok(payload) = Self::next_payload(&mut socket).await { + // let payload = match self.next_payload(&mut connection).await { + // Ok(payload) => payload, + // Err(err) => { + // error!("Error while fetching next payload: {:?}", err); + // continue; + // } + // }; + + // if let Ok(MessageKind::Repository(RepositoryMessage { + // command: + // RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)), + // .. + // })) = serde_json::from_slice(&payload) + // { + // return Ok(response); + // } + // } + // } + + // /// Requests an authentication token for the given login. + // /// + // /// # Authorization + // /// This request can only be sent to the same instance from which + // /// it is issued. + // pub async fn authentication_token( + // &self, + // secret_key: String, + // username: String, + // password: String, + // pool: &DaemonConnectionPool, + // ) -> Result { + // let mut connection = pool.0.get().await.unwrap(); + + // let request = InstanceAuthenticated::new( + // AuthenticationTokenRequest { + // username, + // password, + // }, + // pool.0.manager().target_instance.clone(), + // include_str!("example_keys/giterated.key").to_string(), + // ) + // .unwrap(); + + // Self::send_message( + // &MessageKind::Authentication(AuthenticationMessage::Request( + // AuthenticationRequest::AuthenticationToken(request), + // )), + // &mut connection, + // ) + // .await?; + + // loop { + // // while let Ok(payload) = Self::next_payload(&mut socket).await { + // let payload = match self.next_payload(&mut connection).await { + // Ok(payload) => payload, + // Err(err) => { + // error!("Error while fetching next payload: {:?}", err); + // continue; + // } + // }; + + // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( + // AuthenticationResponse::AuthenticationToken(response), + // ))) = serde_json::from_slice(&payload) + // { + // return Ok(response.token); + // } + // } + // } + + // /// Requests a new token for the given login. + // /// + // /// # Authorization + // /// This request can only be sent to the same instance from which + // /// it is issued. + // pub async fn extend_token( + // &self, + // secret_key: String, + // token: String, + // pool: &DaemonConnectionPool, + // ) -> Result, Error> { + // let mut connection = pool.0.get().await.unwrap(); + + // let request = InstanceAuthenticated::new( + // TokenExtensionRequest { token }, + // pool.0.manager().target_instance.clone(), + // self.configuration.our_private_key.clone(), + // ) + // .unwrap(); + + // Self::send_message( + // &MessageKind::Authentication(AuthenticationMessage::Request( + // AuthenticationRequest::TokenExtension(request), + // )), + // &mut connection, + // ) + // .await?; + + // while let Ok(payload) = self.next_payload(&mut connection).await { + // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( + // AuthenticationResponse::TokenExtension(response), + // ))) = serde_json::from_slice(&payload) + // { + // return Ok(response.new_token); + // } + // } + + // todo!() + // } + + // pub async fn user_display_name( + // &self, + // user: impl ToOwned, + // pool: &DaemonConnectionPool, + // ) -> Result, Error> { + // let mut connection = pool.0.get().await.unwrap(); + + // let request = UserDisplayNameRequest { + // user: user.to_owned(), + // }; + + // Self::send_message( + // &MessageKind::User(UserMessage { + // instance: self + // .configuration + // .target_instance + // .as_ref() + // .unwrap_or(&pool.0.manager().target_instance) + // .clone(), + // message: UserMessageKind::Request(UserMessageRequest::DisplayName(request)), + // }), + // &mut connection, + // ) + // .await?; + + // while let Ok(payload) = self.next_payload(&mut connection).await { + // if let Ok(MessageKind::User(UserMessage { + // message: UserMessageKind::Response(UserMessageResponse::DisplayName(response)), + // .. + // })) = serde_json::from_slice(&payload) + // { + // return Ok(response.display_name); + // } + // } + + // connection.close(None).await?; + + // Err(SocketClosedError.into()) + // } + + // pub async fn user_display_image( + // &self, + // user: impl ToOwned, + // pool: &DaemonConnectionPool, + // ) -> Result, Error> { + // let mut connection = pool.0.get().await.unwrap(); + + // let request = UserDisplayImageRequest { + // user: user.to_owned(), + // }; + + // Self::send_message( + // &MessageKind::User(UserMessage { + // instance: self + // .configuration + // .target_instance + // .as_ref() + // .unwrap_or(&pool.0.manager().target_instance) + // .clone(), + // message: UserMessageKind::Request(UserMessageRequest::DisplayImage(request)), + // }), + // &mut connection, + // ) + // .await?; + + // while let Ok(payload) = self.next_payload(&mut connection).await { + // if let Ok(MessageKind::User(UserMessage { + // message: UserMessageKind::Response(UserMessageResponse::DisplayImage(response)), + // .. + // })) = serde_json::from_slice(&payload) + // { + // return Ok(response.image_url); + // } + // } + + // connection.close(None).await?; + + // Err(SocketClosedError.into()) + // } + + // pub async fn user_bio( + // &self, + // user: impl ToOwned, + // pool: &DaemonConnectionPool, + // ) -> Result, Error> { + // let mut connection = pool.0.get().await.unwrap(); + + // let request = UserBioRequest { + // user: user.to_owned(), + // }; + + // Self::send_message( + // &MessageKind::User(UserMessage { + // instance: self + // .configuration + // .target_instance + // .as_ref() + // .unwrap_or(&pool.0.manager().target_instance) + // .clone(), + // message: UserMessageKind::Request(UserMessageRequest::Bio(request)), + // }), + // &mut connection, + // ) + // .await?; + + // while let Ok(payload) = self.next_payload(&mut connection).await { + // if let Ok(MessageKind::User(UserMessage { + // message: UserMessageKind::Response(UserMessageResponse::Bio(response)), + // .. + // })) = serde_json::from_slice(&payload) + // { + // return Ok(response.bio); + // } + // } + + // connection.close(None).await?; + + // Err(SocketClosedError.into()) + // } + + // pub async fn user_repositories( + // &self, + // user: impl ToOwned, + // pool: &DaemonConnectionPool, + // ) -> Result, Error> { + // let mut connection = pool.0.get().await.unwrap(); + + // let request = UserRepositoriesRequest { + // user: user.to_owned(), + // }; + + // Self::send_message( + // &MessageKind::User(UserMessage { + // instance: self + // .configuration + // .target_instance + // .as_ref() + // .unwrap_or(&pool.0.manager().target_instance) + // .clone(), + // message: UserMessageKind::Request(UserMessageRequest::Repositories(request)), + // }), + // &mut connection, + // ) + // .await?; + + // while let Ok(payload) = self.next_payload(&mut connection).await { + // if let Ok(MessageKind::User(UserMessage { + // message: UserMessageKind::Response(UserMessageResponse::Repositories(response)), + // .. + // })) = serde_json::from_slice(&payload) + // { + // return Ok(response.repositories); + // } + // } + + // connection.close(None).await?; + + // Err(SocketClosedError.into()) + // } + + async fn connect_to( + instance: &Instance, + socket_addr: &Option, + ) -> Result { + if let Some(addr) = socket_addr { + info!( + "Connecting to {}", + format!("ws://{}/.giterated/daemon/", addr) + ); + let (websocket, _response) = + connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?; + info!("Connection established with {}", addr); + + Ok(websocket) + } else { + info!( + "Connecting to {}", + format!("wss://{}/.giterated/daemon/", instance.url) + ); + let (websocket, _response) = + connect_async(&format!("wss://{}/.giterated/daemon/", instance.url)).await?; + info!("Connection established with {}", instance.url); + + Ok(websocket) + } + } + + async fn handle_handshake( + socket: &mut Socket, + _instance: &Instance, + ) -> Result<(), anyhow::Error> { + // Send handshake initiation + Self::send_message( + &InitiateHandshake { + version: Version::from_str("0.0.0").unwrap(), + }, + socket, + ) + .await?; + + while let Some(message) = socket.next().await { + let message = match message { + Ok(message) => message, + Err(err) => { + error!("Error reading message: {:?}", err); + continue; + } + }; + + let payload = match message { + Message::Text(text) => text.into_bytes(), + Message::Binary(bytes) => bytes, + Message::Ping(_) => continue, + Message::Pong(_) => continue, + Message::Close(_) => { + socket.close(None).await?; + + return Err(SocketClosedError.into()); + } + _ => unreachable!(), + }; + + info!("Read payload: {}", std::str::from_utf8(&payload).unwrap()); + + // We try deserializing the finalize response first as it is smaller and just as common + if let Ok(finalize) = serde_json::from_slice::(&payload) { + if finalize.success { + info!("Handshake success!"); + + return Ok(()); + } else { + panic!() + } + } else { + match serde_json::from_slice::(&payload) { + Ok(response) => { + // let message = if !validate_version(&response.version) { + // error!( + // "Version compatibility failure! Our Version: {}, Their Version: {}", + // version(), + // response.version + // ); + + // HandshakeFinalize { success: false } + // } else { + // info!("Connected with a compatible version"); + + // HandshakeFinalize { success: true } + // }; + + warn!("Version compatibility has been no-op'd"); + + let message = HandshakeFinalize { success: true }; + // Send [`HandshakeFinalize`] to indicate if we're compatible or not + Self::send_message(&message, socket).await?; + } + Err(err) => { + error!("Error deserializing message: {:?}", err); + continue; + } + } + } + } + + Ok(()) + } + + async fn send_message( + message: &T, + socket: &mut Socket, + ) -> Result<(), anyhow::Error> { + socket + .send(Message::Binary(serde_json::to_vec(&message).unwrap())) + .await?; + + Ok(()) + } + + // async fn next_payload(&self, socket: &mut Socket) -> Result, Error> { + // while let Some(message) = socket.next().await { + // let message = message?; + + // match message { + // Message::Text(text) => return Ok(text.into_bytes()), + // Message::Binary(bytes) => return Ok(bytes), + // Message::Ping(_) => continue, + // Message::Pong(_) => continue, + // Message::Close(_) => { + // socket.close(None).await?; + + // return Err(SocketClosedError.into()); + // } + // _ => unreachable!(), + // } + // } + + // socket.close(None).await?; + + // return Err(SocketClosedError.into()); + // } +} + +#[derive(Debug, thiserror::Error)] +#[error("closed socket")] +struct SocketClosedError; diff --git a/old/main.rs_old b/old/main.rs_old new file mode 100644 index 0000000..daa8e39 --- /dev/null +++ b/old/main.rs_old @@ -0,0 +1,131 @@ +use std::str::FromStr; + +use giterated_api::DaemonConnectionPool; +use giterated_api::{ + messages::user::{UserBioRequest, UserBioResponse}, + model::{instance::Instance, user::User}, +}; + +use serde::{Deserialize, Serialize}; +// use jwt::SignWithKey; + +#[macro_use] +extern crate tracing; + +// #[tokio::main] +// async fn main() -> Result<(), anyhow::Error> { +// tracing_subscriber::fmt::init(); + +// let pool = DaemonConnectionPool::connect(Instance::from_str("giterated.dev")?).unwrap(); + +// let mut api = GiteratedApiBuilder::from_local("giterated.dev") +// .unwrap() +// .private_key(include_str!("example_keys/giterated.key")) +// .public_key(include_str!("example_keys/giterated.key.pub")) +// .build() +// .await +// .unwrap(); + +// info!("Lets try to make an account!"); + +// let response = api +// .register( +// String::from("ambee"), +// None, +// String::from("lolthisisinthecommithistory"), +// &pool, +// ) +// .await; + +// info!("Registration response: {:?}", response); + +// let token = api +// .authentication_token( +// String::from("foobar"), +// String::from("ambee"), +// String::from("password"), +// &pool, +// ) +// .await +// .unwrap(); + +// println!("Token: {}", token); + +// let public_key = api.public_key().await; + +// println!("Server public key:\n{}", public_key); +// let verification_key = DecodingKey::from_rsa_pem(public_key.as_bytes()).unwrap(); +// let data: TokenData = decode( +// &token, +// &verification_key, +// &Validation::new(Algorithm::RS256), +// ) +// .unwrap(); + +// println!("The token was valid! Data:\n{:#?}", data.claims); + +// info!("Lets extend that token!"); + +// let new_token = api +// .extend_token(String::from("foobar"), token.clone(), &pool) +// .await +// .unwrap(); +// info!("New Token Returned:\n{:?}", new_token); + +// info!("Try to create a repository? uwu"); + +// let repository = api +// .create_repository( +// new_token.unwrap(), +// String::from("super-repository"), +// None, +// RepositoryVisibility::Public, +// String::from("master"), +// User::from_str("ambee:giterated.dev").unwrap(), +// &pool, +// ) +// .await +// .unwrap(); + +// assert!(repository); + +// info!("Lets view our repository!"); + +// let view = api +// .repository_info( +// &token, +// Repository::from_str("ambee:giterated.dev/super-repository@giterated.dev").unwrap(), +// &pool, +// ) +// .await +// .unwrap(); + +// info!("Repository Info:\n{:#?}", view); + +// Ok(()) +// } + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + tracing_subscriber::fmt::init(); + let pool = DaemonConnectionPool::connect(Instance::from_str("giterated.dev")?).unwrap(); + + let request = UserBioRequest { + user: User::from_str("ambee:giterated.dev").unwrap(), + }; + + let response = giterated_api::request::request_local(&request) + .execute_expect::(&pool) + .await?; + + info!("Response: {:?}", response); + + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize)] +struct UserTokenMetadata { + user: User, + generated_for: Instance, + exp: u64, +} diff --git a/old/request.rs_old b/old/request.rs_old new file mode 100644 index 0000000..ed936fe --- /dev/null +++ b/old/request.rs_old @@ -0,0 +1,82 @@ +use std::fmt::Debug; + +use anyhow::Error; +use futures_util::{SinkExt, StreamExt}; +use giterated_models::{ + messages::{error::ConnectionError, MessageTarget}, + model::{ + authenticated::{Authenticated, AuthenticationSourceProvider}, + instance::Instance, + }, +}; +use serde::{de::DeserializeOwned, Serialize}; +use tokio_tungstenite::tungstenite::Message; + +use crate::DaemonConnectionPool; + +pub fn request_local( + request: T, +) -> PreparedRequest { + PreparedRequest { + request: Authenticated::new_empty(request), + instance: None, + } +} + +pub fn request( + instance: Instance, + request: T, +) -> PreparedRequest { + PreparedRequest { + request: Authenticated::new_empty(request), + instance: Some(instance), + } +} + +pub struct PreparedRequest { + instance: Option, + request: Authenticated, +} + +impl PreparedRequest { + pub fn authenticate(mut self, source: P) -> Self { + self.request.append_authentication(source); + + self + } + + pub async fn execute_expect( + self, + pool: &DaemonConnectionPool, + ) -> Result { + let mut socket = pool.0.get().await.unwrap(); + + let payload = serde_json::to_vec(&self.request.into_payload()).unwrap(); + + socket.send(Message::Binary(payload)).await?; + + while let Some(message) = socket.next().await { + let payload = match message? { + Message::Binary(payload) => payload, + _ => { + continue; + } + }; + + let as_target = serde_json::from_slice::(&payload).map_err(|e| Error::from(e)); + + if as_target.is_err() { + // Maybe we got an error payload? + if let Ok(error_payload) = serde_json::from_slice::(&payload) { + return Err(error_payload.into()); + } + } else { + // We did not get an error payload, forward the deserialization error from the + // expected type + return as_target; + } + } + + panic!() + } +} diff --git a/src/daemon_backend.rs b/src/daemon_backend.rs new file mode 100644 index 0000000..733b1f6 --- /dev/null +++ b/src/daemon_backend.rs @@ -0,0 +1,102 @@ +use std::fmt::Debug; + +use deadpool::managed::Pool; +use futures_util::{SinkExt, StreamExt}; +use giterated_models::{ + error::OperationError, + model::{authenticated::Authenticated, MessageTarget}, + operation::{ + GiteratedMessage, GiteratedObject, GiteratedOperation, Object, ObjectBackend, + ObjectRequest, ObjectRequestError, ObjectResponse, + }, +}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tokio_tungstenite::tungstenite::Message; + +use crate::{pool::GiteratedConnectionPool, DaemonConnectionPool, Socket}; + +#[async_trait::async_trait] +impl ObjectBackend for DaemonConnectionPool { + async fn object_operation + Debug>( + &self, + object: O, + operation: D, + ) -> Result> { + let message = GiteratedMessage { + object, + operation: operation.operation_name().to_string(), + payload: operation, + }; + + let mut connection = self + .0 + .get() + .await + .map_err(|e| OperationError::Internal(e.to_string()))?; + + let authenticated = Authenticated::new(message); + + send_expect(&mut connection, authenticated).await + } + + async fn get_object( + &self, + object_str: &str, + ) -> Result, OperationError> { + let operation = ObjectRequest(object_str.to_string()); + let message = GiteratedMessage { + object: self.0.manager().target_instance.clone(), + operation: operation.operation_name().to_string(), + payload: operation, + }; + + let mut connection = self + .0 + .get() + .await + .map_err(|e| OperationError::Internal(e.to_string()))?; + + let authenticated = Authenticated::new(message); + + let object_raw: ObjectResponse = send_expect(&mut connection, authenticated).await?; + + Ok(unsafe { + Object::new_unchecked( + serde_json::from_slice(&object_raw.0) + .map_err(|e| OperationError::Internal(e.to_string()))?, + self.clone(), + ) + }) + } +} + +async fn send_expect< + O: GiteratedObject, + D: GiteratedOperation, + B: DeserializeOwned, + R: DeserializeOwned, +>( + socket: &mut Socket, + message: Authenticated, +) -> Result> { + let payload = serde_json::to_vec(&message.into_payload()).unwrap(); + + socket + .send(Message::Binary(payload)) + .await + .map_err(|e| OperationError::Internal(e.to_string()))?; + + while let Some(message) = socket.next().await { + let payload = match message.map_err(|e| OperationError::Internal(e.to_string()))? { + Message::Binary(payload) => payload, + _ => { + continue; + } + }; + + let as_target = serde_json::from_slice::(&payload) + .map_err(|e| OperationError::Internal(e.to_string()))?; + } + + panic!() +} diff --git a/src/example_keys/giterated.key b/src/example_keys/giterated.key deleted file mode 100644 index 1370005..0000000 --- a/src/example_keys/giterated.key +++ /dev/null @@ -1,51 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIJJgIBAAKCAgEArGLMTdU15wfXOX3SwbQTZnAEvMfWK7JBH3t0ZHYcnQRJQt2/ -OAvH9h8WwQAjXcoS9wEvFfEJA2OWdzHWJSSnDD8M5eI2F8+cZeQ5bBk6YGxyYkVo -pkV/RxoitlFFXeSTksQO7VVG9qiw5lcsW+WRw9LJHSyviV8AXZY0GK3RHZwsQnEY -IfmiwaW1+gUEi2ztoBTQjJXH/MkXdYOXqINzVPta/cfw7lOJHOh1X4WSkKErfCR0 -GW4PhAdj3hQFHknta9Fm2SBmgEZt6P/RSprUuKuX7Ufr8mb5Vqv7HwCKapsFym7D -gvbey3M8nCdpWc3w1B/X4vW0cePZg6XX3VAXmMgB6o9WW23IwAaWMWGe1v/oXOUj -7y2+BIzCTUbYdDruVZ9szpNvq9ddGqTGhpivKnpyAANe5e4ggppV4zZC4yKmVqDQ -O2MJh+Qr0mIkbOFFbytvAzIlKBWL+zeMtloOjvIU5F6AoWCr3W0EoJy4oDR0ZVto -vcqETPtYuoFJ1Y5qlAIiC/J1TfKIRhB09WiuSzG5enEOH8thojulS46kIgJqKLMO -vBi3JUf5EHqEXGjIILcb9plAI6D3pHV4rUNIv3KY0ci2PqMfmHjE3v/ELlu4YKKq -fhn/fDMuYphZ41Ga0eaKRhV4ClwFWeI0hOVXhk6C01UZZct2ZbvayraTZCECAwEA -AQKCAgEAl+SOBF7DmggMmjnFxKv5FB/L7NNgYSw1uZm8GvD/kVK/gs2EucuXq8QE -9pY6k1+EimReqsSxnmzXnbsp55x+HIpJwR0rcJucQSNxfVBVYbTsrK5f4XIHDg13 -XJILvwmzBnT+ehzT5G8LQEq7aVXEtHk8gBppqW8uEUhSKxSs15xOW1TvYLBnup1a -1SwqrveSAaWVhOpNRu2hYAhNT0xUCSNZL5hHMJgmjnQ9R6eYVxvMBxzPt8CEp18j -ngCh6ehV7NSb/OFRr+Fe4xjVvxjiKr33pjnjKrmVJctwAAcn73sdBRvH5dPEyBuH -4kfPyjNt6lsMjIzXLCsJ87fjlrwFrUQgq3ojmH/4twJtP9YoVoGg8Zu4BsWqEwxa -vWvlYxA7WbFQ7ty/9voU5w2uII3FOuUk+6HouSg6x/yGRmd9f4kkEOEznsbWrrfU -1Il8PM6fiWCdUZ8XTBiufutoY0lEusIDoUcUtJXemwl0nsfOatSAvGAFd1k5I5wc -xy7psVT9NmT8gtSO+0G7uc25k+nTsLFddsl6SgAweWIvyeAg5rHYsodOqbrPm6Cw -Lq475mm2la+1m/oifyTp6oHgy/GzWc0v87pC9L5xe5fifGwnE+du/t8gAo/ZhDFI -2sp8pI/QrjC9yEGVe8GcWTUtdNjZ9DnMAY+wb6HQLDsQx494amECggEBAOFWKBrK -MpR7xtJ11u3ED/1HwMSkkdNwsrzWJHKUjq94CjYlfXXhPpK7yM4aMxJ7NZmcjaBC -bHTt7+EfnBVL/cFVO0S054SnPfsI5IxIPwDZyeKteAOwkWTY5E0oHBozeUQ2801g -KfU25X6TLF4ZEvLO4kFlw4r40Sfeq+J3Tpi5ERxCW/RUMKsALfSUOQXduPXmqEyZ -QHeoH9hE7IJVmbv3kN9MF3fgo6aiRzwSs1KyGBb70sxsIs9AUvg+7xTtdv1hARwE -yC7g6ghCEaXR8EZWat4IMCTQvv93sBzUwCFa9vDptgzyadPzIxoHMfhbOR1UNmQN -LzbxLoPKWd9sWy0CggEBAMPYDJDoChWT+u+2Q/hPJ65RrzahNKFzhZTPoNwgSzMt -Z3PoGYG3ca7wYqWOjUNFdvt8nFtXuVoKjtxdvzv40whe5yYSld5FZSDYm+9WtjYx -UoZAGhIfhyE4ekXoBHh6Bh/KMMBPPQFY5LVxSNm6HNyWzIDYUvHdMWDL/qmHdEQt -LsyS6fuxIeMEgDrJ9/HAgMMdjiIBL4oVIL1LYBX3vD8uC4970EVmyZ/MDl1ZiDhe -OglmTRnbgePleryURRBLLw/ky3pYv/EMKRCX/dSABCgvMIi/D0gcHEy0FM4at9T2 -1Nyo1KBucb9aT+KGbCP0Ko+eA3cd5VYKJAtYrnqItUUCgf9sQ/kA5iVnMhFVDUk2 -8/y6tL7pvChUbtFx6XGZm8byh7pgSaL+ADsQRSk13WCsgIZAR/fECCYUCD446/cS -RHCnc0wGtuSF19TvyFYHEK80uW9GehIvs6Ynzg3jBGJ8ND8Ph1de1dVS/A1Hw26N -x35TKxOKWFqbavETNule5fPdbQ3LhhaoTcsUXgG2gYDkUKONgkVaiEdxNlYWkwcP -mBFFPq1cnDKqZkQ6y71uH44JLYhlgpjFny8aZM14eMRmSbHiC7l8vM9xtp67WQMh -qLzJDrxJ8aUwCxu5osf7Ej09yXbcSW4uykoOi8NRviNEMJBAhzWa3LrSqw6uQ4rq -ziUCggEAbg2Ooi+C2zVZIjOuZm80wUStzWkxhjjArCsxHgIXwB6XsA6Rps9LVx9G -j/pXb6Itho0z4DCfu/WK6lLUEAN3s5CBHGf9R/Z/KcIPfqOfqTx2P3LuM5j7+rMe -IwKK4JjRsDOSyb69bXBitYN/iLqJVXx4Vz84/SlrghWgeevgbh9l2RgF3KZhgI0a -8e5lIrkmon6NTJaV/GZ7C2S8Dhw08NwTKwJMu3NTgjTNLbAOWH665mVSlmE/0K04 -F5jKZqmZPLk5jvsogXBv8x82SJ/Xti0ufOnA0KjbTk80Ec351/cNDyLguXbW/Mzn -b0hSpLGk6SfGkr1+DqeMMcQX6EvCcQKCAQBYmtLHK6Eyn9DRLxgPwLMbAs3TZkLa -1k/9N42DyGrgM7M9GlMjRKcOfhAkCUhD3y7BMmmAZioD0GQt3DbLR/rc1LtiH5BF -PEyWQ/xA//Ydd7x+jYokfKec/6DSMkfE6Sih55NpaeVhWc5j8++bnazzRQDTQDVg -j/+MvMmaiAhPBdtgo9fnkDoGlgvwNuR/3omfwxNioxKsCjuc6Ht+7q0UkzgDFecM -MUj6ER7qFzrwPOEmHeCfQ4UURmT4f4jjxLgxqTqjFdZDYbVLpzlh//yL4hdZVco5 -ucSj793YjqASwFJRQ7dQEIyLouQCxjUWkeKUJzhqHNxuOam3ieYVW3v4 ------END RSA PRIVATE KEY----- \ No newline at end of file diff --git a/src/example_keys/giterated.key.pub b/src/example_keys/giterated.key.pub deleted file mode 100644 index 960a231..0000000 --- a/src/example_keys/giterated.key.pub +++ /dev/null @@ -1,13 +0,0 @@ ------BEGIN RSA PUBLIC KEY----- -MIICCgKCAgEArGLMTdU15wfXOX3SwbQTZnAEvMfWK7JBH3t0ZHYcnQRJQt2/OAvH -9h8WwQAjXcoS9wEvFfEJA2OWdzHWJSSnDD8M5eI2F8+cZeQ5bBk6YGxyYkVopkV/ -RxoitlFFXeSTksQO7VVG9qiw5lcsW+WRw9LJHSyviV8AXZY0GK3RHZwsQnEYIfmi -waW1+gUEi2ztoBTQjJXH/MkXdYOXqINzVPta/cfw7lOJHOh1X4WSkKErfCR0GW4P -hAdj3hQFHknta9Fm2SBmgEZt6P/RSprUuKuX7Ufr8mb5Vqv7HwCKapsFym7Dgvbe -y3M8nCdpWc3w1B/X4vW0cePZg6XX3VAXmMgB6o9WW23IwAaWMWGe1v/oXOUj7y2+ -BIzCTUbYdDruVZ9szpNvq9ddGqTGhpivKnpyAANe5e4ggppV4zZC4yKmVqDQO2MJ -h+Qr0mIkbOFFbytvAzIlKBWL+zeMtloOjvIU5F6AoWCr3W0EoJy4oDR0ZVtovcqE -TPtYuoFJ1Y5qlAIiC/J1TfKIRhB09WiuSzG5enEOH8thojulS46kIgJqKLMOvBi3 -JUf5EHqEXGjIILcb9plAI6D3pHV4rUNIv3KY0ci2PqMfmHjE3v/ELlu4YKKqfhn/ -fDMuYphZ41Ga0eaKRhV4ClwFWeI0hOVXhk6C01UZZct2ZbvayraTZCECAwEAAQ== ------END RSA PUBLIC KEY----- diff --git a/src/handshake.rs b/src/handshake.rs new file mode 100644 index 0000000..0a90583 --- /dev/null +++ b/src/handshake.rs @@ -0,0 +1,108 @@ +use std::str::FromStr; + +use futures_util::StreamExt; +use giterated_models::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake}; +use semver::Version; +use serde::Serialize; +use tokio_tungstenite::tungstenite::Message; + +use crate::Socket; + +pub struct GiteratedConnectionHandshaker(Socket); + +impl GiteratedConnectionHandshaker { + pub fn new(socket: Socket) -> Self { + Self(socket) + } + + pub async fn handshake(self) -> Result { + let mut socket = self.0; + + // Send handshake initiation + Self::send_raw_message( + &mut socket, + &InitiateHandshake { + version: Version::from_str("0.0.0").unwrap(), + }, + ) + .await?; + + while let Some(message) = socket.next().await { + let message = match message { + Ok(message) => message, + Err(err) => { + error!("Error reading message: {:?}", err); + continue; + } + }; + + let payload = match message { + Message::Text(text) => text.into_bytes(), + Message::Binary(bytes) => bytes, + Message::Ping(_) => continue, + Message::Pong(_) => continue, + Message::Close(_) => { + socket.close(None).await?; + + return Err(SocketClosedError.into()); + } + _ => unreachable!(), + }; + + info!("Read payload: {}", std::str::from_utf8(&payload).unwrap()); + + // We try deserializing the finalize response first as it is smaller and just as common + if let Ok(finalize) = serde_json::from_slice::(&payload) { + if finalize.success { + info!("Handshake success!"); + + return Ok(socket); + } else { + socket.close(None).await?; + return Err(SocketClosedError.into()); + } + } else { + match serde_json::from_slice::(&payload) { + Ok(_response) => { + // let message = if !validate_version(&response.version) { + // error!( + // "Version compatibility failure! Our Version: {}, Their Version: {}", + // version(), + // response.version + // ); + + // HandshakeFinalize { success: false } + // } else { + // info!("Connected with a compatible version"); + + // HandshakeFinalize { success: true } + // }; + + warn!("Version compatibility has been no-op'd"); + + let message = HandshakeFinalize { success: true }; + // Send [`HandshakeFinalize`] to indicate if we're compatible or not + Self::send_raw_message(&mut socket, &message).await?; + } + Err(err) => { + error!("Error deserializing message: {:?}", err); + continue; + } + } + } + } + + Ok(socket) + } + + async fn send_raw_message( + _socket: &Socket, + _message: &T, + ) -> Result<(), tokio_tungstenite::tungstenite::Error> { + todo!() + } +} + +#[derive(Debug, thiserror::Error)] +#[error("closed socket")] +struct SocketClosedError; diff --git a/src/lib.rs b/src/lib.rs index 732b753..23c03ad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,165 +1,26 @@ -pub mod request; +use std::{fmt::Debug, net::SocketAddr}; -pub mod model { - pub use giterated_models::model::*; -} - -pub mod messages { - pub use giterated_models::messages::*; -} - -use std::net::SocketAddr; -use std::str::FromStr; -use std::sync::Arc; -use std::{convert::Infallible, ops::Deref}; - -use deadpool::managed::{BuildError, Manager, Object, Pool, RecycleError, RecycleResult}; - -use futures_util::{SinkExt, StreamExt}; - -use giterated_models::{ - messages::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake}, - model::instance::Instance, -}; -use semver::Version; -use serde::Serialize; +use deadpool::managed::{BuildError, Pool}; +use giterated_models::model::instance::Instance; +use pool::GiteratedConnectionPool; use tokio::net::TcpStream; -use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; + +pub mod daemon_backend; +mod handshake; +mod pool; type Socket = WebSocketStream>; #[macro_use] extern crate tracing; -pub struct GiteratedApiBuilder { - our_instance: Instance, - our_private_key: Option, - our_public_key: Option, - target_instance: Option, -} - -pub trait AsInstance { - type Error: std::error::Error + Send + Sync + 'static; - - fn into_instance(self) -> Result; -} - -impl AsInstance for &str { - type Error = ::Err; - - fn into_instance(self) -> Result { - Instance::from_str(self) - } -} - -impl AsInstance for Instance { - type Error = Infallible; - - fn into_instance(self) -> Result { - Ok(self) - } -} - -impl GiteratedApiBuilder { - pub fn from_local(instance: impl AsInstance) -> Result { - Ok(Self { - our_instance: instance.into_instance()?, - our_private_key: None, - our_public_key: None, - target_instance: None, - }) - } - - pub fn from_local_for_other( - instance: impl AsInstance, - other: impl AsInstance, - ) -> Result { - Ok(Self { - our_instance: instance.into_instance()?, - our_private_key: None, - our_public_key: None, - target_instance: Some(other.into_instance()?), - }) - } - - pub fn private_key(&mut self, key: impl ToString) -> &mut Self { - self.our_private_key = Some(key.to_string()); - - self - } - - pub fn public_key(&mut self, key: impl ToString) -> &mut Self { - self.our_public_key = Some(key.to_string()); - - self - } - - pub async fn build(&mut self) -> Result { - Ok(GiteratedApi { - configuration: Arc::new(GiteratedApiConfiguration { - our_private_key: self.our_private_key.take().unwrap(), - our_public_key: self.our_public_key.take().unwrap(), - target_instance: self.target_instance.take(), - // todo - target_public_key: None, - }), - }) - } -} - -pub struct GiteratedConnectionPool { - target_instance: Instance, - socket_addr: Option, -} - -#[async_trait::async_trait] -impl Manager for GiteratedConnectionPool { - type Type = Socket; - type Error = anyhow::Error; - - async fn create(&self) -> Result { - info!("Creating new Daemon connection"); - let mut connection = - GiteratedApi::connect_to(&self.target_instance, &self.socket_addr).await?; - - // Handshake first! - GiteratedApi::handle_handshake(&mut connection, &self.target_instance).await?; - - Ok(connection) - } - - async fn recycle(&self, socket: &mut Socket) -> RecycleResult { - match socket.send(Message::Ping(vec![])).await { - Ok(_) => Ok(()), - Err(err) => { - info!("Socket died!"); - Err(RecycleError::Backend(err.into())) - } - } - } -} - -pub struct GiteratedApiConfiguration { - pub our_private_key: String, - pub our_public_key: String, - pub target_instance: Option, - pub target_public_key: Option, -} - #[derive(Clone)] pub struct DaemonConnectionPool(Pool); -impl DaemonConnectionPool { - pub fn instance(&self) -> &Instance { - &self.0.manager().target_instance - } -} - -impl Deref for DaemonConnectionPool { - type Target = Pool; - - fn deref(&self) -> &Self::Target { - &self.0 +impl Debug for DaemonConnectionPool { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("DaemonConnectionPool").finish() } } @@ -190,575 +51,3 @@ impl DaemonConnectionPool { )) } } - -// Keep this private -// seriousyl. -#[derive(Clone)] -struct GiteratedApi { - configuration: Arc, -} - -impl GiteratedApi { - // pub async fn public_key(&self) -> String { - // if let Some(public_key) = &self.configuration.target_public_key { - // public_key.clone() - // } else { - // assert!(self.configuration.target_instance.is_none()); - - // self.configuration.our_public_key.clone() - // } - // } - - // /// Register on an [`Instance`]. - // /// - // /// # Authorization - // /// - Must be made by the same instance its being sent to - // pub async fn register( - // &self, - // username: String, - // email: Option, - // password: String, - // pool: &DaemonConnectionPool, - // ) -> Result { - // let mut connection = pool.0.get().await.unwrap(); - - // let message = InstanceAuthenticated::new( - // RegisterAccountRequest { - // username, - // email, - // password, - // }, - // pool.0.manager().target_instance.clone(), - // self.configuration.our_private_key.clone(), - // ) - // .unwrap(); - - // Self::send_message( - // &MessageKind::Authentication(AuthenticationMessage::Request( - // AuthenticationRequest::RegisterAccount(message), - // )), - // &mut connection, - // ) - // .await?; - - // while let Ok(payload) = self.next_payload(&mut connection).await { - // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( - // AuthenticationResponse::RegisterAccount(response), - // ))) = serde_json::from_slice(&payload) - // { - // return Ok(response); - // } - // } - - // connection.close(None).await?; - - // Err(SocketClosedError.into()) - // } - - // /// Create repository on the target instance. - // pub async fn create_repository( - // &self, - // user_token: String, - // name: String, - // description: Option, - // visibility: RepositoryVisibility, - // default_branch: String, - // owner: User, - // pool: &DaemonConnectionPool, - // ) -> Result { - // let mut connection = pool.0.get().await.unwrap(); - - // let target_respository = Repository { - // owner: owner.clone(), - // name: name.clone(), - // instance: self - // .configuration - // .target_instance - // .as_ref() - // .unwrap_or(&pool.0.manager().target_instance) - // .clone(), - // }; - - // let request = CreateRepositoryRequest { - // name, - // description, - // visibility, - // default_branch, - // owner, - // }; - - // let message = UnvalidatedUserAuthenticated::new( - // request, - // user_token, - // self.configuration.our_private_key.clone(), - // ) - // .unwrap(); - - // Self::send_message( - // &MessageKind::Repository(RepositoryMessage { - // target: target_respository, - // command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository( - // message, - // )), - // }), - // &mut connection, - // ) - // .await?; - - // while let Ok(payload) = self.next_payload(&mut connection).await { - // if let Ok(MessageKind::Repository(RepositoryMessage { - // command: - // RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)), - // .. - // })) = serde_json::from_slice(&payload) - // { - // return Ok(true); - // } - // } - - // connection.close(None).await?; - - // Err(SocketClosedError.into()) - // } - - // pub async fn repository_info( - // &self, - // token: &str, - // request: &RepositoryInfoRequest, - // pool: &DaemonConnectionPool, - // ) -> Result { - // let mut connection = pool.0.get().await.unwrap(); - - // let message = UnvalidatedUserAuthenticated::new( - // request.clone(), - // token.to_string(), - // self.configuration.our_private_key.clone(), - // ) - // .unwrap(); - - // Self::send_message( - // &MessageKind::Repository(RepositoryMessage { - // target: request.repository.clone(), - // command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)), - // }), - // &mut connection, - // ) - // .await?; - - // loop { - // // while let Ok(payload) = Self::next_payload(&mut socket).await { - // let payload = match self.next_payload(&mut connection).await { - // Ok(payload) => payload, - // Err(err) => { - // error!("Error while fetching next payload: {:?}", err); - // continue; - // } - // }; - - // if let Ok(MessageKind::Repository(RepositoryMessage { - // command: - // RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)), - // .. - // })) = serde_json::from_slice(&payload) - // { - // return Ok(response); - // } - // } - // } - - // /// Requests an authentication token for the given login. - // /// - // /// # Authorization - // /// This request can only be sent to the same instance from which - // /// it is issued. - // pub async fn authentication_token( - // &self, - // secret_key: String, - // username: String, - // password: String, - // pool: &DaemonConnectionPool, - // ) -> Result { - // let mut connection = pool.0.get().await.unwrap(); - - // let request = InstanceAuthenticated::new( - // AuthenticationTokenRequest { - // username, - // password, - // }, - // pool.0.manager().target_instance.clone(), - // include_str!("example_keys/giterated.key").to_string(), - // ) - // .unwrap(); - - // Self::send_message( - // &MessageKind::Authentication(AuthenticationMessage::Request( - // AuthenticationRequest::AuthenticationToken(request), - // )), - // &mut connection, - // ) - // .await?; - - // loop { - // // while let Ok(payload) = Self::next_payload(&mut socket).await { - // let payload = match self.next_payload(&mut connection).await { - // Ok(payload) => payload, - // Err(err) => { - // error!("Error while fetching next payload: {:?}", err); - // continue; - // } - // }; - - // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( - // AuthenticationResponse::AuthenticationToken(response), - // ))) = serde_json::from_slice(&payload) - // { - // return Ok(response.token); - // } - // } - // } - - // /// Requests a new token for the given login. - // /// - // /// # Authorization - // /// This request can only be sent to the same instance from which - // /// it is issued. - // pub async fn extend_token( - // &self, - // secret_key: String, - // token: String, - // pool: &DaemonConnectionPool, - // ) -> Result, Error> { - // let mut connection = pool.0.get().await.unwrap(); - - // let request = InstanceAuthenticated::new( - // TokenExtensionRequest { token }, - // pool.0.manager().target_instance.clone(), - // self.configuration.our_private_key.clone(), - // ) - // .unwrap(); - - // Self::send_message( - // &MessageKind::Authentication(AuthenticationMessage::Request( - // AuthenticationRequest::TokenExtension(request), - // )), - // &mut connection, - // ) - // .await?; - - // while let Ok(payload) = self.next_payload(&mut connection).await { - // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( - // AuthenticationResponse::TokenExtension(response), - // ))) = serde_json::from_slice(&payload) - // { - // return Ok(response.new_token); - // } - // } - - // todo!() - // } - - // pub async fn user_display_name( - // &self, - // user: impl ToOwned, - // pool: &DaemonConnectionPool, - // ) -> Result, Error> { - // let mut connection = pool.0.get().await.unwrap(); - - // let request = UserDisplayNameRequest { - // user: user.to_owned(), - // }; - - // Self::send_message( - // &MessageKind::User(UserMessage { - // instance: self - // .configuration - // .target_instance - // .as_ref() - // .unwrap_or(&pool.0.manager().target_instance) - // .clone(), - // message: UserMessageKind::Request(UserMessageRequest::DisplayName(request)), - // }), - // &mut connection, - // ) - // .await?; - - // while let Ok(payload) = self.next_payload(&mut connection).await { - // if let Ok(MessageKind::User(UserMessage { - // message: UserMessageKind::Response(UserMessageResponse::DisplayName(response)), - // .. - // })) = serde_json::from_slice(&payload) - // { - // return Ok(response.display_name); - // } - // } - - // connection.close(None).await?; - - // Err(SocketClosedError.into()) - // } - - // pub async fn user_display_image( - // &self, - // user: impl ToOwned, - // pool: &DaemonConnectionPool, - // ) -> Result, Error> { - // let mut connection = pool.0.get().await.unwrap(); - - // let request = UserDisplayImageRequest { - // user: user.to_owned(), - // }; - - // Self::send_message( - // &MessageKind::User(UserMessage { - // instance: self - // .configuration - // .target_instance - // .as_ref() - // .unwrap_or(&pool.0.manager().target_instance) - // .clone(), - // message: UserMessageKind::Request(UserMessageRequest::DisplayImage(request)), - // }), - // &mut connection, - // ) - // .await?; - - // while let Ok(payload) = self.next_payload(&mut connection).await { - // if let Ok(MessageKind::User(UserMessage { - // message: UserMessageKind::Response(UserMessageResponse::DisplayImage(response)), - // .. - // })) = serde_json::from_slice(&payload) - // { - // return Ok(response.image_url); - // } - // } - - // connection.close(None).await?; - - // Err(SocketClosedError.into()) - // } - - // pub async fn user_bio( - // &self, - // user: impl ToOwned, - // pool: &DaemonConnectionPool, - // ) -> Result, Error> { - // let mut connection = pool.0.get().await.unwrap(); - - // let request = UserBioRequest { - // user: user.to_owned(), - // }; - - // Self::send_message( - // &MessageKind::User(UserMessage { - // instance: self - // .configuration - // .target_instance - // .as_ref() - // .unwrap_or(&pool.0.manager().target_instance) - // .clone(), - // message: UserMessageKind::Request(UserMessageRequest::Bio(request)), - // }), - // &mut connection, - // ) - // .await?; - - // while let Ok(payload) = self.next_payload(&mut connection).await { - // if let Ok(MessageKind::User(UserMessage { - // message: UserMessageKind::Response(UserMessageResponse::Bio(response)), - // .. - // })) = serde_json::from_slice(&payload) - // { - // return Ok(response.bio); - // } - // } - - // connection.close(None).await?; - - // Err(SocketClosedError.into()) - // } - - // pub async fn user_repositories( - // &self, - // user: impl ToOwned, - // pool: &DaemonConnectionPool, - // ) -> Result, Error> { - // let mut connection = pool.0.get().await.unwrap(); - - // let request = UserRepositoriesRequest { - // user: user.to_owned(), - // }; - - // Self::send_message( - // &MessageKind::User(UserMessage { - // instance: self - // .configuration - // .target_instance - // .as_ref() - // .unwrap_or(&pool.0.manager().target_instance) - // .clone(), - // message: UserMessageKind::Request(UserMessageRequest::Repositories(request)), - // }), - // &mut connection, - // ) - // .await?; - - // while let Ok(payload) = self.next_payload(&mut connection).await { - // if let Ok(MessageKind::User(UserMessage { - // message: UserMessageKind::Response(UserMessageResponse::Repositories(response)), - // .. - // })) = serde_json::from_slice(&payload) - // { - // return Ok(response.repositories); - // } - // } - - // connection.close(None).await?; - - // Err(SocketClosedError.into()) - // } - - async fn connect_to( - instance: &Instance, - socket_addr: &Option, - ) -> Result { - if let Some(addr) = socket_addr { - info!( - "Connecting to {}", - format!("ws://{}/.giterated/daemon/", addr) - ); - let (websocket, _response) = - connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?; - info!("Connection established with {}", addr); - - Ok(websocket) - } else { - info!( - "Connecting to {}", - format!("wss://{}/.giterated/daemon/", instance.url) - ); - let (websocket, _response) = - connect_async(&format!("wss://{}/.giterated/daemon/", instance.url)).await?; - info!("Connection established with {}", instance.url); - - Ok(websocket) - } - } - - async fn handle_handshake( - socket: &mut Socket, - _instance: &Instance, - ) -> Result<(), anyhow::Error> { - // Send handshake initiation - Self::send_message( - &InitiateHandshake { - version: Version::from_str("0.0.0").unwrap(), - }, - socket, - ) - .await?; - - while let Some(message) = socket.next().await { - let message = match message { - Ok(message) => message, - Err(err) => { - error!("Error reading message: {:?}", err); - continue; - } - }; - - let payload = match message { - Message::Text(text) => text.into_bytes(), - Message::Binary(bytes) => bytes, - Message::Ping(_) => continue, - Message::Pong(_) => continue, - Message::Close(_) => { - socket.close(None).await?; - - return Err(SocketClosedError.into()); - } - _ => unreachable!(), - }; - - info!("Read payload: {}", std::str::from_utf8(&payload).unwrap()); - - // We try deserializing the finalize response first as it is smaller and just as common - if let Ok(finalize) = serde_json::from_slice::(&payload) { - if finalize.success { - info!("Handshake success!"); - - return Ok(()); - } else { - panic!() - } - } else { - match serde_json::from_slice::(&payload) { - Ok(response) => { - // let message = if !validate_version(&response.version) { - // error!( - // "Version compatibility failure! Our Version: {}, Their Version: {}", - // version(), - // response.version - // ); - - // HandshakeFinalize { success: false } - // } else { - // info!("Connected with a compatible version"); - - // HandshakeFinalize { success: true } - // }; - - warn!("Version compatibility has been no-op'd"); - - let message = HandshakeFinalize { success: true }; - // Send [`HandshakeFinalize`] to indicate if we're compatible or not - Self::send_message(&message, socket).await?; - } - Err(err) => { - error!("Error deserializing message: {:?}", err); - continue; - } - } - } - } - - Ok(()) - } - - async fn send_message( - message: &T, - socket: &mut Socket, - ) -> Result<(), anyhow::Error> { - socket - .send(Message::Binary(serde_json::to_vec(&message).unwrap())) - .await?; - - Ok(()) - } - - // async fn next_payload(&self, socket: &mut Socket) -> Result, Error> { - // while let Some(message) = socket.next().await { - // let message = message?; - - // match message { - // Message::Text(text) => return Ok(text.into_bytes()), - // Message::Binary(bytes) => return Ok(bytes), - // Message::Ping(_) => continue, - // Message::Pong(_) => continue, - // Message::Close(_) => { - // socket.close(None).await?; - - // return Err(SocketClosedError.into()); - // } - // _ => unreachable!(), - // } - // } - - // socket.close(None).await?; - - // return Err(SocketClosedError.into()); - // } -} - -#[derive(Debug, thiserror::Error)] -#[error("closed socket")] -struct SocketClosedError; diff --git a/src/main.rs b/src/main.rs index daa8e39..fbcbb03 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,131 +1,24 @@ use std::str::FromStr; use giterated_api::DaemonConnectionPool; -use giterated_api::{ - messages::user::{UserBioRequest, UserBioResponse}, +use giterated_models::{ model::{instance::Instance, user::User}, + operation::ObjectBackend, + values::user::DisplayName, }; -use serde::{Deserialize, Serialize}; -// use jwt::SignWithKey; - -#[macro_use] -extern crate tracing; - -// #[tokio::main] -// async fn main() -> Result<(), anyhow::Error> { -// tracing_subscriber::fmt::init(); - -// let pool = DaemonConnectionPool::connect(Instance::from_str("giterated.dev")?).unwrap(); - -// let mut api = GiteratedApiBuilder::from_local("giterated.dev") -// .unwrap() -// .private_key(include_str!("example_keys/giterated.key")) -// .public_key(include_str!("example_keys/giterated.key.pub")) -// .build() -// .await -// .unwrap(); - -// info!("Lets try to make an account!"); - -// let response = api -// .register( -// String::from("ambee"), -// None, -// String::from("lolthisisinthecommithistory"), -// &pool, -// ) -// .await; - -// info!("Registration response: {:?}", response); - -// let token = api -// .authentication_token( -// String::from("foobar"), -// String::from("ambee"), -// String::from("password"), -// &pool, -// ) -// .await -// .unwrap(); - -// println!("Token: {}", token); - -// let public_key = api.public_key().await; - -// println!("Server public key:\n{}", public_key); -// let verification_key = DecodingKey::from_rsa_pem(public_key.as_bytes()).unwrap(); -// let data: TokenData = decode( -// &token, -// &verification_key, -// &Validation::new(Algorithm::RS256), -// ) -// .unwrap(); - -// println!("The token was valid! Data:\n{:#?}", data.claims); - -// info!("Lets extend that token!"); - -// let new_token = api -// .extend_token(String::from("foobar"), token.clone(), &pool) -// .await -// .unwrap(); -// info!("New Token Returned:\n{:?}", new_token); - -// info!("Try to create a repository? uwu"); - -// let repository = api -// .create_repository( -// new_token.unwrap(), -// String::from("super-repository"), -// None, -// RepositoryVisibility::Public, -// String::from("master"), -// User::from_str("ambee:giterated.dev").unwrap(), -// &pool, -// ) -// .await -// .unwrap(); - -// assert!(repository); - -// info!("Lets view our repository!"); - -// let view = api -// .repository_info( -// &token, -// Repository::from_str("ambee:giterated.dev/super-repository@giterated.dev").unwrap(), -// &pool, -// ) -// .await -// .unwrap(); - -// info!("Repository Info:\n{:#?}", view); - -// Ok(()) -// } - #[tokio::main] async fn main() -> Result<(), anyhow::Error> { tracing_subscriber::fmt::init(); let pool = DaemonConnectionPool::connect(Instance::from_str("giterated.dev")?).unwrap(); - let request = UserBioRequest { - user: User::from_str("ambee:giterated.dev").unwrap(), - }; + let mut user = pool.get_object::("ambee:giterated.dev").await?; - let response = giterated_api::request::request_local(&request) - .execute_expect::(&pool) - .await?; + let display_name = user.get::().await?; - info!("Response: {:?}", response); + let repositories = user + .repositories(&Instance::from_str("giterated.dev").unwrap()) + .await?; Ok(()) } - -#[derive(Debug, Serialize, Deserialize)] -struct UserTokenMetadata { - user: User, - generated_for: Instance, - exp: u64, -} diff --git a/src/pool.rs b/src/pool.rs new file mode 100644 index 0000000..ab215fd --- /dev/null +++ b/src/pool.rs @@ -0,0 +1,67 @@ +use std::net::SocketAddr; + +use deadpool::managed::{Manager, RecycleError, RecycleResult}; +use futures_util::SinkExt; +use giterated_models::model::instance::Instance; +use tokio_tungstenite::{connect_async, tungstenite::Message}; + +use crate::{handshake::GiteratedConnectionHandshaker, Socket}; + +pub struct GiteratedConnectionPool { + pub target_instance: Instance, + pub socket_addr: Option, +} + +#[async_trait::async_trait] +impl Manager for GiteratedConnectionPool { + type Type = Socket; + type Error = anyhow::Error; + + async fn create(&self) -> Result { + info!("Creating new Daemon connection"); + let connection = connect_to(&self.target_instance, &self.socket_addr).await?; + + // Handshake first! + let connection = GiteratedConnectionHandshaker::new(connection) + .handshake() + .await?; + Ok(connection) + } + + async fn recycle(&self, socket: &mut Socket) -> RecycleResult { + match socket.send(Message::Ping(vec![])).await { + Ok(_) => Ok(()), + Err(err) => { + info!("Socket died!"); + Err(RecycleError::Backend(err.into())) + } + } + } +} + +async fn connect_to( + instance: &Instance, + socket_addr: &Option, +) -> Result { + if let Some(addr) = socket_addr { + info!( + "Connecting to {}", + format!("ws://{}/.giterated/daemon/", addr) + ); + let (websocket, _response) = + connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?; + info!("Connection established with {}", addr); + + Ok(websocket) + } else { + info!( + "Connecting to {}", + format!("wss://{}/.giterated/daemon/", instance.url) + ); + let (websocket, _response) = + connect_async(&format!("wss://{}/.giterated/daemon/", instance.url)).await?; + info!("Connection established with {}", instance.url); + + Ok(websocket) + } +} diff --git a/src/request.rs b/src/request.rs deleted file mode 100644 index ed936fe..0000000 --- a/src/request.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::fmt::Debug; - -use anyhow::Error; -use futures_util::{SinkExt, StreamExt}; -use giterated_models::{ - messages::{error::ConnectionError, MessageTarget}, - model::{ - authenticated::{Authenticated, AuthenticationSourceProvider}, - instance::Instance, - }, -}; -use serde::{de::DeserializeOwned, Serialize}; -use tokio_tungstenite::tungstenite::Message; - -use crate::DaemonConnectionPool; - -pub fn request_local( - request: T, -) -> PreparedRequest { - PreparedRequest { - request: Authenticated::new_empty(request), - instance: None, - } -} - -pub fn request( - instance: Instance, - request: T, -) -> PreparedRequest { - PreparedRequest { - request: Authenticated::new_empty(request), - instance: Some(instance), - } -} - -pub struct PreparedRequest { - instance: Option, - request: Authenticated, -} - -impl PreparedRequest { - pub fn authenticate(mut self, source: P) -> Self { - self.request.append_authentication(source); - - self - } - - pub async fn execute_expect( - self, - pool: &DaemonConnectionPool, - ) -> Result { - let mut socket = pool.0.get().await.unwrap(); - - let payload = serde_json::to_vec(&self.request.into_payload()).unwrap(); - - socket.send(Message::Binary(payload)).await?; - - while let Some(message) = socket.next().await { - let payload = match message? { - Message::Binary(payload) => payload, - _ => { - continue; - } - }; - - let as_target = serde_json::from_slice::(&payload).map_err(|e| Error::from(e)); - - if as_target.is_err() { - // Maybe we got an error payload? - if let Ok(error_payload) = serde_json::from_slice::(&payload) { - return Err(error_payload.into()); - } - } else { - // We did not get an error payload, forward the deserialization error from the - // expected type - return as_target; - } - } - - panic!() - } -}