JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

The long awaited, exhalted huge networking stack change.

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨21b6a72

⁨giterated-daemon/src/client.rs⁩ - ⁨3648⁩ bytes
Raw
1 use futures_util::{SinkExt, StreamExt};
2 use giterated_models::{
3 error::{IntoInternalError, OperationError},
4 instance::Instance,
5 object_backend::ObjectBackend,
6 };
7 use giterated_protocol::{AuthenticatedPayload, NetworkedObject, NetworkedOperation};
8 use giterated_stack::{GiteratedStack, StackOperationState};
9 use tokio::net::TcpStream;
10 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
11
12 pub async fn client_wrapper(
13 our_instance: Instance,
14 mut socket: WebSocketStream<TcpStream>,
15 runtime: GiteratedStack,
16 ) {
17 loop {
18 let message = socket.next().await;
19
20 if message.is_none() {
21 // Keep an eye out for this, I dont see why we shouldn't end the connection
22 unreachable!()
23 }
24
25 let message = message.unwrap();
26
27 let payload = match message {
28 Ok(message) => {
29 let payload = match message {
30 Message::Binary(payload) => payload,
31 Message::Ping(_) => {
32 let _ = socket.send(Message::Pong(vec![])).await;
33 continue;
34 }
35 Message::Close(_) => return,
36 _ => continue,
37 };
38
39 payload
40 }
41 Err(err) => {
42 // Connection error
43 warn!("A connection error has occured: {:?}", err);
44
45 return;
46 }
47 };
48
49 trace!("Read payload from client");
50
51 let payload = match bincode::deserialize::<AuthenticatedPayload>(&payload) {
52 Ok(payload) => payload,
53 Err(e) => {
54 warn!(
55 "A network payload deserialization failure has occurred: {:?}",
56 e
57 );
58
59 continue;
60 }
61 };
62
63 trace!(
64 "Deserialized payload for operation {} from client",
65 payload.operation
66 );
67
68 let operation_state = StackOperationState {
69 our_instance: our_instance.clone(),
70 runtime: runtime.clone(),
71 instance: None,
72 user: None,
73 };
74
75 let result = handle_client_message(payload, operation_state, runtime.clone()).await;
76
77 // Grab operation errors so we can log them, they don't make it across the network
78 if let Err(OperationError::Internal(internal_error)) = &result {
79 error!("An internal error has occurred:\n{:?}", internal_error);
80 }
81
82 // Map error to the network variant
83 let result = result.map_err(|e| e.into_network());
84
85 socket
86 .send(Message::Binary(bincode::serialize(&result).unwrap()))
87 .await
88 .expect("there was an error sending a message, this is a problem for the receiver");
89 }
90 }
91
92 pub async fn handle_client_message(
93 payload: AuthenticatedPayload,
94 operation_state: StackOperationState,
95 runtime: GiteratedStack,
96 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
97 let mut networked_object = runtime
98 .get_object::<NetworkedObject>(&payload.object, &operation_state)
99 .await
100 .as_internal_error_with_context("handling client message")?;
101
102 let message: giterated_protocol::GiteratedMessage<NetworkedObject, NetworkedOperation> =
103 payload.into_message();
104
105 let networked_operation = NetworkedOperation::new(
106 message.payload.name.clone(),
107 message.payload.payload.clone(),
108 );
109
110 trace!("Calling handler for networked operation");
111
112 networked_object
113 .request(networked_operation, &operation_state)
114 .await
115 }
116