JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Spinning

Amber - ⁨1⁩ year ago

parent: tbd commit: ⁨1788060

⁨giterated-daemon/src/client.rs⁩ - ⁨3632⁩ bytes
Raw
1 use futures_util::{SinkExt, StreamExt};
2 use giterated_models::{
3 error::{IntoInternalError, OperationError},
4 instance::Instance,
5 object_backend::ObjectBackend,
6 operation::OperationState,
7 };
8 use giterated_protocol::{
9 handlers::{NetworkedObject, NetworkedOperation},
10 AuthenticatedPayload,
11 };
12 use giterated_runtime::RuntimeHandle;
13 use tokio::net::TcpStream;
14 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
15
16 pub async fn client_wrapper(
17 _our_instance: Instance,
18 mut socket: WebSocketStream<TcpStream>,
19 _runtime: RuntimeHandle,
20 ) {
21 loop {
22 let message = socket.next().await;
23
24 if message.is_none() {
25 // Keep an eye out for this, I dont see why we shouldn't end the connection
26 unreachable!()
27 }
28
29 let message = message.unwrap();
30
31 let payload = match message {
32 Ok(message) => match message {
33 Message::Binary(payload) => payload,
34 Message::Ping(_) => {
35 let _ = socket.send(Message::Pong(vec![])).await;
36 continue;
37 }
38 Message::Close(_) => return,
39 _ => continue,
40 },
41 Err(err) => {
42 // Connection error
43 warn!("A connection error has occured: {:?}", err);
44
45 return;
46 }
47 };
48
49 trace!("Read payload from client");
50
51 let payload = match bincode::deserialize::<AuthenticatedPayload>(&payload) {
52 Ok(payload) => payload,
53 Err(e) => {
54 warn!(
55 "A network payload deserialization failure has occurred: {:?}",
56 e
57 );
58
59 continue;
60 }
61 };
62
63 trace!(
64 "Deserialized payload for operation {} from client",
65 payload.operation
66 );
67
68 todo!()
69
70 // let operation_state = StackOperationState {
71 // our_instance: our_instance.clone(),
72 // instance: None,
73 // user: None,
74 // };
75
76 // let result = handle_client_message(payload, operation_state, runtime.clone()).await;
77
78 // Grab operation errors so we can log them, they don't make it across the network
79 // if let Err(OperationError::Internal(internal_error)) = &result {
80 // error!("An internal error has occurred:\n{:?}", internal_error);
81 // }
82
83 // // Map error to the network variant
84 // let result = result.map_err(|e| e.into_network());
85
86 // socket
87 // .send(Message::Binary(bincode::serialize(&result).unwrap()))
88 // .await
89 // .expect("there was an error sending a message, this is a problem for the receiver");
90 }
91 }
92
93 pub async fn handle_client_message(
94 payload: AuthenticatedPayload,
95 operation_state: OperationState,
96 runtime: RuntimeHandle,
97 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
98 let mut networked_object = runtime
99 .get_object::<NetworkedObject>(&payload.object, &operation_state)
100 .await
101 .as_internal_error_with_context("handling client message")?;
102
103 let message: giterated_protocol::GiteratedMessage<NetworkedObject, NetworkedOperation> =
104 payload.into_message();
105
106 let networked_operation = NetworkedOperation::new(
107 message.payload.name.clone(),
108 message.payload.payload.clone(),
109 );
110
111 trace!("Calling handler for networked operation");
112
113 networked_object
114 .request(networked_operation, &operation_state)
115 .await
116 }
117