use std::any::Any; use std::fmt::Debug; use std::{collections::HashMap, sync::Arc}; use giterated_models::authenticated::AuthenticatedPayload; use giterated_models::error::{GetValueError, IntoInternalError}; use giterated_models::message::GiteratedMessage; use giterated_models::object::NetworkAnyObject; use giterated_models::operation::NetworkAnyOperation; use giterated_models::settings::{GetSettingError, Setting}; use giterated_models::value::{GetValue, GiteratedObjectValue}; use giterated_models::{ error::OperationError, object::{GiteratedObject, Object, ObjectRequest, ObjectRequestError}, object_backend::ObjectBackend, operation::GiteratedOperation, settings::{GetSetting, SetSetting}, }; use serde_json::Value; use tracing::{info, trace}; use crate::handler::HandlerTree; use crate::provider::MetadataProvider; use crate::{ AnyFailure, AnyObject, AnyOperation, AnySetting, AnySuccess, AnyValue, HandlerResolvable, HandlerWrapper, MissingValue, ObjectOperationPair, ObjectSettingPair, ObjectValuePair, RuntimeMetadata, StackOperationState, SubstackBuilder, }; pub type OperationHandler = HandlerWrapper<(AnyObject, AnyOperation), AnySuccess, AnyFailure>; pub type ValueGetter = HandlerWrapper<(AnyObject,), AnyValue, AnyFailure>; pub type SettingGetter = HandlerWrapper<(AnyObject,), AnySetting, AnyFailure>; pub type ValueChange = HandlerWrapper<(AnyObject, AnyValue), (), anyhow::Error>; pub type SettingChange = HandlerWrapper<(AnyObject, AnySetting), (), anyhow::Error>; #[derive(Default)] pub struct GiteratedStack { operation_handlers: HashMap, HandlerTree>, value_getters: HashMap, ValueGetter>, setting_getters: HashMap<&'static str, SettingGetter>, value_change: HashMap, ValueChange>, setting_change: HashMap, SettingChange>, metadata_providers: Vec>, pub metadata: RuntimeMetadata, } impl Debug for GiteratedStack { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("GiteratedStack").finish() } } impl GiteratedStack { pub fn merge_builder( &mut self, mut builder: SubstackBuilder, ) -> &mut Self { for (target, handler) in builder.operation_handlers { let tree = self.get_or_create_tree(&target); tree.push(handler); } for (target, handler) in builder.value_getters { assert!(self.value_getters.insert(target, handler).is_none()); } for (target, handler) in builder.setting_getters { assert!(self.setting_getters.insert(target, handler).is_none()); } for (target, handler) in builder.value_change { self.value_change.insert(target, handler); } for (target, handler) in builder.setting_change { self.setting_change.insert(target, handler); } self.metadata_providers .append(&mut builder.metadata_providers); self.metadata.append(builder.metadata); self } pub async fn value_update( &self, object: O, new_value: V, operation_state: &StackOperationState, ) where O: GiteratedObject + 'static, V: GiteratedObjectValue + 'static, { trace!("value updated {}::{}", O::object_name(), V::value_name()); let target = ObjectValuePair::from_types::(); if let Some(handler) = self.value_change.get(&target) { // TODO let _ = handler .handle( (AnyObject(Arc::new(object)), AnyValue(Arc::new(new_value))), operation_state.clone(), ) .await; } } pub async fn setting_update( &self, object: O, new_setting: S, operation_state: &StackOperationState, ) where O: GiteratedObject + 'static, S: Setting + 'static, { trace!("setting updated {}::{}", O::object_name(), S::name()); let target = ObjectSettingPair::from_types::(); if let Some(handler) = self.setting_change.get(&target) { let _ = handler .handle( ( AnyObject(Arc::new(object)), AnySetting(Arc::new(new_setting)), ), operation_state.clone(), ) .await; } } pub async fn new_object(&self, _new_object: &O, _operation_state: &StackOperationState) where O: GiteratedObject, { // TODO } /// Writes a setting for the specified object. pub async fn write_setting( &self, object: &O, setting: S, ) -> Result<(), OperationError<()>> where O: GiteratedObject + 'static + Clone, S: Setting + 'static + Clone, { for provider in self.metadata_providers.iter() { if provider.provides_for(object as &dyn Any) { let setting_meta = self .metadata .settings .get(&ObjectSettingPair::from_types::()) .ok_or_else(|| OperationError::Unhandled)?; let object_meta = self .metadata .objects .get(O::object_name()) .ok_or_else(|| OperationError::Unhandled)?; let result = provider .write( AnyObject(Arc::new(object.clone())), object_meta, AnySetting(Arc::new(setting)), setting_meta, ) .await .as_internal_error_with_context(format!("writing setting {}", S::name())); return result; } } Err(OperationError::Unhandled) } /// Gets a setting for the specified object. pub async fn new_get_setting(&self, object: &O) -> Result> where O: GiteratedObject + 'static + Clone, S: Setting + 'static, { for provider in self.metadata_providers.iter() { if provider.provides_for(object as &dyn Any) { trace!( "Resolving setting {} for object {} from provider.", S::name(), O::object_name() ); let setting_meta = self .metadata .settings .get(&ObjectSettingPair::from_types::()) .ok_or_else(|| OperationError::Unhandled)?; let object_meta = self .metadata .objects .get(O::object_name()) .ok_or_else(|| OperationError::Unhandled)?; let value = provider .read( AnyObject(Arc::new(object.clone())), object_meta, setting_meta, ) .await .as_internal_error_with_context(format!("getting setting {}", S::name()))?; return serde_json::from_value(value) .as_internal_error_with_context("deserializing setting"); } } trace!( "No provider registered for setting {} and object {}", S::name(), O::object_name() ); Err(OperationError::Unhandled) } fn get_or_create_tree( &mut self, target: &ObjectOperationPair<'static>, ) -> &mut HandlerTree { if self.operation_handlers.contains_key(target) { self.operation_handlers.get_mut(target).unwrap() } else { self.operation_handlers .insert(target.clone(), HandlerTree::default()); self.operation_handlers.get_mut(target).unwrap() } } } impl GiteratedStack { /// Handles a giterated network message, returning either a raw success /// payload or a serialized error payload. pub async fn handle_network_message( &self, message: AuthenticatedPayload, operation_state: &StackOperationState, ) -> Result, OperationError>> { let message: GiteratedMessage = message.into_message(); // Deserialize the object, also getting the object type's name let (object_type, object) = { let mut result = None; for (object_type, object_meta) in self.metadata.objects.iter() { if let Ok(object) = (object_meta.from_str)(&message.object.0) { result = Some((object_type.clone(), object)); break; } } result } .ok_or_else(|| OperationError::Unhandled)?; trace!( "Handling network message {}::<{}>", message.operation, object_type ); if message.operation == "get_value" { // Special case let operation: GetValue = serde_json::from_slice(&message.payload.0).unwrap(); let result = self .network_get_value( object, object_type.clone(), operation.clone(), operation_state, ) .await; // In the case of internal errors, attach context let result = result.map_err(|err| match err { OperationError::Operation(operation) => OperationError::Operation(operation), OperationError::Internal(internal) => { OperationError::Internal(internal.context(format!( "{}::get_value::<{}> outcome", object_type, operation.value_name ))) } OperationError::Unhandled => OperationError::Unhandled, }); let result = result.map(|r| serde_json::to_vec(&r).unwrap()); return result; } else if message.operation == "get_setting" { let operation: GetSetting = serde_json::from_slice(&message.payload.0).unwrap(); let setting_meta = self .metadata .settings .get(&ObjectSettingPair { object_kind: &object_type, setting_name: &operation.setting_name, }) .ok_or_else(|| OperationError::Unhandled)?; let raw_result = self .get_setting( object, object_type.clone(), operation.clone(), operation_state, ) .await; return match raw_result { Ok(success) => { // Success is the setting type, serialize it let serialized = (setting_meta.serialize)(success).unwrap(); Ok(serde_json::to_vec(&serialized).unwrap()) } Err(err) => Err(match err { OperationError::Operation(failure) => { // We know how to resolve this type let failure: GetSettingError = *failure.downcast().unwrap(); OperationError::Operation(serde_json::to_vec(&failure).unwrap()) } OperationError::Internal(internal) => { OperationError::Internal(internal.context(format!( "{}::get_setting::<{}> handler outcome", object_type, setting_meta.name ))) } OperationError::Unhandled => OperationError::Unhandled, }), }; } else if message.operation == "set_setting" { let operation: SetSetting = serde_json::from_slice(&message.payload.0).unwrap(); trace!( "Handling network {}::set_setting for {}", object_type, operation.setting_name ); let setting_meta = self .metadata .settings .get(&ObjectSettingPair { object_kind: &object_type, setting_name: &operation.setting_name, }) .unwrap(); let setting = (setting_meta.deserialize)(operation.value) .as_internal_error_with_context(format!( "deserializing setting {} for object {}", operation.setting_name, object_type ))?; trace!( "Deserialized setting {} for object {}", operation.setting_name, object_type, ); for provider in self.metadata_providers.iter() { if provider.provides_for(object.0.as_ref()) { trace!( "Resolved setting provider for setting {} for object {}", operation.setting_name, object_type, ); let object_meta = self .metadata .objects .get(&object_type) .ok_or_else(|| OperationError::Unhandled)?; let raw_result = provider .write(object.clone(), object_meta, setting.clone(), setting_meta) .await; return match raw_result { Ok(_) => { (setting_meta.setting_updated)( object, setting, operation_state.runtime.clone(), operation_state, ) .await; Ok(serde_json::to_vec(&()).unwrap()) } Err(e) => Err(OperationError::Internal(e.context(format!( "writing object {} setting {}", object_type, operation.setting_name )))), }; } trace!( "Failed to resolve setting provider for setting {} for object {}", operation.setting_name, object_type, ); } } let target = ObjectOperationPair { object_name: &object_type, operation_name: &message.operation, }; // Resolve the target operations from the handlers table let handler = self .operation_handlers .get(&target) .ok_or_else(|| OperationError::Unhandled)?; trace!( "Resolved operation handler for network message {}::<{}>", message.operation, object_type ); // Deserialize the operation let meta = self .metadata .operations .get(&target) .ok_or_else(|| OperationError::Unhandled)?; let operation = (meta.deserialize)(&message.payload.0).as_internal_error_with_context(format!( "deserializing operation {}::{}", target.object_name, target.operation_name ))?; trace!( "Deserialized operation for network message {}::<{}>", message.operation, object_type ); trace!( "Calling handler for network message {}::<{}>", message.operation, object_type ); // Get the raw result of the operation, where the return values are boxed. let raw_result = handler .handle((object.clone(), operation.clone()), operation_state.clone()) .await; trace!( "Finished handling network message {}::<{}>", message.operation, object_type ); // Deserialize the raw result for the network match raw_result { Ok(success) => Ok((meta.serialize_success)(success).as_internal_error()?), Err(err) => Err(match err { OperationError::Operation(failure) => { OperationError::Operation((meta.serialize_error)(failure).as_internal_error()?) } OperationError::Internal(internal) => { OperationError::Internal(internal.context(format!( "operation {}::{} handler outcome", target.object_name, target.operation_name ))) } OperationError::Unhandled => OperationError::Unhandled, }), } } pub async fn network_get_value( &self, object: AnyObject, object_kind: String, operation: GetValue, operation_state: &StackOperationState, ) -> Result, OperationError>> { trace!("Handling network get_value for {}", operation.value_name); let value_meta = self .metadata .values .get(&ObjectValuePair { object_kind: &object_kind, value_kind: &operation.value_name, }) .ok_or_else(|| OperationError::Unhandled)?; for (target, getter) in self.value_getters.iter() { if target.object_kind != object_kind { continue; } if target.value_kind != operation.value_name { continue; } return match getter .handle((object.clone(),), operation_state.clone()) .await { Ok(success) => { // Serialize success, which is the value type itself let serialized = (value_meta.serialize)(success).as_internal_error()?; Ok(serialized) } Err(err) => Err(match err { OperationError::Operation(failure) => { // Failure is sourced from GetValue operation, but this is hardcoded for now let failure: &GetValueError = failure.0.downcast_ref().unwrap(); OperationError::Operation(serde_json::to_vec(&failure).as_internal_error()?) } OperationError::Internal(internal) => OperationError::Internal(internal), OperationError::Unhandled => OperationError::Unhandled, }), }; } Err(OperationError::Unhandled) } pub async fn get_setting( &self, object: AnyObject, object_kind: String, operation: GetSetting, _operation_state: &StackOperationState, ) -> Result>> { trace!( "Handling network {}::get_setting for {}", object_kind, operation.setting_name ); for provider in self.metadata_providers.iter() { if provider.provides_for(object.0.as_ref()) { let setting_meta = self .metadata .settings .get(&ObjectSettingPair { object_kind: &object_kind, setting_name: &operation.setting_name, }) .ok_or_else(|| OperationError::Unhandled)?; let object_meta = self .metadata .objects .get(&object_kind) .ok_or_else(|| OperationError::Unhandled)?; let result = provider .read(object.clone(), object_meta, setting_meta) .await .as_internal_error_with_context(format!( "reading setting {}", operation.setting_name ))?; return (setting_meta.deserialize)(result).as_internal_error_with_context(format!( "deserializing setting {}", operation.setting_name )); } } trace!("setting {} doesn't exist", operation.setting_name); Err(OperationError::Unhandled) } pub async fn network_set_setting( &self, object: AnyObject, object_kind: String, operation: SetSetting, operation_state: &StackOperationState, ) -> Result, OperationError>> { trace!( "Handling network {}::set_setting for {}", object_kind, operation.setting_name ); let target = ObjectSettingPair { object_kind: &object_kind, setting_name: &operation.setting_name, }; let handler = self.setting_change.get(&target).unwrap(); let setting_meta = self .metadata .settings .get(&ObjectSettingPair { object_kind: &object_kind, setting_name: &operation.setting_name, }) .ok_or_else(|| OperationError::Unhandled)?; let setting = (setting_meta.deserialize)(operation.value).as_internal_error_with_context(format!( "deserializing setting {} for object {}", operation.setting_name, object_kind ))?; let raw_result = handler .handle((object, setting.clone()), operation_state.clone()) .await; match raw_result { Ok(_) => { // Serialize success, which is the value type itself let serialized = serde_json::to_vec(&()).as_internal_error()?; Ok(serialized) } Err(err) => Err(match err { OperationError::Operation(operation) => OperationError::Internal(operation), OperationError::Internal(internal) => OperationError::Internal(internal), OperationError::Unhandled => OperationError::Unhandled, }), } } } #[async_trait::async_trait(?Send)] impl ObjectBackend for Arc { async fn object_operation( &self, in_object: O, operation_name: &str, payload: D, operation_state: &StackOperationState, ) -> Result> where O: GiteratedObject + Debug + 'static, D: GiteratedOperation + Debug + 'static, D::Success: Clone, D::Failure: Clone, { // Erase object and operation types. let object = AnyObject(Arc::new(in_object.clone()) as Arc); let operation = AnyOperation(Arc::new(payload) as Arc); // We need to hijack get_value, set_setting, and get_setting. if operation_name == "get_value" { let get_value = operation .0 .downcast_ref::() .ok_or_else(|| OperationError::Unhandled)?; let value_meta = self .metadata .values .get(&ObjectValuePair { object_kind: O::object_name(), value_kind: &get_value.value_name, }) .ok_or_else(|| OperationError::Unhandled)?; let value_name = value_meta.name.clone(); trace!( "Handling get_value for {}::{}", O::object_name(), value_name ); for (target, getter) in self.value_getters.iter() { if target.object_kind != O::object_name() { continue; } if target.value_kind != value_name { continue; } trace!( "Calling handler for get_value {}::{}", O::object_name(), value_name ); return match getter .handle((object.clone(),), operation_state.clone()) .await { Ok(success) => Ok(*(Box::new((value_meta.serialize)(success).unwrap()) as Box) .downcast() .unwrap()), Err(err) => Err(match err { OperationError::Operation(failure) => OperationError::Operation( failure.0.downcast_ref::().unwrap().clone(), ), OperationError::Internal(internal) => { OperationError::Internal(internal.context(format!( "{}::get_value::<{}> handler outcome", O::object_name(), value_name ))) } OperationError::Unhandled => OperationError::Unhandled, }), }; } } else if operation.0.is::() { let get_setting: &GetSetting = operation.0.downcast_ref().unwrap(); let setting_name = get_setting.setting_name.clone(); let raw_result = self .get_setting( object, O::object_name().to_string(), get_setting.clone(), operation_state, ) .await; return match raw_result { Ok(success) => { // Success is the setting type, serialize it // let serialized = (setting_meta.serialize)(success).unwrap(); // Ok(serde_json::to_vec(&serialized).unwrap()) Ok(success.0.downcast_ref::().unwrap().clone()) } Err(err) => Err(match err { OperationError::Operation(failure) => { // We know this is the right type OperationError::Operation(*failure.downcast().unwrap()) } OperationError::Internal(internal) => { OperationError::Internal(internal.context(format!( "{}::get_setting::<{}> handler outcome", O::object_name(), setting_name ))) } OperationError::Unhandled => OperationError::Unhandled, }), }; } else if operation.0.is::() { todo!() } else if operation.0.is::() { todo!() } // Resolve the operation from the known operations table. let operation_type = { let mut operation_type = None; for (target, operation_meta) in self.metadata.operations.iter() { // Skip elements that we know will not match if target.object_name != O::object_name() { continue; } if target.operation_name != operation_name { continue; } if (operation_meta.any_is_same)(&operation) { operation_type = Some(target.clone()); break; } } operation_type } .ok_or_else(|| OperationError::Unhandled)?; // Resolve the handler from our handler tree let handler_tree = self .operation_handlers .get(&operation_type) .ok_or_else(|| OperationError::Unhandled)?; let raw_result = handler_tree .handle((object, operation), operation_state.clone()) .await; // Convert the dynamic result back into its concrete type match raw_result { Ok(result) => Ok(result.0.downcast_ref::().unwrap().clone()), Err(err) => Err(match err { OperationError::Internal(internal) => { OperationError::Internal(internal.context(format!( "operation {}::{} handler outcome", operation_type.object_name, operation_type.operation_name ))) } OperationError::Operation(boxed_error) => OperationError::Operation( boxed_error.0.downcast_ref::().unwrap().clone(), ), OperationError::Unhandled => OperationError::Unhandled, }), } } async fn get_object( &self, object_str: &str, _operation_state: &StackOperationState, ) -> Result, OperationError> where O: GiteratedObject + Debug + 'static, { // TODO: Authorization? for (_object_name, object_meta) in self.metadata.objects.iter() { if let Ok(object) = (object_meta.from_str)(object_str) { return Ok(unsafe { Object::new_unchecked( object.0.downcast_ref::().unwrap().clone(), self.clone(), ) }); } } Err(OperationError::Unhandled) } } /// Defines a type that is a valid Giterated runtime state. /// /// This allows for extraction of state in handlers, based on a /// [`FromOperationState`] impl on (what is in this case) [`Self`]. pub trait GiteratedStackState: Send + Sync + Clone {} impl GiteratedStackState for T {} #[async_trait::async_trait(?Send)] impl HandlerResolvable for Option where T: HandlerResolvable, { type Error = MissingValue; async fn from_handler_state( required_parameters: &R, operation_state: &S, ) -> Result, MissingValue> { Ok(T::from_handler_state(required_parameters, operation_state) .await .ok()) } }