use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc}; use futures_util::{sink::SinkExt, StreamExt}; use giterated_stack::{ models::{ Error, GiteratedObject, GiteratedOperation, Instance, IntoInternalError, NetworkOperationError, OperationError, }, AnyFailure, AnyObject, AnyOperation, AnySuccess, GiteratedStack, ObjectOperationPair, OperationState, StackOperationState, SubstackBuilder, }; use serde::{Deserialize, Serialize}; use tokio::net::TcpStream; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; use tracing::{info, trace, warn}; use crate::{ AuthenticatedPayload, AuthenticationSourceProviders, GiteratedMessage, NetworkOperationState, }; /// A Giterated substack that attempts to resolve with a remote, networked Giterated Daemon. /// /// # Usage /// /// Convert the [`NetworkedSubstack`] into a [`SubStackBuilder`] and merge it with /// a runtime. /// /// ``` /// let mut runtime = GiteratedStack::default(); /// /// let network_substack = NetworkedSubstack::default(); /// /// runtime.merge_builder(network_substack.into_substack()); /// ``` /// /// To handle messages that are sourced from the network, use [`NetworkedObject`] and [`NetworkedOperation`]. /// /// These are wrappers around the raw payloads from the network. The return payload from handling [`NetworkedOperation`] is then /// sent back to the requester. /// /// ``` /// // Start with a network payload /// let network_payload: AuthenticatedPayload = { todo!() }; /// /// let networked_object = runtime.get_object::(network_payload.object).await?; /// let operation_name = payload.operation; /// let networked_operation = NetworkedOperation(payload); /// /// // Operation state depends on the authentication in the payload, it /// // isn't relevant here. /// let operation_state = StackOperationState::default(); /// /// let result = networked_object.request(networked_operation, &operation_state); /// /// // `result` is Result, OperationError> which is also the type that /// // giterated's networked protocol uses for responses, so you can send it directly. /// ``` /// /// TODO: The above docs are 100% false about the network protocol type #[derive(Clone)] pub struct NetworkedSubstack { pub home_uri: Option, } impl Default for NetworkedSubstack { fn default() -> Self { Self { home_uri: None } } } impl NetworkedSubstack { pub fn into_server_substack(self) -> SubstackBuilder { let mut stack = SubstackBuilder::new(self); stack.object::(); stack.operation(handle_network_operation); // TODO: optional stack.dynamic_operation(try_handle_with_remote); stack } pub fn into_client_substack(self) -> SubstackBuilder { let mut stack: SubstackBuilder = SubstackBuilder::new(self); stack.object::(); // TODO: optional stack.dynamic_operation(try_handle_with_remote); stack } } pub async fn handle_network_operation( object: NetworkedObject, operation: NetworkedOperation, _state: NetworkedSubstack, OperationState(operation_state): OperationState, stack: GiteratedStack, ) -> Result, OperationError>> { trace!("Handle network operation {}", operation.name); let mut result = None; for (_, object_meta) in &stack.inner.metadata.objects { if object_meta.name == NetworkedObject::object_name() { continue; } if let Ok(object) = (object_meta.from_str)(&object.0) { result = Some((object, object_meta)); break; } } let (object, object_meta) = result.ok_or_else(|| OperationError::Unhandled)?; trace!( "Resolved object type {} for network operation {}.", object_meta.name, operation.name ); let operation_meta = stack .inner .metadata .operations .get(&ObjectOperationPair { object_name: &object_meta.name, operation_name: &operation.name, }) .ok_or_else(|| OperationError::Unhandled)?; trace!( "Resolved operation {}::{} for network operation.", object_meta.name, operation_meta.name ); info!("Operation: {:?}", operation.payload); let operation = (operation_meta.deserialize)(&operation.payload) .as_internal_error_with_context(format!( "deserializing object operation {}::{}", object_meta.name, operation_meta.name ))?; trace!( "Deserialized operation {}::{} for network operation.", object_meta.name, operation_meta.name ); let result = stack .new_operation_func(object, operation, operation_state) .await; match result { Ok(success) => { trace!( "Network operation {}::{} was successful", object_meta.name, operation_meta.name ); Ok( (operation_meta.serialize_success)(success).as_internal_error_with_context( format!( "serializing success for object operation {}::{}", object_meta.name, operation_meta.name ), )?, ) } Err(err) => { trace!( "Network operation {}::{} failed", object_meta.name, operation_meta.name ); Err(match err { OperationError::Operation(failure) => OperationError::Operation( (operation_meta.serialize_error)(failure).as_internal_error_with_context( format!( "serializing error for object operation {}::{}", object_meta.name, operation_meta.name ), )?, ), OperationError::Internal(internal) => { warn!( "A networked operation encountered an internal error: {:#?}", internal ); OperationError::Internal(internal) } OperationError::Unhandled => OperationError::Unhandled, }) } } } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkedObject(pub String); impl FromStr for NetworkedObject { type Err = (); fn from_str(s: &str) -> Result { Ok(NetworkedObject(s.to_string())) } } impl Display for NetworkedObject { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str(&self.0) } } impl GiteratedObject for NetworkedObject { fn object_name() -> &'static str { "networked_object" } fn from_object_str(_object_str: &str) -> Result { todo!() } fn home_uri(&self) -> String { todo!() } } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkedOperation { pub name: String, pub payload: Vec, } impl NetworkedOperation { pub fn new(name: String, payload: Vec) -> Self { Self { name, payload } } } impl GiteratedOperation for NetworkedOperation { type Success = Vec; type Failure = Vec; fn operation_name() -> &'static str { "networked_operation" } } /// Handler which will attempt to resolve any operation that doesn't resolve locally /// against a remote instance. pub async fn try_handle_with_remote( object: AnyObject, operation: AnyOperation, state: NetworkedSubstack, _stack: GiteratedStack, ) -> Result> { if object.is::() { return Err(OperationError::Unhandled); } trace!( "Try handling object operation {}::{} with remote", object.kind(), operation.kind().operation_name ); // TODO: // Ideally we support pass-through on object types that aren't used locally. // For now, we aren't worrying about that. let object_meta = object.meta().clone(); let operation_meta = operation.meta().clone(); trace!( "Serializing with {}::{}", operation.kind().object_name, operation.kind().operation_name ); let object_home_uri = (object_meta.home_uri)(object.clone()); if let Some(home_uri) = state.home_uri { if home_uri == object_home_uri { // This isn't a remote request, requests aren't supposed to hit this layer // if they're not remote. warn!("Try handling object operation {}::{}, resolved object home uri as local home uri. This is a bug.", object.kind(), operation.kind().operation_name); return Err(OperationError::Unhandled); } } trace!( "Handling object operation {}::{} sending payload", object.kind(), operation.kind().operation_name ); let object = NetworkedObject((object_meta.to_str)(object).to_string()); let operation = NetworkedOperation::new( operation_meta.name.clone(), (operation_meta.serialize)(operation.clone()).as_internal_error_with_context(format!( "try serializing object operation {}::{} for remote", object_meta.name, operation_meta.name ))?, ); // let authenticated = Authenticated::new(object, operation); let message = GiteratedMessage { object, operation: NetworkedOperation::operation_name().to_string(), payload: operation, }; let authenticated = Authenticated::new(message); let mut socket: WebSocketStream> = connect_to( &Instance::from_str(&object_home_uri).unwrap(), &Some(("127.0.0.1:1111").parse().unwrap()), ) .await .as_internal_error()?; // TODO AUTH let result: Result, OperationError>> = send_expect(&mut socket, authenticated).await; match result { Ok(success) => { let success = (operation_meta.deserialize_success)(success) .as_internal_error_with_context(format!( "try deserializing object operation success {}::{} for remote", object_meta.name, operation_meta.name ))?; Ok(success) } Err(err) => Err(match err { OperationError::Operation(failure) => { let failure = (operation_meta.deserialize_failure)(failure) .as_internal_error_with_context(format!( "try deserializing object operation failure {}::{} for remote", object_meta.name, operation_meta.name ))?; OperationError::Operation(failure) } OperationError::Internal(internal) => OperationError::Internal(internal), OperationError::Unhandled => OperationError::Unhandled, }), } } type Socket = WebSocketStream>; async fn connect_to( instance: &Instance, socket_addr: &Option, ) -> Result { if let Some(addr) = socket_addr { info!( "Connecting to {}", format!("ws://{}/.giterated/daemon/", addr) ); let (websocket, _response) = connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?; info!("Connection established with {}", addr); Ok(websocket) } else { info!( "Connecting to {}", format!("wss://{}/.giterated/daemon/", instance.0) ); let (websocket, _response) = connect_async(&format!("wss://{}/.giterated/daemon/", instance.0)).await?; info!("Connection established with {}", instance.0); Ok(websocket) } } async fn send_expect>( socket: &mut Socket, message: Authenticated, ) -> Result, OperationError>> { let payload = bincode::serialize(&message.into_payload()).unwrap(); socket .send(Message::Binary(payload)) .await .as_internal_error()?; while let Some(message) = socket.next().await { let payload = match message.as_internal_error()? { Message::Binary(payload) => payload, _ => { continue; } }; let raw_result = bincode::deserialize::, NetworkOperationError>>>(&payload) .map_err(|e| OperationError::Internal(Error::from(e)))?; trace!( "Received response for networked operation {}::{}.", O::object_name(), D::operation_name() ); return match raw_result { Ok(success) => Ok(success), Err(err) => Err(match err { NetworkOperationError::Operation(operation_error) => { OperationError::Operation(operation_error) } NetworkOperationError::Internal => OperationError::Internal(RemoteError.into()), NetworkOperationError::Unhandled => OperationError::Unhandled, }), }; } panic!() } #[derive(Debug, Clone, thiserror::Error)] #[error("a remote internal error occurred")] pub struct RemoteError; #[derive(Debug, thiserror::Error)] #[error("a remote internal error occurred")] pub struct NetworkError; #[derive(Debug)] pub struct Authenticated> { pub source: Vec>, pub message: GiteratedMessage, } impl> Authenticated { pub fn new(message: GiteratedMessage) -> Self { Self { source: vec![], message, } } pub fn append_authentication( &mut self, authentication: Arc, ) { self.source.push(authentication); } pub fn into_payload(mut self) -> AuthenticatedPayload { let payload = serde_json::to_vec(&self.message.payload).unwrap(); AuthenticatedPayload { object: self.message.object.to_string(), operation: self.message.operation, source: self .source .drain(..) .map(|provider| provider.as_ref().authenticate_all(&payload)) .flatten() .collect::>(), payload, } } }