JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

More progress :)

Amber - ⁨1⁩ year ago

parent: tbd commit: 92c3f32

⁨giterated-daemon/src/client.rs⁩ - ⁨3686⁩ bytes
Raw
1 use std::sync::Arc;
2
3 use futures_util::{SinkExt, StreamExt};
4 use giterated_models::{
5 error::{IntoInternalError, OperationError},
6 instance::Instance,
7 object_backend::ObjectBackend,
8 operation::OperationState,
9 };
10 use giterated_plugin::new_stack::{handle::RuntimeHandle, Runtime};
11 use giterated_protocol::{
12 handlers::{NetworkedObject, NetworkedOperation},
13 AuthenticatedPayload,
14 };
15 use tokio::net::TcpStream;
16 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
17
18 pub async fn client_wrapper(
19 _our_instance: Instance,
20 mut socket: WebSocketStream<TcpStream>,
21 _runtime: Arc<Box<Runtime>>,
22 ) {
23 loop {
24 let message = socket.next().await;
25
26 if message.is_none() {
27 // Keep an eye out for this, I dont see why we shouldn't end the connection
28 unreachable!()
29 }
30
31 let message = message.unwrap();
32
33 let payload = match message {
34 Ok(message) => match message {
35 Message::Binary(payload) => payload,
36 Message::Ping(_) => {
37 let _ = socket.send(Message::Pong(vec![])).await;
38 continue;
39 }
40 Message::Close(_) => return,
41 _ => continue,
42 },
43 Err(err) => {
44 // Connection error
45 warn!("A connection error has occured: {:?}", err);
46
47 return;
48 }
49 };
50
51 trace!("Read payload from client");
52
53 let payload = match bincode::deserialize::<AuthenticatedPayload>(&payload) {
54 Ok(payload) => payload,
55 Err(e) => {
56 warn!(
57 "A network payload deserialization failure has occurred: {:?}",
58 e
59 );
60
61 continue;
62 }
63 };
64
65 trace!(
66 "Deserialized payload for operation {} from client",
67 payload.operation
68 );
69
70 todo!()
71
72 // let operation_state = StackOperationState {
73 // our_instance: our_instance.clone(),
74 // instance: None,
75 // user: None,
76 // };
77
78 // let result = handle_client_message(payload, operation_state, runtime.clone()).await;
79
80 // Grab operation errors so we can log them, they don't make it across the network
81 // if let Err(OperationError::Internal(internal_error)) = &result {
82 // error!("An internal error has occurred:\n{:?}", internal_error);
83 // }
84
85 // // Map error to the network variant
86 // let result = result.map_err(|e| e.into_network());
87
88 // socket
89 // .send(Message::Binary(bincode::serialize(&result).unwrap()))
90 // .await
91 // .expect("there was an error sending a message, this is a problem for the receiver");
92 }
93 }
94
95 pub async fn handle_client_message(
96 payload: AuthenticatedPayload,
97 operation_state: OperationState,
98 runtime: RuntimeHandle,
99 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
100 let mut networked_object = runtime
101 .get_object::<NetworkedObject>(&payload.object, &operation_state)
102 .await
103 .as_internal_error_with_context("handling client message")?;
104
105 let message: giterated_protocol::GiteratedMessage<NetworkedObject, NetworkedOperation> =
106 payload.into_message();
107
108 let networked_operation = NetworkedOperation::new(
109 message.payload.name.clone(),
110 message.payload.payload.clone(),
111 );
112
113 trace!("Calling handler for networked operation");
114
115 networked_object
116 .request(networked_operation, &operation_state)
117 .await
118 }
119