JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Add total amount of branches after filter

Emilia - ⁨1⁩ year ago

parent: tbd commit: ⁨c29b3b8

⁨giterated-daemon/src/client.rs⁩ - ⁨3839⁩ bytes
Raw
1 use futures_util::{SinkExt, StreamExt};
2 use giterated_models::{
3 error::{IntoInternalError, OperationError},
4 instance::Instance,
5 object_backend::ObjectBackend,
6 user::User,
7 };
8 use giterated_protocol::{AuthenticatedPayload, NetworkedObject, NetworkedOperation};
9 use giterated_stack::{AuthenticatedUser, GiteratedStack, StackOperationState};
10 use tokio::net::TcpStream;
11 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
12
13 pub async fn client_wrapper(
14 our_instance: Instance,
15 mut socket: WebSocketStream<TcpStream>,
16 runtime: GiteratedStack,
17 ) {
18 loop {
19 let message = socket.next().await;
20
21 if message.is_none() {
22 // Keep an eye out for this, I dont see why we shouldn't end the connection
23 unreachable!()
24 }
25
26 let message = message.unwrap();
27
28 let payload = match message {
29 Ok(message) => {
30 let payload = match message {
31 Message::Binary(payload) => payload,
32 Message::Ping(_) => {
33 let _ = socket.send(Message::Pong(vec![])).await;
34 continue;
35 }
36 Message::Close(_) => return,
37 _ => continue,
38 };
39
40 payload
41 }
42 Err(err) => {
43 // Connection error
44 warn!("A connection error has occured: {:?}", err);
45
46 return;
47 }
48 };
49
50 trace!("Read payload from client");
51
52 let payload = match bincode::deserialize::<AuthenticatedPayload>(&payload) {
53 Ok(payload) => payload,
54 Err(e) => {
55 warn!(
56 "A network payload deserialization failure has occurred: {:?}",
57 e
58 );
59
60 continue;
61 }
62 };
63
64 trace!(
65 "Deserialized payload for operation {} from client",
66 payload.operation
67 );
68
69 let operation_state = StackOperationState {
70 our_instance: our_instance.clone(),
71 runtime: runtime.clone(),
72 instance: None,
73 user: Some(AuthenticatedUser::new(User {
74 username: "uwu".to_string(),
75 instance: Instance("giterated.dev".to_string()),
76 })),
77 };
78
79 let result = handle_client_message(payload, operation_state, runtime.clone()).await;
80
81 // Grab operation errors so we can log them, they don't make it across the network
82 if let Err(OperationError::Internal(internal_error)) = &result {
83 error!("An internal error has occurred:\n{:?}", internal_error);
84 }
85
86 // Map error to the network variant
87 let result = result.map_err(|e| e.into_network());
88
89 socket
90 .send(Message::Binary(bincode::serialize(&result).unwrap()))
91 .await
92 .expect("there was an error sending a message, this is a problem for the receiver");
93 }
94 }
95
96 pub async fn handle_client_message(
97 payload: AuthenticatedPayload,
98 operation_state: StackOperationState,
99 runtime: GiteratedStack,
100 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
101 let mut networked_object = runtime
102 .get_object::<NetworkedObject>(&payload.object, &operation_state)
103 .await
104 .as_internal_error_with_context("handling client message")?;
105
106 let message: giterated_protocol::GiteratedMessage<NetworkedObject, NetworkedOperation> =
107 payload.into_message();
108
109 let networked_operation = NetworkedOperation::new(
110 message.payload.name.clone(),
111 message.payload.payload.clone(),
112 );
113
114 trace!("Calling handler for networked operation");
115
116 networked_object
117 .request(networked_operation, &operation_state)
118 .await
119 }
120