diff --git a/Cargo.toml b/Cargo.toml index 9b0d33e..00fe0ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ tracing = "*" futures-util = "*" serde = { version = "1", features = ["derive"]} serde_json = "1.0" +bincode = "*" tracing-subscriber = "0.3" rand = "*" jsonwebtoken = { version = "*", features = ["use_pem"]} diff --git a/src/daemon_backend.rs b/src/daemon_backend.rs index 14785a2..698edbd 100644 --- a/src/daemon_backend.rs +++ b/src/daemon_backend.rs @@ -1,19 +1,18 @@ use std::fmt::Debug; -use deadpool::managed::Pool; use futures_util::{SinkExt, StreamExt}; use giterated_models::{ + authenticated::Authenticated, error::OperationError, - model::{authenticated::Authenticated, MessageTarget}, - operation::{ - GiteratedMessage, GiteratedObject, GiteratedOperation, Object, ObjectBackend, - ObjectRequest, ObjectRequestError, ObjectResponse, - }, + message::GiteratedMessage, + object::{GiteratedObject, Object, ObjectRequest, ObjectRequestError, ObjectResponse}, + object_backend::ObjectBackend, + operation::GiteratedOperation, }; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::de::DeserializeOwned; use tokio_tungstenite::tungstenite::Message; -use crate::{pool::GiteratedConnectionPool, DaemonConnectionPool, Socket}; +use crate::{DaemonConnectionPool, Socket}; #[async_trait::async_trait] impl ObjectBackend for DaemonConnectionPool { @@ -44,6 +43,7 @@ impl ObjectBackend for DaemonConnectionPool { object_str: &str, ) -> Result, OperationError> { let operation = ObjectRequest(object_str.to_string()); + info!("Get object: {:?}", operation); let message = GiteratedMessage { object: self.0.manager().target_instance.clone(), operation: ObjectRequest::operation_name().to_string(), @@ -59,13 +59,10 @@ impl ObjectBackend for DaemonConnectionPool { let authenticated = Authenticated::new(message); let object_raw: ObjectResponse = send_expect(&mut connection, authenticated).await?; - - let object_raw = std::str::from_utf8(&object_raw.0).map_err(|e| OperationError::Internal(e.to_string()))?; - Ok(unsafe { Object::new_unchecked( - O::from_str(&object_raw) - .map_err(|e| OperationError::Internal("heck".to_string()))?, + O::from_str(&object_raw.0) + .map_err(|_e| OperationError::Internal("heck".to_string()))?, self.clone(), ) }) @@ -81,7 +78,7 @@ async fn send_expect< socket: &mut Socket, message: Authenticated, ) -> Result> { - let payload = serde_json::to_vec(&message.into_payload()).unwrap(); + let payload = bincode::serialize(&message.into_payload()).unwrap(); socket .send(Message::Binary(payload)) @@ -96,7 +93,7 @@ async fn send_expect< } }; - let as_target = serde_json::from_slice::(&payload) + let _as_target = serde_json::from_slice::(&payload) .map_err(|e| OperationError::Internal(e.to_string()))?; } diff --git a/src/handshake.rs b/src/handshake.rs index 0a90583..bcd5e2a 100644 --- a/src/handshake.rs +++ b/src/handshake.rs @@ -1,8 +1,5 @@ -use std::str::FromStr; +use futures_util::SinkExt; -use futures_util::StreamExt; -use giterated_models::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake}; -use semver::Version; use serde::Serialize; use tokio_tungstenite::tungstenite::Message; @@ -16,90 +13,94 @@ impl GiteratedConnectionHandshaker { } pub async fn handshake(self) -> Result { - let mut socket = self.0; - - // Send handshake initiation - Self::send_raw_message( - &mut socket, - &InitiateHandshake { - version: Version::from_str("0.0.0").unwrap(), - }, - ) - .await?; - - while let Some(message) = socket.next().await { - let message = match message { - Ok(message) => message, - Err(err) => { - error!("Error reading message: {:?}", err); - continue; - } - }; - - let payload = match message { - Message::Text(text) => text.into_bytes(), - Message::Binary(bytes) => bytes, - Message::Ping(_) => continue, - Message::Pong(_) => continue, - Message::Close(_) => { - socket.close(None).await?; - - return Err(SocketClosedError.into()); - } - _ => unreachable!(), - }; - - info!("Read payload: {}", std::str::from_utf8(&payload).unwrap()); - - // We try deserializing the finalize response first as it is smaller and just as common - if let Ok(finalize) = serde_json::from_slice::(&payload) { - if finalize.success { - info!("Handshake success!"); - - return Ok(socket); - } else { - socket.close(None).await?; - return Err(SocketClosedError.into()); - } - } else { - match serde_json::from_slice::(&payload) { - Ok(_response) => { - // let message = if !validate_version(&response.version) { - // error!( - // "Version compatibility failure! Our Version: {}, Their Version: {}", - // version(), - // response.version - // ); - - // HandshakeFinalize { success: false } - // } else { - // info!("Connected with a compatible version"); - - // HandshakeFinalize { success: true } - // }; - - warn!("Version compatibility has been no-op'd"); - - let message = HandshakeFinalize { success: true }; - // Send [`HandshakeFinalize`] to indicate if we're compatible or not - Self::send_raw_message(&mut socket, &message).await?; - } - Err(err) => { - error!("Error deserializing message: {:?}", err); - continue; - } - } - } - } - - Ok(socket) + // let mut socket = self.0; + + // // Send handshake initiation + // Self::send_raw_message( + // &mut socket, + // &InitiateHandshake { + // version: Version::from_str("0.0.0").unwrap(), + // }, + // ) + // .await?; + + // while let Some(message) = socket.next().await { + // let message = match message { + // Ok(message) => message, + // Err(err) => { + // error!("Error reading message: {:?}", err); + // continue; + // } + // }; + + // let payload = match message { + // Message::Text(text) => text.into_bytes(), + // Message::Binary(bytes) => bytes, + // Message::Ping(_) => continue, + // Message::Pong(_) => continue, + // Message::Close(_) => { + // socket.close(None).await?; + + // return Err(SocketClosedError.into()); + // } + // _ => unreachable!(), + // }; + + // info!("Read payload: {}", std::str::from_utf8(&payload).unwrap()); + + // // We try deserializing the finalize response first as it is smaller and just as common + // if let Ok(finalize) = serde_json::from_slice::(&payload) { + // if finalize.success { + // info!("Handshake success!"); + + // return Ok(socket); + // } else { + // socket.close(None).await?; + // return Err(SocketClosedError.into()); + // } + // } else { + // match serde_json::from_slice::(&payload) { + // Ok(_response) => { + // // let message = if !validate_version(&response.version) { + // // error!( + // // "Version compatibility failure! Our Version: {}, Their Version: {}", + // // version(), + // // response.version + // // ); + + // // HandshakeFinalize { success: false } + // // } else { + // // info!("Connected with a compatible version"); + + // // HandshakeFinalize { success: true } + // // }; + + // warn!("Version compatibility has been no-op'd"); + + // let message = HandshakeFinalize { success: true }; + // // Send [`HandshakeFinalize`] to indicate if we're compatible or not + // Self::send_raw_message(&mut socket, &message).await?; + // } + // Err(err) => { + // error!("Error deserializing message: {:?}", err); + // continue; + // } + // } + // } + // } + + Ok(self.0) } async fn send_raw_message( - _socket: &Socket, - _message: &T, + socket: &mut Socket, + message: &T, ) -> Result<(), tokio_tungstenite::tungstenite::Error> { - todo!() + socket + .send(Message::Binary(bincode::serialize(message).unwrap())) + .await?; + + Ok(()) } } diff --git a/src/lib.rs b/src/lib.rs index 23c03ad..bf8fea6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ use std::{fmt::Debug, net::SocketAddr}; use deadpool::managed::{BuildError, Pool}; -use giterated_models::model::instance::Instance; +use giterated_models::instance::Instance; use pool::GiteratedConnectionPool; use tokio::net::TcpStream; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; diff --git a/src/main.rs b/src/main.rs index fbcbb03..00211a5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,21 +2,25 @@ use std::str::FromStr; use giterated_api::DaemonConnectionPool; use giterated_models::{ - model::{instance::Instance, user::User}, - operation::ObjectBackend, - values::user::DisplayName, + instance::Instance, + object_backend::ObjectBackend, + user::{DisplayName, User}, }; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { tracing_subscriber::fmt::init(); - let pool = DaemonConnectionPool::connect(Instance::from_str("giterated.dev")?).unwrap(); + let pool = DaemonConnectionPool::connect_other( + Instance::from_str("giterated.dev")?, + ("127.0.0.1:1111").parse().unwrap(), + ) + .unwrap(); let mut user = pool.get_object::("ambee:giterated.dev").await?; - let display_name = user.get::().await?; + let _display_name = user.get::().await?; - let repositories = user + let _repositories = user .repositories(&Instance::from_str("giterated.dev").unwrap()) .await?; diff --git a/src/pool.rs b/src/pool.rs index ab215fd..1c6787a 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; use deadpool::managed::{Manager, RecycleError, RecycleResult}; use futures_util::SinkExt; -use giterated_models::model::instance::Instance; +use giterated_models::instance::Instance; use tokio_tungstenite::{connect_async, tungstenite::Message}; use crate::{handshake::GiteratedConnectionHandshaker, Socket};