use std::{fmt::Debug, str::FromStr, sync::Arc}; use futures_util::{SinkExt, StreamExt}; use giterated_models::{ authenticated::{Authenticated, AuthenticationSourceProviders}, error::OperationError, error::{IntoInternalError, NetworkOperationError}, message::GiteratedMessage, object::{GiteratedObject, Object, ObjectRequest, ObjectRequestError, ObjectResponse}, object_backend::ObjectBackend, operation::GiteratedOperation, }; use serde::de::DeserializeOwned; use tokio_tungstenite::tungstenite::Message; use crate::{DaemonConnectionPool, Socket}; #[derive(Clone)] pub struct NetworkOperationState { authentication: Vec>, } impl NetworkOperationState { pub fn new() -> Self { Self { authentication: vec![], } } pub fn authenticate( &mut self, source: S, ) { self.authentication.push(Arc::new(source)) } } #[async_trait::async_trait(?Send)] impl ObjectBackend for DaemonConnectionPool { async fn object_operation + Debug>( &self, object: O, operation: &str, payload: D, operation_state: &NetworkOperationState, ) -> Result> { let message = GiteratedMessage { object, operation: operation.to_string(), payload, }; let mut connection = self.0.get().await.map_err(|e| { OperationError::Internal(anyhow::Error::from(ConnectionFailed(e.to_string()))) })?; let mut authenticated = Authenticated::new(message); for authentication in &operation_state.authentication { authenticated.append_authentication(authentication.clone()); } send_expect(&mut connection, authenticated).await } async fn get_object( &self, object_str: &str, operation_state: &NetworkOperationState, ) -> Result, OperationError> { let operation = ObjectRequest(object_str.to_string()); info!("Get object: {:?}", operation); let message = GiteratedMessage { object: self.0.manager().target_instance.clone(), operation: ObjectRequest::operation_name().to_string(), payload: operation, }; let mut connection = self.0.get().await.map_err(|e| { OperationError::Internal(anyhow::Error::from(ConnectionFailed(e.to_string()))) })?; let mut authenticated = Authenticated::new(message); for authentication in &operation_state.authentication { authenticated.append_authentication(authentication.clone()); } let object_raw: ObjectResponse = send_expect(&mut connection, authenticated).await?; if let Ok(object) = O::from_str(&object_raw.0) { Ok(unsafe { Object::new_unchecked(object, self.clone()) }) } else { panic!() } } } async fn send_expect< O: GiteratedObject, D: GiteratedOperation, B: DeserializeOwned, R: DeserializeOwned, >( socket: &mut Socket, message: Authenticated, ) -> Result> { let payload = bincode::serialize(&message.into_payload()).unwrap(); socket .send(Message::Binary(payload)) .await .as_internal_error()?; while let Some(message) = socket.next().await { let payload = match message.as_internal_error()? { Message::Binary(payload) => payload, _ => { continue; } }; let raw_result = bincode::deserialize::, NetworkOperationError>>>(&payload) .map_err(|e| OperationError::Internal(anyhow::Error::from(e)))?; // Map ok let raw_result = match raw_result { Ok(raw) => Ok(serde_json::from_slice(&raw) .map_err(|e| OperationError::Internal(anyhow::Error::from(e)))?), Err(err) => Err(match err { NetworkOperationError::Operation(err) => OperationError::Operation( serde_json::from_slice(&err) .map_err(|e| OperationError::Internal(anyhow::Error::from(e)))?, ), NetworkOperationError::Internal => { OperationError::Internal(anyhow::Error::from(NetworkError)) } NetworkOperationError::Unhandled => OperationError::Unhandled, }), }; return raw_result; } panic!() } #[derive(Debug, thiserror::Error)] #[error("a remote internal error occurred")] pub struct NetworkError; #[derive(Debug, thiserror::Error)] #[error("failed to get connection from pool: {0}")] pub struct ConnectionFailed(String);