JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Set up the daemon for the plugin architecture

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨d046615

⁨giterated-daemon/src/client.rs⁩ - ⁨3786⁩ bytes
Raw
1 use std::sync::Arc;
2
3 use futures_util::{SinkExt, StreamExt};
4 use giterated_models::{
5 error::{IntoInternalError, OperationError},
6 instance::Instance,
7 object_backend::ObjectBackend,
8 };
9 use giterated_plugin::new_stack::{runtime_handler::RuntimeHandle, Runtime};
10 use giterated_protocol::{
11 handlers::{NetworkedObject, NetworkedOperation},
12 AuthenticatedPayload, StackOperationState,
13 };
14 use tokio::net::TcpStream;
15 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
16
17 pub async fn client_wrapper(
18 our_instance: Instance,
19 mut socket: WebSocketStream<TcpStream>,
20 runtime: Arc<Runtime>,
21 ) {
22 loop {
23 let message = socket.next().await;
24
25 if message.is_none() {
26 // Keep an eye out for this, I dont see why we shouldn't end the connection
27 unreachable!()
28 }
29
30 let message = message.unwrap();
31
32 let payload = match message {
33 Ok(message) => {
34 let payload = match message {
35 Message::Binary(payload) => payload,
36 Message::Ping(_) => {
37 let _ = socket.send(Message::Pong(vec![])).await;
38 continue;
39 }
40 Message::Close(_) => return,
41 _ => continue,
42 };
43
44 payload
45 }
46 Err(err) => {
47 // Connection error
48 warn!("A connection error has occured: {:?}", err);
49
50 return;
51 }
52 };
53
54 trace!("Read payload from client");
55
56 let payload = match bincode::deserialize::<AuthenticatedPayload>(&payload) {
57 Ok(payload) => payload,
58 Err(e) => {
59 warn!(
60 "A network payload deserialization failure has occurred: {:?}",
61 e
62 );
63
64 continue;
65 }
66 };
67
68 trace!(
69 "Deserialized payload for operation {} from client",
70 payload.operation
71 );
72
73 todo!()
74
75 // let operation_state = StackOperationState {
76 // our_instance: our_instance.clone(),
77 // instance: None,
78 // user: None,
79 // };
80
81 // let result = handle_client_message(payload, operation_state, runtime.clone()).await;
82
83 // Grab operation errors so we can log them, they don't make it across the network
84 // if let Err(OperationError::Internal(internal_error)) = &result {
85 // error!("An internal error has occurred:\n{:?}", internal_error);
86 // }
87
88 // // Map error to the network variant
89 // let result = result.map_err(|e| e.into_network());
90
91 // socket
92 // .send(Message::Binary(bincode::serialize(&result).unwrap()))
93 // .await
94 // .expect("there was an error sending a message, this is a problem for the receiver");
95 }
96 }
97
98 pub async fn handle_client_message(
99 payload: AuthenticatedPayload,
100 operation_state: StackOperationState,
101 runtime: RuntimeHandle,
102 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
103 let mut networked_object = runtime
104 .get_object::<NetworkedObject>(&payload.object, &operation_state)
105 .await
106 .as_internal_error_with_context("handling client message")?;
107
108 let message: giterated_protocol::GiteratedMessage<NetworkedObject, NetworkedOperation> =
109 payload.into_message();
110
111 let networked_operation = NetworkedOperation::new(
112 message.payload.name.clone(),
113 message.payload.payload.clone(),
114 );
115
116 trace!("Calling handler for networked operation");
117
118 networked_object
119 .request(networked_operation, &operation_state)
120 .await
121 }
122