JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Re-expose Operation State in generics.

This is the worst code I have ever written. I hate the way this changes everything. ugh.

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨90db3e2

⁨giterated-daemon/src/client.rs⁩ - ⁨3245⁩ bytes
Raw
1 use futures_util::{SinkExt, StreamExt};
2 use giterated_models::{
3 error::{IntoInternalError, OperationError},
4 instance::Instance,
5 object_backend::ObjectBackend,
6 };
7 use giterated_protocol::{AuthenticatedPayload, NetworkedObject, NetworkedOperation};
8 use giterated_stack::{GiteratedStack, StackOperationState};
9 use tokio::net::TcpStream;
10 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
11
12 pub async fn client_wrapper(
13 our_instance: Instance,
14 mut socket: WebSocketStream<TcpStream>,
15 runtime: GiteratedStack,
16 ) {
17 loop {
18 let message = socket.next().await;
19
20 if message.is_none() {
21 // Keep an eye out for this, I dont see why we shouldn't end the connection
22 unreachable!()
23 }
24
25 let message = message.unwrap();
26
27 let payload = match message {
28 Ok(message) => {
29 let payload = match message {
30 Message::Binary(payload) => payload,
31 Message::Ping(_) => {
32 let _ = socket.send(Message::Pong(vec![])).await;
33 continue;
34 }
35 Message::Close(_) => return,
36 _ => continue,
37 };
38
39 payload
40 }
41 Err(err) => {
42 // Connection error
43 warn!("A connection error has occured: {:?}", err);
44
45 return;
46 }
47 };
48
49 let payload = match bincode::deserialize::<AuthenticatedPayload>(&payload) {
50 Ok(payload) => payload,
51 Err(e) => {
52 warn!(
53 "A network payload deserialization failure has occurred: {:?}",
54 e
55 );
56
57 continue;
58 }
59 };
60
61 let operation_state = StackOperationState {
62 our_instance: our_instance.clone(),
63 runtime: runtime.clone(),
64 instance: None,
65 user: None,
66 };
67
68 let result = handle_client_message(payload, operation_state, runtime.clone()).await;
69
70 // Grab operation errors so we can log them, they don't make it across the network
71 if let Err(OperationError::Internal(internal_error)) = &result {
72 error!("An internal error has occurred:\n{:?}", internal_error);
73 }
74
75 // Map error to the network variant
76 let result = result.map_err(|e| e.into_network());
77
78 socket
79 .send(Message::Binary(bincode::serialize(&result).unwrap()))
80 .await
81 .expect("there was an error sending a message, this is a problem for the receiver");
82 }
83 }
84
85 pub async fn handle_client_message(
86 payload: AuthenticatedPayload,
87 operation_state: StackOperationState,
88 runtime: GiteratedStack,
89 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
90 let mut networked_object = runtime
91 .get_object::<NetworkedObject>(&payload.object, &operation_state)
92 .await
93 .as_internal_error_with_context("handling client message")?;
94
95 let networked_operation = NetworkedOperation::new(payload.object, payload.payload);
96
97 networked_object
98 .request(networked_operation, &operation_state)
99 .await
100 }
101