use std::{fmt::Display, net::SocketAddr, str::FromStr}; use anyhow::Error; use futures_util::{SinkExt, StreamExt}; use giterated_models::{ error::{NetworkOperationError, OperationError}, instance::Instance, object::GiteratedObject, operation::GiteratedOperation, }; use giterated_plugin::{abi::vtable::runtime::RuntimeHandle, Failure, Object, Operation, Success}; use serde::{Deserialize, Serialize}; use tokio::net::TcpStream; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; use crate::{Authenticated, GiteratedMessage, ProtocolState, RemoteError}; pub async fn handle_network_operation( _state: ProtocolState, object: NetworkedObject, operation: NetworkedOperation, runtime: RuntimeHandle, ) -> Result, OperationError>> { trace!("Handle network operation {}", operation.name); runtime .handle_serialized(&object.0, &operation.name, &operation.payload) .await } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkedObject(pub String); impl FromStr for NetworkedObject { type Err = (); fn from_str(s: &str) -> Result { Ok(NetworkedObject(s.to_string())) } } impl Display for NetworkedObject { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str(&self.0) } } impl GiteratedObject for NetworkedObject { fn object_name() -> &'static str { "networked_object" } fn from_object_str(_object_str: &str) -> Result { todo!() } fn home_uri(&self) -> String { todo!() } } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkedOperation { pub name: String, pub payload: Vec, } impl NetworkedOperation { pub fn new(name: String, payload: Vec) -> Self { Self { name, payload } } } impl GiteratedOperation for NetworkedOperation { type Success = Vec; type Failure = Vec; fn operation_name() -> &'static str { "networked_operation" } } /// Handler which will attempt to resolve any operation that doesn't resolve locally /// against a remote instance. pub async fn try_handle_with_remote( state: ProtocolState, object: Object, operation: Operation, _runtime: RuntimeHandle, ) -> Result> { // if object.is::() { // return Err(OperationError::Unhandled); // } // trace!( // "Try handling object operation {}::{} with remote", // object.kind(), // operation.kind().operation_name // ); // TODO: // Ideally we support pass-through on object types that aren't used locally. // For now, we aren't worrying about that. // let object_meta = object.vtable(); let operation_meta = operation.vtable(); // trace!( // "Serializing with {}::{}", // operation.kind().object_name, // operation.kind().operation_name // ); let object_home_uri = object.home_uri(); if let Some(home_uri) = state.home_uri { if home_uri == object_home_uri { // This isn't a remote request, requests aren't supposed to hit this layer // if they're not remote. // warn!("Try handling object operation {}::{}, resolved object home uri as local home uri. This is a bug.", object.kind(), // operation.kind().operation_name); return Err(OperationError::Unhandled); } } // trace!( // "Handling object operation {}::{} sending payload", // object.kind(), // operation.kind().operation_name // ); // let object = NetworkedObject(unsafe { (object_meta.to_str)(object).as_ref().to_string() }); let object = todo!(); let payload = operation.serialize(); let operation = NetworkedOperation::new(operation.operation_kind().to_string(), payload); // let authenticated = Authenticated::new(object, operation); let message = GiteratedMessage { object, operation: NetworkedOperation::operation_name().to_string(), payload: operation, }; let authenticated = Authenticated::new(message); let mut socket: WebSocketStream> = connect_to( &Instance::from_str(&object_home_uri).unwrap(), &Some(("127.0.0.1:1111").parse().unwrap()), ) .await .unwrap(); // TODO AUTH let result: Result, OperationError>> = send_expect(&mut socket, authenticated).await; match result { Ok(success) => { let success = unsafe { operation_meta.deserialize_success(&success) } .map_err(|e| e.into_internal())?; Ok(success) } Err(err) => Err(match err { OperationError::Operation(failure) => { let failure = unsafe { operation_meta.deserialize_failure(&failure) } .map_err(|e| e.into_internal())?; OperationError::Operation(failure) } OperationError::Internal(internal) => OperationError::Internal(internal), OperationError::Unhandled => OperationError::Unhandled, }), } } type Socket = WebSocketStream>; async fn connect_to( instance: &Instance, socket_addr: &Option, ) -> Result { if let Some(addr) = socket_addr { info!( "Connecting to {}", format!("ws://{}/.giterated/daemon/", addr) ); let (websocket, _response) = connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?; info!("Connection established with {}", addr); Ok(websocket) } else { info!( "Connecting to {}", format!("wss://{}/.giterated/daemon/", instance.0) ); let (websocket, _response) = connect_async(&format!("wss://{}/.giterated/daemon/", instance.0)).await?; info!("Connection established with {}", instance.0); Ok(websocket) } } async fn send_expect>( socket: &mut Socket, message: Authenticated, ) -> Result, OperationError>> { let payload = bincode::serialize(&message.into_payload()).unwrap(); socket.send(Message::Binary(payload)).await.unwrap(); while let Some(message) = socket.next().await { let payload = match message.unwrap() { Message::Binary(payload) => payload, _ => { continue; } }; let raw_result = bincode::deserialize::, NetworkOperationError>>>(&payload) .map_err(|e| OperationError::Internal(Error::from(e)))?; trace!( "Received response for networked operation {}::{}.", O::object_name(), D::operation_name() ); return match raw_result { Ok(success) => Ok(success), Err(err) => Err(match err { NetworkOperationError::Operation(operation_error) => { OperationError::Operation(operation_error) } NetworkOperationError::Internal => OperationError::Internal(RemoteError.into()), NetworkOperationError::Unhandled => OperationError::Unhandled, }), }; } panic!() }