JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Huge refactor to prep for moving the daemon over to the plugin architecture

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨5df753c

⁨giterated-daemon/src/client.rs⁩ - ⁨3744⁩ bytes
Raw
1 use std::sync::Arc;
2
3 use futures_util::{SinkExt, StreamExt};
4 use giterated_models::{
5 error::{IntoInternalError, OperationError},
6 instance::Instance,
7 object_backend::ObjectBackend,
8 };
9 use giterated_plugin::new_stack::Runtime;
10 use giterated_protocol::{AuthenticatedPayload, NetworkedObject, NetworkedOperation};
11 use tokio::net::TcpStream;
12 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
13
14 pub async fn client_wrapper(
15 our_instance: Instance,
16 mut socket: WebSocketStream<TcpStream>,
17 runtime: Arc<Runtime>,
18 ) {
19 loop {
20 let message = socket.next().await;
21
22 if message.is_none() {
23 // Keep an eye out for this, I dont see why we shouldn't end the connection
24 unreachable!()
25 }
26
27 let message = message.unwrap();
28
29 let payload = match message {
30 Ok(message) => {
31 let payload = match message {
32 Message::Binary(payload) => payload,
33 Message::Ping(_) => {
34 let _ = socket.send(Message::Pong(vec![])).await;
35 continue;
36 }
37 Message::Close(_) => return,
38 _ => continue,
39 };
40
41 payload
42 }
43 Err(err) => {
44 // Connection error
45 warn!("A connection error has occured: {:?}", err);
46
47 return;
48 }
49 };
50
51 trace!("Read payload from client");
52
53 let payload = match bincode::deserialize::<AuthenticatedPayload>(&payload) {
54 Ok(payload) => payload,
55 Err(e) => {
56 warn!(
57 "A network payload deserialization failure has occurred: {:?}",
58 e
59 );
60
61 continue;
62 }
63 };
64
65 trace!(
66 "Deserialized payload for operation {} from client",
67 payload.operation
68 );
69
70 todo!()
71
72 // let operation_state = StackOperationState {
73 // our_instance: our_instance.clone(),
74 // instance: None,
75 // user: None,
76 // };
77
78 // let result = handle_client_message(payload, operation_state, runtime.clone()).await;
79
80 // Grab operation errors so we can log them, they don't make it across the network
81 // if let Err(OperationError::Internal(internal_error)) = &result {
82 // error!("An internal error has occurred:\n{:?}", internal_error);
83 // }
84
85 // // Map error to the network variant
86 // let result = result.map_err(|e| e.into_network());
87
88 // socket
89 // .send(Message::Binary(bincode::serialize(&result).unwrap()))
90 // .await
91 // .expect("there was an error sending a message, this is a problem for the receiver");
92 }
93 }
94
95 pub async fn handle_client_message(
96 payload: AuthenticatedPayload,
97 // operation_state: StackOperationState,
98 runtime: Arc<Runtime>,
99 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
100 // let mut networked_object = runtime
101 // .get_object::<NetworkedObject>(&payload.object, &operation_state)
102 // .await
103 // .as_internal_error_with_context("handling client message")?;
104
105 let message: giterated_protocol::GiteratedMessage<NetworkedObject, NetworkedOperation> =
106 payload.into_message();
107
108 let networked_operation = NetworkedOperation::new(
109 message.payload.name.clone(),
110 message.payload.payload.clone(),
111 );
112
113 trace!("Calling handler for networked operation");
114
115 // networked_object
116 // .request(networked_operation, &operation_state)
117 // .await
118
119 todo!()
120 }
121