use futures_util::{SinkExt, StreamExt}; use giterated_models::{ error::{IntoInternalError, OperationError}, instance::Instance, object_backend::ObjectBackend, }; use giterated_protocol::{AuthenticatedPayload, NetworkedObject, NetworkedOperation}; use giterated_stack::{GiteratedStack, StackOperationState}; use tokio::net::TcpStream; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; pub async fn client_wrapper( our_instance: Instance, mut socket: WebSocketStream, runtime: GiteratedStack, ) { loop { let message = socket.next().await; if message.is_none() { // Keep an eye out for this, I dont see why we shouldn't end the connection unreachable!() } let message = message.unwrap(); let payload = match message { Ok(message) => { let payload = match message { Message::Binary(payload) => payload, Message::Ping(_) => { let _ = socket.send(Message::Pong(vec![])).await; continue; } Message::Close(_) => return, _ => continue, }; payload } Err(err) => { // Connection error warn!("A connection error has occured: {:?}", err); return; } }; trace!("Read payload from client"); let payload = match bincode::deserialize::(&payload) { Ok(payload) => payload, Err(e) => { warn!( "A network payload deserialization failure has occurred: {:?}", e ); continue; } }; trace!( "Deserialized payload for operation {} from client", payload.operation ); let operation_state = StackOperationState { our_instance: our_instance.clone(), runtime: runtime.clone(), instance: None, user: None, }; let result = handle_client_message(payload, operation_state, runtime.clone()).await; // Grab operation errors so we can log them, they don't make it across the network if let Err(OperationError::Internal(internal_error)) = &result { error!("An internal error has occurred:\n{:?}", internal_error); } // Map error to the network variant let result = result.map_err(|e| e.into_network()); socket .send(Message::Binary(bincode::serialize(&result).unwrap())) .await .expect("there was an error sending a message, this is a problem for the receiver"); } } pub async fn handle_client_message( payload: AuthenticatedPayload, operation_state: StackOperationState, runtime: GiteratedStack, ) -> Result, OperationError>> { let mut networked_object = runtime .get_object::(&payload.object, &operation_state) .await .as_internal_error_with_context("handling client message")?; let message: giterated_protocol::GiteratedMessage = payload.into_message(); let networked_operation = NetworkedOperation::new( message.payload.name.clone(), message.payload.payload.clone(), ); trace!("Calling handler for networked operation"); networked_object .request(networked_operation, &operation_state) .await }