Git repository hosting, collaboration, and discovery for the Fediverse.
Fixes schmixes
parent: tbd commit: 92ff417
Showing 6 changed files with 110 insertions and 107 deletions
Cargo.toml
@@ -14,6 +14,7 @@ tracing = "*" | ||
14 | 14 | futures-util = "*" |
15 | 15 | serde = { version = "1", features = ["derive"]} |
16 | 16 | serde_json = "1.0" |
17 | bincode = "*" | |
17 | 18 | tracing-subscriber = "0.3" |
18 | 19 | rand = "*" |
19 | 20 | jsonwebtoken = { version = "*", features = ["use_pem"]} |
src/daemon_backend.rs
@@ -1,19 +1,18 @@ | ||
1 | 1 | use std::fmt::Debug; |
2 | 2 | |
3 | use deadpool::managed::Pool; | |
4 | 3 | use futures_util::{SinkExt, StreamExt}; |
5 | 4 | use giterated_models::{ |
5 | authenticated::Authenticated, | |
6 | 6 | error::OperationError, |
7 | model::{authenticated::Authenticated, MessageTarget}, | |
8 | operation::{ | |
9 | GiteratedMessage, GiteratedObject, GiteratedOperation, Object, ObjectBackend, | |
10 | ObjectRequest, ObjectRequestError, ObjectResponse, | |
11 | }, | |
7 | message::GiteratedMessage, | |
8 | object::{GiteratedObject, Object, ObjectRequest, ObjectRequestError, ObjectResponse}, | |
9 | object_backend::ObjectBackend, | |
10 | operation::GiteratedOperation, | |
12 | 11 | }; |
13 | use serde::{de::DeserializeOwned, Deserialize, Serialize}; | |
12 | use serde::de::DeserializeOwned; | |
14 | 13 | use tokio_tungstenite::tungstenite::Message; |
15 | 14 | |
16 | use crate::{pool::GiteratedConnectionPool, DaemonConnectionPool, Socket}; | |
15 | use crate::{DaemonConnectionPool, Socket}; | |
17 | 16 | |
18 | 17 | #[async_trait::async_trait] |
19 | 18 | impl ObjectBackend for DaemonConnectionPool { |
@@ -44,6 +43,7 @@ impl ObjectBackend for DaemonConnectionPool { | ||
44 | 43 | object_str: &str, |
45 | 44 | ) -> Result<Object<O, Self>, OperationError<ObjectRequestError>> { |
46 | 45 | let operation = ObjectRequest(object_str.to_string()); |
46 | info!("Get object: {:?}", operation); | |
47 | 47 | let message = GiteratedMessage { |
48 | 48 | object: self.0.manager().target_instance.clone(), |
49 | 49 | operation: ObjectRequest::operation_name().to_string(), |
@@ -59,13 +59,10 @@ impl ObjectBackend for DaemonConnectionPool { | ||
59 | 59 | let authenticated = Authenticated::new(message); |
60 | 60 | |
61 | 61 | let object_raw: ObjectResponse = send_expect(&mut connection, authenticated).await?; |
62 | ||
63 | let object_raw = std::str::from_utf8(&object_raw.0).map_err(|e| OperationError::Internal(e.to_string()))?; | |
64 | ||
65 | 62 | Ok(unsafe { |
66 | 63 | Object::new_unchecked( |
67 | O::from_str(&object_raw) | |
68 | .map_err(|e| OperationError::Internal("heck".to_string()))?, | |
64 | O::from_str(&object_raw.0) | |
65 | .map_err(|_e| OperationError::Internal("heck".to_string()))?, | |
69 | 66 | self.clone(), |
70 | 67 | ) |
71 | 68 | }) |
@@ -81,7 +78,7 @@ async fn send_expect< | ||
81 | 78 | socket: &mut Socket, |
82 | 79 | message: Authenticated<O, D>, |
83 | 80 | ) -> Result<R, OperationError<B>> { |
84 | let payload = serde_json::to_vec(&message.into_payload()).unwrap(); | |
81 | let payload = bincode::serialize(&message.into_payload()).unwrap(); | |
85 | 82 | |
86 | 83 | socket |
87 | 84 | .send(Message::Binary(payload)) |
@@ -96,7 +93,7 @@ async fn send_expect< | ||
96 | 93 | } |
97 | 94 | }; |
98 | 95 | |
99 | let as_target = serde_json::from_slice::<R>(&payload) | |
96 | let _as_target = serde_json::from_slice::<R>(&payload) | |
100 | 97 | .map_err(|e| OperationError::Internal(e.to_string()))?; |
101 | 98 | } |
102 | 99 |
src/handshake.rs
@@ -1,8 +1,5 @@ | ||
1 | use std::str::FromStr; | |
1 | use futures_util::SinkExt; | |
2 | 2 | |
3 | use futures_util::StreamExt; | |
4 | use giterated_models::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake}; | |
5 | use semver::Version; | |
6 | 3 | use serde::Serialize; |
7 | 4 | use tokio_tungstenite::tungstenite::Message; |
8 | 5 | |
@@ -16,90 +13,94 @@ impl GiteratedConnectionHandshaker { | ||
16 | 13 | } |
17 | 14 | |
18 | 15 | pub async fn handshake(self) -> Result<Socket, anyhow::Error> { |
19 | let mut socket = self.0; | |
20 | ||
21 | // Send handshake initiation | |
22 | Self::send_raw_message( | |
23 | &mut socket, | |
24 | &InitiateHandshake { | |
25 | version: Version::from_str("0.0.0").unwrap(), | |
26 | }, | |
27 | ) | |
28 | .await?; | |
29 | ||
30 | while let Some(message) = socket.next().await { | |
31 | let message = match message { | |
32 | Ok(message) => message, | |
33 | Err(err) => { | |
34 | error!("Error reading message: {:?}", err); | |
35 | continue; | |
36 | } | |
37 | }; | |
38 | ||
39 | let payload = match message { | |
40 | Message::Text(text) => text.into_bytes(), | |
41 | Message::Binary(bytes) => bytes, | |
42 | Message::Ping(_) => continue, | |
43 | Message::Pong(_) => continue, | |
44 | Message::Close(_) => { | |
45 | socket.close(None).await?; | |
46 | ||
47 | return Err(SocketClosedError.into()); | |
48 | } | |
49 | _ => unreachable!(), | |
50 | }; | |
51 | ||
52 | info!("Read payload: {}", std::str::from_utf8(&payload).unwrap()); | |
53 | ||
54 | // We try deserializing the finalize response first as it is smaller and just as common | |
55 | if let Ok(finalize) = serde_json::from_slice::<HandshakeFinalize>(&payload) { | |
56 | if finalize.success { | |
57 | info!("Handshake success!"); | |
58 | ||
59 | return Ok(socket); | |
60 | } else { | |
61 | socket.close(None).await?; | |
62 | return Err(SocketClosedError.into()); | |
63 | } | |
64 | } else { | |
65 | match serde_json::from_slice::<HandshakeResponse>(&payload) { | |
66 | Ok(_response) => { | |
67 | // let message = if !validate_version(&response.version) { | |
68 | // error!( | |
69 | // "Version compatibility failure! Our Version: {}, Their Version: {}", | |
70 | // version(), | |
71 | // response.version | |
72 | // ); | |
73 | ||
74 | // HandshakeFinalize { success: false } | |
75 | // } else { | |
76 | // info!("Connected with a compatible version"); | |
77 | ||
78 | // HandshakeFinalize { success: true } | |
79 | // }; | |
80 | ||
81 | warn!("Version compatibility has been no-op'd"); | |
82 | ||
83 | let message = HandshakeFinalize { success: true }; | |
84 | // Send [`HandshakeFinalize`] to indicate if we're compatible or not | |
85 | Self::send_raw_message(&mut socket, &message).await?; | |
86 | } | |
87 | Err(err) => { | |
88 | error!("Error deserializing message: {:?}", err); | |
89 | continue; | |
90 | } | |
91 | } | |
92 | } | |
93 | } | |
94 | ||
95 | Ok(socket) | |
16 | // let mut socket = self.0; | |
17 | ||
18 | // // Send handshake initiation | |
19 | // Self::send_raw_message( | |
20 | // &mut socket, | |
21 | // &InitiateHandshake { | |
22 | // version: Version::from_str("0.0.0").unwrap(), | |
23 | // }, | |
24 | // ) | |
25 | // .await?; | |
26 | ||
27 | // while let Some(message) = socket.next().await { | |
28 | // let message = match message { | |
29 | // Ok(message) => message, | |
30 | // Err(err) => { | |
31 | // error!("Error reading message: {:?}", err); | |
32 | // continue; | |
33 | // } | |
34 | // }; | |
35 | ||
36 | // let payload = match message { | |
37 | // Message::Text(text) => text.into_bytes(), | |
38 | // Message::Binary(bytes) => bytes, | |
39 | // Message::Ping(_) => continue, | |
40 | // Message::Pong(_) => continue, | |
41 | // Message::Close(_) => { | |
42 | // socket.close(None).await?; | |
43 | ||
44 | // return Err(SocketClosedError.into()); | |
45 | // } | |
46 | // _ => unreachable!(), | |
47 | // }; | |
48 | ||
49 | // info!("Read payload: {}", std::str::from_utf8(&payload).unwrap()); | |
50 | ||
51 | // // We try deserializing the finalize response first as it is smaller and just as common | |
52 | // if let Ok(finalize) = serde_json::from_slice::<HandshakeFinalize>(&payload) { | |
53 | // if finalize.success { | |
54 | // info!("Handshake success!"); | |
55 | ||
56 | // return Ok(socket); | |
57 | // } else { | |
58 | // socket.close(None).await?; | |
59 | // return Err(SocketClosedError.into()); | |
60 | // } | |
61 | // } else { | |
62 | // match serde_json::from_slice::<HandshakeResponse>(&payload) { | |
63 | // Ok(_response) => { | |
64 | // // let message = if !validate_version(&response.version) { | |
65 | // // error!( | |
66 | // // "Version compatibility failure! Our Version: {}, Their Version: {}", | |
67 | // // version(), | |
68 | // // response.version | |
69 | // // ); | |
70 | ||
71 | // // HandshakeFinalize { success: false } | |
72 | // // } else { | |
73 | // // info!("Connected with a compatible version"); | |
74 | ||
75 | // // HandshakeFinalize { success: true } | |
76 | // // }; | |
77 | ||
78 | // warn!("Version compatibility has been no-op'd"); | |
79 | ||
80 | // let message = HandshakeFinalize { success: true }; | |
81 | // // Send [`HandshakeFinalize`] to indicate if we're compatible or not | |
82 | // Self::send_raw_message(&mut socket, &message).await?; | |
83 | // } | |
84 | // Err(err) => { | |
85 | // error!("Error deserializing message: {:?}", err); | |
86 | // continue; | |
87 | // } | |
88 | // } | |
89 | // } | |
90 | // } | |
91 | ||
92 | Ok(self.0) | |
96 | 93 | } |
97 | 94 | |
98 | 95 | async fn send_raw_message<T: Serialize>( |
99 | _socket: &Socket, | |
100 | _message: &T, | |
96 | socket: &mut Socket, | |
97 | message: &T, | |
101 | 98 | ) -> Result<(), tokio_tungstenite::tungstenite::Error> { |
102 | todo!() | |
99 | socket | |
100 | .send(Message::Binary(bincode::serialize(message).unwrap())) | |
101 | .await?; | |
102 | ||
103 | Ok(()) | |
103 | 104 | } |
104 | 105 | } |
105 | 106 |
src/lib.rs
@@ -1,7 +1,7 @@ | ||
1 | 1 | use std::{fmt::Debug, net::SocketAddr}; |
2 | 2 | |
3 | 3 | use deadpool::managed::{BuildError, Pool}; |
4 | use giterated_models::model::instance::Instance; | |
4 | use giterated_models::instance::Instance; | |
5 | 5 | use pool::GiteratedConnectionPool; |
6 | 6 | use tokio::net::TcpStream; |
7 | 7 | use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; |
src/main.rs
@@ -2,21 +2,25 @@ use std::str::FromStr; | ||
2 | 2 | |
3 | 3 | use giterated_api::DaemonConnectionPool; |
4 | 4 | use giterated_models::{ |
5 | model::{instance::Instance, user::User}, | |
6 | operation::ObjectBackend, | |
7 | values::user::DisplayName, | |
5 | instance::Instance, | |
6 | object_backend::ObjectBackend, | |
7 | user::{DisplayName, User}, | |
8 | 8 | }; |
9 | 9 | |
10 | 10 | #[tokio::main] |
11 | 11 | async fn main() -> Result<(), anyhow::Error> { |
12 | 12 | tracing_subscriber::fmt::init(); |
13 | let pool = DaemonConnectionPool::connect(Instance::from_str("giterated.dev")?).unwrap(); | |
13 | let pool = DaemonConnectionPool::connect_other( | |
14 | Instance::from_str("giterated.dev")?, | |
15 | ("127.0.0.1:1111").parse().unwrap(), | |
16 | ) | |
17 | .unwrap(); | |
14 | 18 | |
15 | 19 | let mut user = pool.get_object::<User>("ambee:giterated.dev").await?; |
16 | 20 | |
17 | let display_name = user.get::<DisplayName>().await?; | |
21 | let _display_name = user.get::<DisplayName>().await?; | |
18 | 22 | |
19 | let repositories = user | |
23 | let _repositories = user | |
20 | 24 | .repositories(&Instance::from_str("giterated.dev").unwrap()) |
21 | 25 | .await?; |
22 | 26 |
src/pool.rs
@@ -2,7 +2,7 @@ use std::net::SocketAddr; | ||
2 | 2 | |
3 | 3 | use deadpool::managed::{Manager, RecycleError, RecycleResult}; |
4 | 4 | use futures_util::SinkExt; |
5 | use giterated_models::model::instance::Instance; | |
5 | use giterated_models::instance::Instance; | |
6 | 6 | use tokio_tungstenite::{connect_async, tungstenite::Message}; |
7 | 7 | |
8 | 8 | use crate::{handshake::GiteratedConnectionHandshaker, Socket}; |