JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Create `NetworkedSubstack`.

# giterated-protocol - Create `NetworkedSubstack` which will handle all networked operations giterated needs - Add support for `NetworkedSubstack` for both the daemon and client - Pipe everything through but leave APIs temp # `giterated-daemon` - Remove a bunch of random old code, dead code, and files that aren't needed. - Moved all connection handling to `client.rs`, simplified connection logic with new types

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨202bb12

⁨giterated-daemon/src/client.rs⁩ - ⁨3295⁩ bytes
Raw
1 use std::sync::Arc;
2
3 use futures_util::{SinkExt, StreamExt};
4 use giterated_models::{
5 authenticated::AuthenticatedPayload,
6 error::{IntoInternalError, OperationError},
7 instance::Instance,
8 object_backend::ObjectBackend,
9 };
10 use giterated_protocol::{NetworkedObject, NetworkedOperation};
11 use giterated_stack::{GiteratedStack, StackOperationState};
12 use tokio::net::TcpStream;
13 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
14
15 pub async fn client_wrapper(
16 our_instance: Instance,
17 mut socket: WebSocketStream<TcpStream>,
18 runtime: Arc<GiteratedStack>,
19 ) {
20 loop {
21 let message = socket.next().await;
22
23 if message.is_none() {
24 // Keep an eye out for this, I dont see why we shouldn't end the connection
25 unreachable!()
26 }
27
28 let message = message.unwrap();
29
30 let payload = match message {
31 Ok(message) => {
32 let payload = match message {
33 Message::Binary(payload) => payload,
34 Message::Ping(_) => {
35 let _ = socket.send(Message::Pong(vec![])).await;
36 continue;
37 }
38 Message::Close(_) => return,
39 _ => continue,
40 };
41
42 payload
43 }
44 Err(err) => {
45 // Connection error
46 warn!("A connection error has occured: {:?}", err);
47
48 return;
49 }
50 };
51
52 let payload = match bincode::deserialize::<AuthenticatedPayload>(&payload) {
53 Ok(payload) => payload,
54 Err(e) => {
55 warn!(
56 "A network payload deserialization failure has occurred: {:?}",
57 e
58 );
59
60 continue;
61 }
62 };
63
64 let operation_state = StackOperationState {
65 our_instance: our_instance.clone(),
66 runtime: runtime.clone(),
67 instance: None,
68 user: None,
69 };
70
71 let result = handle_client_message(payload, operation_state, runtime.clone()).await;
72
73 // Grab operation errors so we can log them, they don't make it across the network
74 if let Err(OperationError::Internal(internal_error)) = &result {
75 error!("An internal error has occurred:\n{:?}", internal_error);
76 }
77
78 // Map error to the network variant
79 let result = result.map_err(|e| e.into_network());
80
81 socket
82 .send(Message::Binary(bincode::serialize(&result).unwrap()))
83 .await
84 .expect("there was an error sending a message, this is a problem for the receiver");
85 }
86 }
87
88 pub async fn handle_client_message(
89 payload: AuthenticatedPayload,
90 operation_state: StackOperationState,
91 runtime: Arc<GiteratedStack>,
92 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
93 let mut networked_object = runtime
94 .get_object::<NetworkedObject>(&payload.object, &operation_state)
95 .await
96 .as_internal_error_with_context("handling client message")?;
97
98 let networked_operation = NetworkedOperation::new(payload.object, payload.payload);
99
100 networked_object
101 .request(networked_operation, &operation_state)
102 .await
103 }
104