pub mod handler; pub mod runtime; pub mod state; pub mod update; use std::{ any::Any, collections::HashMap, future::Future, ops::Deref, pin::Pin, str::FromStr, sync::Arc, }; use futures_util::FutureExt; use giterated_models::{ error::OperationError, instance::{ AuthenticationTokenRequest, Instance, RegisterAccountRequest, RepositoryCreateRequest, }, object::{ AnyObject, GiteratedObject, Object, ObjectRequest, ObjectRequestError, ObjectResponse, }, object_backend::ObjectBackend, operation::{AnyOperation, GiteratedOperation}, repository::{AccessList, Repository}, settings::{AnySetting, GetSetting, SetSetting, Setting}, user::User, value::{AnyValue, GetValue, GiteratedObjectValue}, }; use handler::GiteratedBackend; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use state::HandlerState; use tokio::{ sync::{mpsc::channel, Mutex}, task::JoinHandle, }; use tracing::{error, warn}; use update::{ HandleSettingUpdate, HandleSettingUpdatedFunction, HandleValueUpdate, HandleValueUpdatedFunction, SettingUpdateKind, ValueUpdateKind, }; #[derive(Clone, Debug, Hash, Eq, PartialEq)] struct ObjectOperationPair { object_name: String, operation_name: String, } pub struct SettingMeta { name: String, deserialize: Box Result, serde_json::Error> + Send + Sync>, } pub struct ValueMeta { name: String, deserialize: Box Result, serde_json::Error> + Send + Sync>, } pub struct ObjectMeta { name: String, from_str: Box Result, ()> + Send + Sync>, any_is_same: Box bool + Send + Sync>, } pub struct OperationMeta { name: String, object_kind: String, deserialize: Box Result, ()> + Send + Sync>, any_is_same: Box bool + Send + Sync>, serialize_success: Box) -> Result, serde_json::Error> + Send + Sync>, serialize_error: Box) -> Result, serde_json::Error> + Send + Sync>, } pub struct OperationHandlers { operations: HashMap>, get_object: Vec>, value_getters: HashMap>, settings_getter: HashMap>, settings: HashMap, objects: HashMap, operations_meta: HashMap, value_updated: HashMap, setting_updated: HashMap, } #[derive(Clone, Debug, Hash, Eq, PartialEq)] pub struct ObjectValuePair { pub object_kind: String, pub value_kind: String, } impl Default for OperationHandlers { fn default() -> Self { Self { operations: HashMap::new(), get_object: Vec::new(), value_updated: HashMap::default(), setting_updated: HashMap::default(), value_getters: HashMap::default(), settings_getter: HashMap::default(), settings: HashMap::default(), objects: HashMap::default(), operations_meta: HashMap::default(), } } } impl OperationHandlers { fn insert_operation + Send + Sync + 'static>( &mut self, ) { // let object_name = O::object_name().to_string(); // let operation_name = D::operation_name().to_string(); // self.operations_meta.insert( // ObjectOperationPair { // object_name: object_name.clone(), // operation_name: operation_name.clone(), // }, // OperationMeta { // name: operation_name, // object_kind: object_name, // deserialize: Box::new(|bytes| { // Ok(Box::new(serde_json::from_slice::(bytes).unwrap()) // as Box) // }), // any_is_same: Box::new(|any_box| any_box.is::()), // }, // ); } pub fn setting(&mut self) -> &mut Self { let setting_meta = SettingMeta { name: T::name().to_string(), deserialize: Box::new(|slice| { Ok(Box::new(serde_json::from_slice(slice)?) as Box) }), }; self.settings.insert(T::name().to_string(), setting_meta); self } pub fn insert< A, O: GiteratedObject + Send + Sync + 'static, D: GiteratedOperation + Send + Sync + 'static, H: GiteratedOperationHandler + Send + Sync + 'static + Clone, >( &mut self, handler: H, ) -> &mut Self { let object_name = handler.object_name().to_string(); let operation_name = handler.operation_name().to_string(); let wrapped = OperationWrapper::new(handler); let pair = ObjectOperationPair { object_name, operation_name, }; assert!(self.operations.insert(pair, wrapped).is_none()); self.insert_operation::(); self } pub fn object(&mut self) -> &mut Self { // let object_meta = ObjectMeta { // name: O::object_name().to_string(), // from_str: Box::new(|str| Ok(Box::new(O::from_str(&str).map_err(|_| ())?))), // }; // self.objects // .insert(O::object_name().to_string(), object_meta); let closure = |_: &Instance, operation: ObjectRequest, _state| { async move { if O::from_str(&operation.0).is_ok() { Ok(ObjectResponse(operation.0)) } else { Err(OperationError::Unhandled) } } .boxed() }; let wrapped = OperationWrapper::new(closure); self.get_object.push(wrapped); self } pub async fn handle( &self, object: &O, operation_name: &str, operation: AnyOperation, state: S, operation_state: &StackOperationState, ) -> Result, OperationError>> { // TODO let object = object.to_string(); let object_name = O::object_name().to_string(); let object_meta = self .objects .get(&object_name) .ok_or_else(|| OperationError::Unhandled)?; let object_box = (object_meta.from_str)(&object).map_err(|_| OperationError::Unhandled)?; let target_handler = ObjectOperationPair { object_name, operation_name: operation_name.to_string(), }; let operation_meta = self .operations_meta .get(&target_handler) .ok_or_else(|| OperationError::Unhandled)?; let operation_box = (operation_meta.deserialize)(&serde_json::to_vec(&operation.0).unwrap()) .map_err(|_| OperationError::Unhandled)?; if let Some(handler) = self.operations.get(&target_handler) { handler .handle(object_box, operation_box, state.clone(), operation_state) .await } else { Err(OperationError::Unhandled) } } pub async fn resolve_object( &self, instance: Instance, request: ObjectRequest, state: S, operation_state: &StackOperationState, ) -> Result, OperationError>> { for handler in self.get_object.iter() { if let Ok(response) = handler .handle( Box::new(instance.clone()) as _, Box::new(request.clone()) as _, state.clone(), operation_state, ) .await { return Ok(response); } } Err(OperationError::Unhandled) } pub fn insert_value_update_handler< H: HandleValueUpdate + Send + Sync + Clone + 'static, O: GiteratedObject + Send + Sync, V: GiteratedObjectValue + Send + Sync, >( &mut self, handler: H, ) -> &mut Self { let wrapper = HandleValueUpdatedFunction::new(handler, V::value_name()); assert!(self .value_updated .insert(wrapper.target.clone(), wrapper) .is_none()); self } pub fn insert_setting_update_handler< H: HandleSettingUpdate + Send + Sync + Clone + 'static, O: GiteratedObject + Send + Sync, T: Setting + Send + Sync, >( &mut self, handler: H, ) -> &mut Self { let wrapper = HandleSettingUpdatedFunction::new(handler, T::name()); assert!(self .setting_updated .insert(wrapper.target.clone(), wrapper) .is_none()); self } pub fn value_getter(&mut self, handler: F) -> &mut Self where O: GiteratedObject + Send + Sync + 'static, V: GiteratedObjectValue + Send + Sync + 'static, F: GiteratedOperationHandler, S> + Send + Sync + Clone + 'static, { let object_name = handler.object_name().to_string(); let value_name = V::value_name().to_string(); let wrapped = OperationWrapper::new(handler); assert!(self .value_getters .insert( ValueGetter { object_type: object_name, value_type: value_name }, wrapped ) .is_none()); self } pub fn setting_getter(&mut self, handler: F) -> &mut Self where O: GiteratedObject + Send + Sync + 'static, F: GiteratedOperationHandler + Send + Sync + Clone + 'static, { let object_name = handler.object_name().to_string(); let wrapped = OperationWrapper::new(handler); assert!(self.settings_getter.insert(object_name, wrapped).is_none()); self } } #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct ValueGetter { pub object_type: String, pub value_type: String, } impl Default for ValueGetter { fn default() -> Self { Self { object_type: AnyObject::object_name().to_string(), value_type: "any".to_string(), } } } #[async_trait::async_trait] pub trait GiteratedOperationHandler< L, O: GiteratedObject, D: GiteratedOperation, S: Send + Sync + Clone, >: Send + Sync { fn operation_name(&self) -> &str; fn object_name(&self) -> &str; async fn handle( &self, object: &O, operation: D, state: S, operation_state: &StackOperationState, ) -> Result>; } #[async_trait::async_trait] impl GiteratedOperationHandler<(), O, D, S> for F where F: FnMut( &O, D, S, ) -> Pin< Box>> + Send>, > + Send + Sync + Clone, O: GiteratedObject + Send + Sync, D: GiteratedOperation + 'static, >::Failure: Send, S: Send + Sync + Clone + 'static, { fn operation_name(&self) -> &str { D::operation_name() } fn object_name(&self) -> &str { O::object_name() } async fn handle( &self, object: &O, operation: D, state: S, _operation_state: &StackOperationState, ) -> Result> { self.clone()(object, operation, state).await } } #[async_trait::async_trait] impl GiteratedOperationHandler<(O1,), O, D, S> for F where F: FnMut( &O, D, S, O1, ) -> Pin< Box>> + Send>, > + Send + Sync + Clone, O: GiteratedObject + Send + Sync, D: GiteratedOperation + 'static + Send + Sync, >::Failure: Send, S: Send + Sync + Clone + 'static, O1: FromOperationState, { fn operation_name(&self) -> &str { D::operation_name() } fn object_name(&self) -> &str { O::object_name() } async fn handle( &self, object: &O, operation: D, state: S, operation_state: &StackOperationState, ) -> Result> { let o1 = O1::from_state(object, &operation, operation_state) .await .map_err(|e| OperationError::Internal(e.to_string()))?; self.clone()(object, operation, state, o1).await } } #[async_trait::async_trait] impl GiteratedOperationHandler<(O1, O2), O, D, S> for F where F: FnMut( &O, D, S, O1, O2, ) -> Pin< Box>> + Send>, > + Send + Sync + Clone, O: GiteratedObject + Send + Sync, D: GiteratedOperation + 'static + Send + Sync, >::Failure: Send, S: Send + Sync + Clone + 'static, O1: FromOperationState, O2: FromOperationState, { fn operation_name(&self) -> &str { D::operation_name() } fn object_name(&self) -> &str { O::object_name() } async fn handle( &self, object: &O, operation: D, state: S, operation_state: &StackOperationState, ) -> Result> { let o1 = O1::from_state(object, &operation, operation_state) .await .map_err(|e| OperationError::Internal(e.to_string()))?; let o2 = O2::from_state(object, &operation, operation_state) .await .map_err(|e| OperationError::Internal(e.to_string()))?; self.clone()(object, operation, state, o1, o2).await } } #[async_trait::async_trait] impl GiteratedOperationHandler<(O1, O2, O3), O, D, S> for F where F: FnMut( &O, D, S, O1, O2, O3, ) -> Pin< Box>> + Send>, > + Send + Sync + Clone, O: GiteratedObject + Send + Sync, D: GiteratedOperation + 'static + Send + Sync, >::Failure: Send, S: Send + Sync + Clone + 'static, O1: FromOperationState, O2: FromOperationState, O3: FromOperationState, { fn operation_name(&self) -> &str { D::operation_name() } fn object_name(&self) -> &str { O::object_name() } async fn handle( &self, object: &O, operation: D, state: S, operation_state: &StackOperationState, ) -> Result> { let o1 = O1::from_state(object, &operation, operation_state) .await .map_err(|e| OperationError::Internal(e.to_string()))?; let o2 = O2::from_state(object, &operation, operation_state) .await .map_err(|e| OperationError::Internal(e.to_string()))?; let o3 = O3::from_state(object, &operation, operation_state) .await .map_err(|e| OperationError::Internal(e.to_string()))?; self.clone()(object, operation, state, o1, o2, o3).await } } pub struct OperationWrapper { func: Box< dyn Fn( Box, Box, S, StackOperationState, ) -> Pin, OperationError>>> + Send>> + Send + Sync, >, object_name: String, } impl OperationWrapper { pub fn new< A, O: GiteratedObject + Send + Sync + 'static, D: GiteratedOperation + 'static, F: GiteratedOperationHandler + Send + Sync + 'static + Clone, >( handler: F, ) -> Self { let handler = Arc::new(Box::pin(handler)); Self { func: Box::new(move |object, operation, state, operation_state| { let handler = handler.clone(); async move { let handler = handler.clone(); let object: Box = object.downcast().unwrap(); let operation: Box = operation.downcast().unwrap(); let result = handler .handle(&object, *operation, state, &operation_state) .await; result .map(|success| serde_json::to_vec(&success).unwrap()) .map_err(|err| match err { OperationError::Operation(err) => { OperationError::Operation(serde_json::to_vec(&err).unwrap()) } OperationError::Internal(internal) => { OperationError::Internal(internal) } OperationError::Unhandled => OperationError::Unhandled, }) } .boxed() }), object_name: O::object_name().to_string(), } } async fn handle( &self, object: Box, operation: Box, state: S, operation_state: &StackOperationState, ) -> Result, OperationError>> { (self.func)(object, operation, state, operation_state.clone()).await } } #[async_trait::async_trait] pub trait FromOperationState + Send + Sync>: Sized + Clone + Send { type Error: Serialize + DeserializeOwned; async fn from_state( object: &O, operation: &D, state: &StackOperationState, ) -> Result>; } #[async_trait::async_trait] impl + Send + Sync> FromOperationState for BackendWrapper { type Error = (); async fn from_state( _object: &O, _operation: &D, state: &StackOperationState, ) -> Result> { Ok(state.giterated_backend.clone()) } } #[async_trait::async_trait] impl + Send + Sync> FromOperationState for StackOperationState { type Error = (); async fn from_state( _object: &O, _operation: &D, state: &StackOperationState, ) -> Result> { Ok(state.clone()) } } #[async_trait::async_trait] impl + Send + Sync> FromOperationState for AuthenticatedUser { type Error = (); async fn from_state( _object: &O, _operation: &D, state: &StackOperationState, ) -> Result> { state .user .clone() .ok_or_else(|| OperationError::Operation(())) } } #[async_trait::async_trait] impl + Send + Sync> FromOperationState for AuthenticatedInstance { type Error = (); async fn from_state( _object: &O, _operation: &D, state: &StackOperationState, ) -> Result> { state .instance .clone() .ok_or_else(|| OperationError::Operation(())) } } #[async_trait::async_trait] impl< T: FromOperationState + Send + Sync, O: GiteratedObject + Sync, D: GiteratedOperation + Send + Sync, > FromOperationState for Option { type Error = (); async fn from_state( object: &O, operation: &D, state: &StackOperationState, ) -> Result, OperationError<()>> { Ok(T::from_state(object, operation, state).await.ok()) } } #[derive(Clone)] pub struct AuthorizedUser(AuthenticatedUser); #[derive(Clone)] pub struct AuthorizedInstance(AuthenticatedInstance); #[async_trait::async_trait] pub trait AuthorizedOperation: GiteratedOperation { async fn authorize( &self, authorize_for: &O, state: &StackOperationState, ) -> Result>; } #[async_trait::async_trait] impl< O: GiteratedObject + Send + Sync + Debug, V: GiteratedObjectValue + Send + Sync, > AuthorizedOperation for GetValue { async fn authorize( &self, authorize_for: &O, operation_state: &StackOperationState, ) -> Result> { Ok(operation_state .giterated_backend .get_object::(&authorize_for.to_string(), operation_state) .await .is_ok()) } } #[async_trait::async_trait] impl AuthorizedOperation for SetSetting { async fn authorize( &self, authorize_for: &User, operation_state: &StackOperationState, ) -> Result> { let authenticated_user = operation_state .user .as_ref() .ok_or_else(|| OperationError::Operation(()))?; Ok(authorize_for == authenticated_user.deref()) } } #[async_trait::async_trait] impl AuthorizedOperation for GetSetting { async fn authorize( &self, authorize_for: &User, operation_state: &StackOperationState, ) -> Result> { let authenticated_user = operation_state .user .as_ref() .ok_or_else(|| OperationError::Operation(()))?; Ok(authorize_for == authenticated_user.deref()) } } #[async_trait::async_trait] impl AuthorizedOperation for SetSetting { async fn authorize( &self, authorize_for: &Repository, operation_state: &StackOperationState, ) -> Result> { let authenticated_user = operation_state .user .as_ref() .ok_or_else(|| OperationError::Operation(()))?; let mut object = operation_state .giterated_backend .get_object::(&authorize_for.to_string(), operation_state) .await .map_err(|e| OperationError::Internal(e.to_string()))?; let access_list = object .get_setting::(operation_state) .await .map_err(|e| OperationError::Internal(e.to_string()))?; if access_list .0 .iter() .find(|user| *user == authenticated_user.deref()) .is_some() { Ok(true) } else { Ok(false) } } } #[async_trait::async_trait] impl AuthorizedOperation for GetSetting { async fn authorize( &self, authorize_for: &Repository, operation_state: &StackOperationState, ) -> Result> { let authenticated_user = operation_state .user .as_ref() .ok_or_else(|| OperationError::Operation(()))?; let mut object = operation_state .giterated_backend .get_object::(&authorize_for.to_string(), operation_state) .await .map_err(|e| OperationError::Internal(e.to_string()))?; let access_list = object .get_setting::(operation_state) .await .map_err(|e| OperationError::Internal(e.to_string()))?; if access_list .0 .iter() .find(|user| *user == authenticated_user.deref()) .is_some() { Ok(true) } else { Ok(false) } } } #[async_trait::async_trait] impl AuthorizedOperation for RegisterAccountRequest { async fn authorize( &self, authorize_for: &Instance, state: &StackOperationState, ) -> Result> { if state.our_instance == *authorize_for { Ok(true) } else { Ok(false) } } } #[async_trait::async_trait] impl AuthorizedOperation for AuthenticationTokenRequest { async fn authorize( &self, authorize_for: &Instance, state: &StackOperationState, ) -> Result> { if state.our_instance == *authorize_for { Ok(true) } else { Ok(false) } } } #[async_trait::async_trait] impl AuthorizedOperation for RepositoryCreateRequest { async fn authorize( &self, authorize_for: &Instance, state: &StackOperationState, ) -> Result> { if state.our_instance == *authorize_for { Ok(true) } else { Ok(false) } } } #[async_trait::async_trait] impl + Send + Sync> FromOperationState for AuthorizedUser { type Error = (); async fn from_state( object: &User, operation: &A, state: &StackOperationState, ) -> Result> { let authenticated = AuthenticatedUser::from_state(object, operation, state).await?; match operation.authorize(object, state).await { Ok(authorized) => { assert!(authorized); } Err(err) => return Err(OperationError::Internal(err.to_string())), }; Ok(AuthorizedUser(authenticated)) } } #[async_trait::async_trait] impl + Send + Sync> FromOperationState for AuthorizedInstance { type Error = (); async fn from_state( object: &Instance, operation: &A, state: &StackOperationState, ) -> Result> { let authenticated = AuthenticatedInstance::from_state(object, operation, state).await?; match operation.authorize(object, state).await { Ok(authorized) => { assert!(authorized); } Err(err) => return Err(OperationError::Internal(err.to_string())), }; Ok(AuthorizedInstance(authenticated)) } } // #[async_trait::async_trait> FromOperationState for Option { // type Error = (); // async fn from_state(state: &StackOperationState) -> Result, OperationError<()>> { // Ok(T::from_state(] // impl, pub user: Option, } #[derive(Clone, Debug)] pub struct AuthenticatedInstance(Instance); impl AuthenticatedInstance { pub fn new(instance: Instance) -> Self { AuthenticatedInstance(instance) } } impl Deref for AuthenticatedInstance { type Target = Instance; fn deref(&self) -> &Self::Target { &self.0 } } #[derive(Clone, Debug)] pub struct AuthenticatedUser(User); impl AuthenticatedUser { pub fn new(user: User) -> Self { AuthenticatedUser(user) } } impl Deref for AuthenticatedUser { type Target = User; fn deref(&self) -> &Self::Target { &self.0 } } #[derive(Clone)] pub struct BackendWrapper { sender: tokio::sync::mpsc::Sender<( tokio::sync::oneshot::Sender>>, WrappedOperation, )>, task: Arc>, } pub struct WrappedOperation { object: AnyObject, operation_payload: AnyOperation, operation_name: String, state: StackOperationState, } impl BackendWrapper { pub fn new(backend: GiteratedBackend) -> Self { // Spawn listener task let (send, mut recv) = channel::<( tokio::sync::oneshot::Sender>>, WrappedOperation, )>(1024); let task = tokio::spawn(async move { while let Some((responder, message)) = recv.recv().await { let raw_result = backend .object_operation( message.object, &message.operation_name, message.operation_payload, &message.state, ) .await; responder.send(raw_result).unwrap(); } error!("Error, thing's dead"); }); Self { sender: send, task: Arc::new(task), } } pub async fn call(&self, operation: WrappedOperation) -> Result> { let (sender, response) = tokio::sync::oneshot::channel(); self.sender .send((sender, operation)) .await .map_err(|e| OperationError::Internal(e.to_string()))?; match response.await { Ok(result) => Ok(result?), Err(err) => Err(OperationError::Internal(err.to_string())), } } } use std::fmt::Debug; #[async_trait::async_trait] impl ObjectBackend for BackendWrapper { async fn object_operation( &self, object: O, operation: &str, payload: D, operation_state: &StackOperationState, ) -> Result> where O: GiteratedObject + Debug, D: GiteratedOperation + Debug, { let operation = WrappedOperation { object: AnyObject(object.to_string()), operation_name: operation.to_string(), operation_payload: AnyOperation(serde_json::to_value(payload).unwrap()), state: operation_state.clone(), }; let raw_result = self.call(operation).await; match raw_result { Ok(result) => Ok(serde_json::from_value(result) .map_err(|e| OperationError::Internal(e.to_string()))?), Err(err) => match err { OperationError::Internal(internal) => { warn!( "Internal Error: {:?}", OperationError::<()>::Internal(internal.clone()) ); Err(OperationError::Internal(internal)) } OperationError::Unhandled => Err(OperationError::Unhandled), OperationError::Operation(err) => Err(OperationError::Operation( serde_json::from_value(err) .map_err(|e| OperationError::Internal(e.to_string()))?, )), }, } } async fn get_object( &self, object_str: &str, operation_state: &StackOperationState, ) -> Result, OperationError> { let operation = WrappedOperation { object: AnyObject(object_str.to_string()), operation_name: ObjectRequest::operation_name().to_string(), operation_payload: AnyOperation( serde_json::to_value(ObjectRequest(object_str.to_string())).unwrap(), ), state: operation_state.clone(), }; let raw_result = self.call(operation).await; let object: ObjectResponse = match raw_result { Ok(result) => Ok(serde_json::from_value(result) .map_err(|e| OperationError::Internal(e.to_string()))?), Err(err) => match err { OperationError::Internal(internal) => { warn!( "Internal Error: {:?}", OperationError::<()>::Internal(internal.clone()) ); Err(OperationError::Internal(internal)) } OperationError::Unhandled => Err(OperationError::Unhandled), OperationError::Operation(err) => Err(OperationError::Operation( serde_json::from_value(err) .map_err(|e| OperationError::Internal(e.to_string()))?, )), }, }?; unsafe { Ok(Object::new_unchecked( O::from_str(&object.0) .map_err(|_| OperationError::Internal("deserialize failure".to_string()))?, self.clone(), )) } } }