diff --git a/Cargo.lock b/Cargo.lock index a07bcf9..8b5bc9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -846,6 +846,7 @@ dependencies = [ "giterated-api", "giterated-cache", "giterated-models", + "giterated-protocol", "giterated-stack", "jsonwebtoken", "log", @@ -891,6 +892,15 @@ dependencies = [ ] [[package]] +name = "giterated-protocol" +version = "0.1.0" +dependencies = [ + "giterated-stack", + "serde", + "tracing", +] + +[[package]] name = "giterated-stack" version = "0.1.0" dependencies = [ diff --git a/Cargo.toml b/Cargo.toml index 93dbda5..25943fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,5 +3,6 @@ members = [ "giterated-daemon", "giterated-models", "giterated-stack", - "giterated-cache" + "giterated-cache", + "giterated-protocol" ] \ No newline at end of file diff --git a/README.md b/README.md index 00d0733..e048bee 100644 --- a/README.md +++ b/README.md @@ -92,7 +92,9 @@ allows for easy extension of the daemon. ### Licensing Intent -Our intent is to foster a strong and compatible ecosystem of giterated frontend implementations, as well as foster the growth of services that are compatible with Giterated. +*Our declaration of intent is __**NOT**__ a promise, guarantee, or contract we are providing. Nothing stated in this section shall take precedent over the license, license text, or any other legally binding agreement. This is a plain english description of our intents, not a contract, license, or legally binding agreement.* + +Our intent is to foster a strong and compatible ecosystem of giterated frontend implementations, as well as foster the growth of services that are compatible with Giterated. We want you to be able to link any part of Giterated with your proprietary or closed source solutions without concerns over license violations. With that in mind, we have chosen to license core parts of the Giterated Daemon with the MPL-2.0. This is because there is no expected value provided to the community through closed-source versions of the core components, as they are generic. We felt that licenses such as the GPL and AGPL may make potential adopters uneasy, and we encourage the slight bit of extra flexibility the MPL-2.0 provides for adopters who are interested in making modifications. License aside, please consider upstreaming any improvements you identify. diff --git a/giterated-daemon/Cargo.toml b/giterated-daemon/Cargo.toml index deb447d..84c55a3 100644 --- a/giterated-daemon/Cargo.toml +++ b/giterated-daemon/Cargo.toml @@ -33,6 +33,7 @@ giterated-models = { path = "../giterated-models" } giterated-api = { path = "../../giterated-api" } giterated-stack = { path = "../giterated-stack" } giterated-cache = { path = "../giterated-cache" } +giterated-protocol = { path = "../giterated-protocol" } deadpool = "0.9" bincode = "1.3" tokio-util = {version = "0.7", features = ["rt"]} diff --git a/giterated-daemon/src/authentication.rs b/giterated-daemon/src/authentication.rs index f36df8c..a37712b 100644 --- a/giterated-daemon/src/authentication.rs +++ b/giterated-daemon/src/authentication.rs @@ -6,12 +6,11 @@ use giterated_models::instance::Instance; use giterated_models::user::User; use jsonwebtoken::{decode, encode, Algorithm, DecodingKey, EncodingKey, TokenData, Validation}; +use std::collections::HashMap; use std::{sync::Arc, time::SystemTime}; use tokio::{fs::File, io::AsyncReadExt, sync::Mutex}; use toml::Table; -use crate::keys::PublicKeyCache; - pub struct AuthenticationTokenGranter { pub config: Table, pub instance: Instance, @@ -161,3 +160,25 @@ impl AuthenticationTokenGranter { Ok(Some(UserAuthenticationToken::from(token))) } } + +#[derive(Default)] +pub struct PublicKeyCache { + pub keys: HashMap, +} + +impl PublicKeyCache { + pub async fn get(&mut self, instance: &Instance) -> Result { + if let Some(key) = self.keys.get(instance) { + Ok(key.clone()) + } else { + let key = reqwest::get(format!("https://{}/.giterated/pubkey.pem", instance)) + .await? + .text() + .await?; + + self.keys.insert(instance.clone(), key); + + Ok(self.keys.get(instance).unwrap().clone()) + } + } +} diff --git a/giterated-daemon/src/authorization.rs b/giterated-daemon/src/authorization.rs deleted file mode 100644 index 4ed564c..0000000 --- a/giterated-daemon/src/authorization.rs +++ /dev/null @@ -1,138 +0,0 @@ -use std::fmt::Debug; - -use crate::connection::wrapper::ConnectionState; -use giterated_models::error::OperationError; - -use giterated_models::object::GiteratedObject; - -use giterated_models::repository::{ - Repository, RepositoryFileInspectRequest, RepositoryIssueLabelsRequest, - RepositoryIssuesCountRequest, RepositoryIssuesRequest, -}; - -use giterated_models::user::User; - -use giterated_models::value::GetValueTyped; -use giterated_models::{object::ObjectRequest, settings::SetSetting, value::GiteratedObjectValue}; - -#[derive(Debug, thiserror::Error)] -#[error("unauthorized")] -pub struct UnauthorizedError; - -#[async_trait::async_trait] -pub trait AuthorizedOperation { - /// Authorizes the operation, returning whether the operation was - /// authorized or not. - async fn authorize( - &self, - authenticating_user: Option<&User>, - object: &O, - state: &mut S, - ) -> Result>; -} - -#[async_trait::async_trait] -impl AuthorizedOperation for SetSetting { - async fn authorize( - &self, - _authenticating_user: Option<&User>, - _object: &User, - _state: &mut ConnectionState, - ) -> Result> { - // TODO - Ok(true) - } -} - -#[async_trait::async_trait] -impl AuthorizedOperation for SetSetting { - async fn authorize( - &self, - _authenticating_user: Option<&User>, - _object: &Repository, - _state: &mut ConnectionState, - ) -> Result> { - // TODO - Ok(true) - } -} - -#[async_trait::async_trait] -impl - AuthorizedOperation for GetValueTyped -{ - async fn authorize( - &self, - _authenticating_user: Option<&User>, - _object: &Repository, - _state: &mut ConnectionState, - ) -> Result> { - // TODO - Ok(true) - } -} - -#[async_trait::async_trait] -impl AuthorizedOperation for ObjectRequest { - async fn authorize( - &self, - _authenticating_user: Option<&User>, - _object: &Repository, - _state: &mut ConnectionState, - ) -> Result> { - // TODO - Ok(true) - } -} - -#[async_trait::async_trait] -impl AuthorizedOperation for RepositoryFileInspectRequest { - async fn authorize( - &self, - _authenticating_user: Option<&User>, - _object: &Repository, - _state: &mut ConnectionState, - ) -> Result> { - // TODO - Ok(true) - } -} - -#[async_trait::async_trait] -impl AuthorizedOperation for RepositoryIssuesRequest { - async fn authorize( - &self, - _authenticating_user: Option<&User>, - _object: &Repository, - _state: &mut ConnectionState, - ) -> Result> { - // TODO - Ok(true) - } -} - -#[async_trait::async_trait] -impl AuthorizedOperation for RepositoryIssueLabelsRequest { - async fn authorize( - &self, - _authenticating_user: Option<&User>, - _object: &Repository, - _state: &mut ConnectionState, - ) -> Result> { - // TODO - Ok(true) - } -} - -#[async_trait::async_trait] -impl AuthorizedOperation for RepositoryIssuesCountRequest { - async fn authorize( - &self, - _authenticating_user: Option<&User>, - _object: &Repository, - _state: &mut ConnectionState, - ) -> Result> { - // TODO - Ok(true) - } -} diff --git a/giterated-daemon/src/cache_backend.rs b/giterated-daemon/src/cache_backend.rs deleted file mode 100644 index 8b13789..0000000 --- a/giterated-daemon/src/cache_backend.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/giterated-daemon/src/client.rs b/giterated-daemon/src/client.rs new file mode 100644 index 0000000..1093c29 --- /dev/null +++ b/giterated-daemon/src/client.rs @@ -0,0 +1,103 @@ +use std::sync::Arc; + +use futures_util::{SinkExt, StreamExt}; +use giterated_models::{ + authenticated::AuthenticatedPayload, + error::{IntoInternalError, OperationError}, + instance::Instance, + object_backend::ObjectBackend, +}; +use giterated_protocol::{NetworkedObject, NetworkedOperation}; +use giterated_stack::{GiteratedStack, StackOperationState}; +use tokio::net::TcpStream; +use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; + +pub async fn client_wrapper( + our_instance: Instance, + mut socket: WebSocketStream, + runtime: Arc, +) { + loop { + let message = socket.next().await; + + if message.is_none() { + // Keep an eye out for this, I dont see why we shouldn't end the connection + unreachable!() + } + + let message = message.unwrap(); + + let payload = match message { + Ok(message) => { + let payload = match message { + Message::Binary(payload) => payload, + Message::Ping(_) => { + let _ = socket.send(Message::Pong(vec![])).await; + continue; + } + Message::Close(_) => return, + _ => continue, + }; + + payload + } + Err(err) => { + // Connection error + warn!("A connection error has occured: {:?}", err); + + return; + } + }; + + let payload = match bincode::deserialize::(&payload) { + Ok(payload) => payload, + Err(e) => { + warn!( + "A network payload deserialization failure has occurred: {:?}", + e + ); + + continue; + } + }; + + let operation_state = StackOperationState { + our_instance: our_instance.clone(), + runtime: runtime.clone(), + instance: None, + user: None, + }; + + let result = handle_client_message(payload, operation_state, runtime.clone()).await; + + // Grab operation errors so we can log them, they don't make it across the network + if let Err(OperationError::Internal(internal_error)) = &result { + error!("An internal error has occurred:\n{:?}", internal_error); + } + + // Map error to the network variant + let result = result.map_err(|e| e.into_network()); + + socket + .send(Message::Binary(bincode::serialize(&result).unwrap())) + .await + .expect("there was an error sending a message, this is a problem for the receiver"); + } +} + +pub async fn handle_client_message( + payload: AuthenticatedPayload, + operation_state: StackOperationState, + runtime: Arc, +) -> Result, OperationError>> { + let mut networked_object = runtime + .get_object::(&payload.object, &operation_state) + .await + .as_internal_error_with_context("handling client message")?; + + let networked_operation = NetworkedOperation::new(payload.object, payload.payload); + + networked_object + .request(networked_operation, &operation_state) + .await +} diff --git a/giterated-daemon/src/connection.rs b/giterated-daemon/src/connection.rs deleted file mode 100644 index 222e5f5..0000000 --- a/giterated-daemon/src/connection.rs +++ /dev/null @@ -1,88 +0,0 @@ -pub mod wrapper; - -use giterated_models::instance::Instance; - -use giterated_models::instance::InstanceMeta; - -use std::{any::type_name, collections::HashMap}; - -use anyhow::Error; -use serde::{de::DeserializeOwned, Serialize}; -use tokio::{net::TcpStream, task::JoinHandle}; -use tokio_tungstenite::WebSocketStream; - -#[derive(Debug, thiserror::Error)] -pub enum ConnectionError { - #[error("connection should close")] - Shutdown, - #[error("internal error {0}")] - InternalError(#[from] Error), -} - -pub struct RawConnection { - pub task: JoinHandle<()>, -} - -pub struct InstanceConnection { - pub instance: InstanceMeta, - pub task: JoinHandle<()>, -} - -/// Represents a connection which hasn't finished the handshake. -pub struct UnestablishedConnection { - pub socket: WebSocketStream, -} - -#[derive(Default)] -pub struct Connections { - pub connections: Vec, - pub instance_connections: HashMap, -} - -#[derive(Debug, thiserror::Error)] -#[error("handler did not handle")] -pub struct HandlerUnhandled; - -pub trait MessageHandling { - fn message_type() -> &'static str; -} - -impl MessageHandling<(T1,), M, R> for F -where - F: FnOnce(T1) -> R, - T1: Serialize + DeserializeOwned, -{ - fn message_type() -> &'static str { - type_name::() - } -} - -impl MessageHandling<(T1, T2), M, R> for F -where - F: FnOnce(T1, T2) -> R, - T1: Serialize + DeserializeOwned, -{ - fn message_type() -> &'static str { - type_name::() - } -} - -impl MessageHandling<(T1, T2, T3), M, R> for F -where - F: FnOnce(T1, T2, T3) -> R, - T1: Serialize + DeserializeOwned, -{ - fn message_type() -> &'static str { - type_name::() - } -} - -impl MessageHandling<(T1, T2, T3, T4), M, R> for F -where - F: FnOnce(T1, T2, T3, T4) -> R, - T1: Serialize + DeserializeOwned, -{ - fn message_type() -> &'static str { - type_name::() - } -} diff --git a/giterated-daemon/src/connection/wrapper.rs b/giterated-daemon/src/connection/wrapper.rs deleted file mode 100644 index 2fc86ff..0000000 --- a/giterated-daemon/src/connection/wrapper.rs +++ /dev/null @@ -1,236 +0,0 @@ -use std::{ - net::SocketAddr, - ops::Deref, - sync::{atomic::AtomicBool, Arc}, -}; - -use anyhow::Error; -use futures_util::{SinkExt, StreamExt}; - -use giterated_models::{ - authenticated::{AuthenticationSource, UserTokenMetadata}, - error::OperationError, - instance::Instance, -}; - -use giterated_models::authenticated::AuthenticatedPayload; -use giterated_stack::{ - AuthenticatedInstance, AuthenticatedUser, GiteratedStack, StackOperationState, -}; -use jsonwebtoken::{DecodingKey, TokenData, Validation}; -use rsa::{ - pkcs1::DecodeRsaPublicKey, - pss::{Signature, VerifyingKey}, - sha2::Sha256, - signature::Verifier, - RsaPublicKey, -}; -use serde::Serialize; - -use tokio::{net::TcpStream, sync::Mutex}; -use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; -use toml::Table; - -use crate::{ - authentication::AuthenticationTokenGranter, - backend::{MetadataBackend, RepositoryBackend, UserBackend}, - federation::connections::InstanceConnections, - keys::PublicKeyCache, -}; - -use super::Connections; - -pub async fn connection_wrapper( - socket: WebSocketStream, - connections: Arc>, - repository_backend: Arc>, - user_backend: Arc>, - auth_granter: Arc>, - settings_backend: Arc>, - addr: SocketAddr, - instance: impl ToOwned, - instance_connections: Arc>, - config: Table, - runtime: Arc, - mut operation_state: StackOperationState, -) { - let connection_state = ConnectionState { - socket: Arc::new(Mutex::new(socket)), - connections, - repository_backend, - user_backend, - auth_granter, - settings_backend, - addr, - instance: instance.to_owned(), - handshaked: Arc::new(AtomicBool::new(false)), - key_cache: Arc::default(), - instance_connections: instance_connections.clone(), - config, - }; - - let _handshaked = false; - let mut key_cache = PublicKeyCache::default(); - - loop { - let mut socket = connection_state.socket.lock().await; - let message = socket.next().await; - drop(socket); - - match message { - Some(Ok(message)) => { - let payload = match message { - Message::Binary(payload) => payload, - Message::Ping(_) => { - let mut socket = connection_state.socket.lock().await; - let _ = socket.send(Message::Pong(vec![])).await; - drop(socket); - continue; - } - Message::Close(_) => return, - _ => continue, - }; - - let message: AuthenticatedPayload = bincode::deserialize(&payload).unwrap(); - - // Get authentication - let instance = { - let mut verified_instance: Option = None; - for source in &message.source { - if let AuthenticationSource::Instance { - instance, - signature, - } = source - { - let public_key = key_cache.get(instance).await.unwrap(); - let public_key = RsaPublicKey::from_pkcs1_pem(&public_key).unwrap(); - let verifying_key = VerifyingKey::::new(public_key); - - if verifying_key - .verify( - &message.payload, - &Signature::try_from(signature.as_ref()).unwrap(), - ) - .is_ok() - { - verified_instance = - Some(AuthenticatedInstance::new(instance.clone())); - - break; - } - } - } - - verified_instance - }; - - let user = { - let mut verified_user = None; - if let Some(verified_instance) = &instance { - for source in &message.source { - if let AuthenticationSource::User { user, token } = source { - // Get token - let public_key = key_cache.get(verified_instance).await.unwrap(); - - let token: TokenData = jsonwebtoken::decode( - token.as_ref(), - &DecodingKey::from_rsa_pem(public_key.as_bytes()).unwrap(), - &Validation::new(jsonwebtoken::Algorithm::RS256), - ) - .unwrap(); - - if token.claims.generated_for != *verified_instance.deref() { - // Nope! - break; - } - - if token.claims.user != *user { - // Nope! - break; - } - - verified_user = Some(AuthenticatedUser::new(user.clone())); - break; - } - } - } - - verified_user - }; - - operation_state.user = user; - operation_state.instance = instance; - - let result = runtime - .handle_network_message(message, &operation_state) - .await; - - // Asking for exploits here - operation_state.user = None; - operation_state.instance = None; - - if let Err(OperationError::Internal(internal_error)) = &result { - error!("An internal error has occurred:\n{:?}", internal_error); - } - - // Map error to the network variant - let result = result.map_err(|e| e.into_network()); - - let mut socket = connection_state.socket.lock().await; - let _ = socket - .send(Message::Binary(bincode::serialize(&result).unwrap())) - .await; - - drop(socket); - } - _ => { - return; - } - } - } -} - -#[derive(Clone)] -pub struct ConnectionState { - socket: Arc>>, - pub connections: Arc>, - pub repository_backend: Arc>, - pub user_backend: Arc>, - pub auth_granter: Arc>, - pub settings_backend: Arc>, - pub addr: SocketAddr, - pub instance: Instance, - pub handshaked: Arc, - pub key_cache: Arc>, - pub instance_connections: Arc>, - pub config: Table, -} - -impl ConnectionState { - pub async fn send(&self, message: T) -> Result<(), Error> { - let payload = serde_json::to_string(&message)?; - self.socket - .lock() - .await - .send(Message::Binary(payload.into_bytes())) - .await?; - - Ok(()) - } - - pub async fn send_raw(&self, message: T) -> Result<(), Error> { - let payload = serde_json::to_string(&message)?; - self.socket - .lock() - .await - .send(Message::Binary(payload.into_bytes())) - .await?; - - Ok(()) - } - - pub async fn public_key(&self, instance: &Instance) -> Result { - let mut keys = self.key_cache.lock().await; - keys.get(instance).await - } -} diff --git a/giterated-daemon/src/federation/connections.rs b/giterated-daemon/src/federation/connections.rs deleted file mode 100644 index e41c311..0000000 --- a/giterated-daemon/src/federation/connections.rs +++ /dev/null @@ -1,23 +0,0 @@ -use std::collections::HashMap; - -use anyhow::Error; -use giterated_api::DaemonConnectionPool; -use giterated_models::instance::Instance; - -#[derive(Default)] -pub struct InstanceConnections { - pools: HashMap, -} - -impl InstanceConnections { - pub fn get_or_open(&mut self, instance: &Instance) -> Result { - if let Some(pool) = self.pools.get(instance) { - Ok(pool.clone()) - } else { - let pool = DaemonConnectionPool::connect(instance.clone()).unwrap(); - self.pools.insert(instance.clone(), pool.clone()); - - Ok(pool) - } - } -} diff --git a/giterated-daemon/src/federation/mod.rs b/giterated-daemon/src/federation/mod.rs deleted file mode 100644 index a67f22c..0000000 --- a/giterated-daemon/src/federation/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod connections; diff --git a/giterated-daemon/src/keys.rs b/giterated-daemon/src/keys.rs deleted file mode 100644 index b82d3c0..0000000 --- a/giterated-daemon/src/keys.rs +++ /dev/null @@ -1,27 +0,0 @@ -use anyhow::Error; - -use giterated_models::instance::Instance; - -use std::collections::HashMap; - -#[derive(Default)] -pub struct PublicKeyCache { - pub keys: HashMap, -} - -impl PublicKeyCache { - pub async fn get(&mut self, instance: &Instance) -> Result { - if let Some(key) = self.keys.get(instance) { - Ok(key.clone()) - } else { - let key = reqwest::get(format!("https://{}/.giterated/pubkey.pem", instance)) - .await? - .text() - .await?; - - self.keys.insert(instance.clone(), key); - - Ok(self.keys.get(instance).unwrap().clone()) - } - } -} diff --git a/giterated-daemon/src/lib.rs b/giterated-daemon/src/lib.rs index 6e84069..2d8d4b6 100644 --- a/giterated-daemon/src/lib.rs +++ b/giterated-daemon/src/lib.rs @@ -3,13 +3,9 @@ use std::str::FromStr; use semver::{Version, VersionReq}; pub mod authentication; -pub mod authorization; pub mod backend; -pub mod cache_backend; -pub mod connection; +pub mod client; pub mod database_backend; -pub mod federation; -pub mod keys; #[macro_use] extern crate tracing; diff --git a/giterated-daemon/src/main.rs b/giterated-daemon/src/main.rs index 321bbf7..62d7262 100644 --- a/giterated-daemon/src/main.rs +++ b/giterated-daemon/src/main.rs @@ -1,19 +1,17 @@ use anyhow::Error; -use connection::{Connections, RawConnection}; use giterated_cache::CacheSubstack; use giterated_daemon::{ authentication::AuthenticationTokenGranter, backend::{ git::GitBackend, settings::DatabaseSettings, user::UserAuth, RepositoryBackend, UserBackend, }, - connection::{self, wrapper::connection_wrapper}, + client::client_wrapper, database_backend::DatabaseBackend, - federation::connections::InstanceConnections, }; use giterated_models::instance::Instance; -use giterated_stack::{GiteratedStack, StackOperationState}; +use giterated_stack::GiteratedStack; use sqlx::{postgres::PgConnectOptions, ConnectOptions, PgPool}; use std::{net::SocketAddr, str::FromStr, sync::Arc}; use tokio::{ @@ -39,8 +37,6 @@ async fn main() -> Result<(), Error> { text.parse()? }; let mut listener = TcpListener::bind(config["giterated"]["bind"].as_str().unwrap()).await?; - let connections: Arc> = Arc::default(); - let instance_connections: Arc> = Arc::default(); let db_conn_options = PgConnectOptions::new() .host(config["postgres"]["host"].as_str().unwrap()) .port(config["postgres"]["port"].as_integer().unwrap() as u16) @@ -110,16 +106,6 @@ async fn main() -> Result<(), Error> { .set(runtime.clone()) .expect("failed to store global daemon stack"); - let operation_state = { - StackOperationState { - our_instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap()) - .unwrap(), - runtime: runtime.clone(), - instance: None, - user: None, - } - }; - let pool = LocalPoolHandle::new(5); loop { @@ -150,38 +136,12 @@ async fn main() -> Result<(), Error> { }; info!("Websocket connection established with {}", address); - let connections_cloned = connections.clone(); - let repository_backend = repository_backend.clone(); - let user_backend = user_backend.clone(); - let token_granter = token_granter.clone(); - let settings = settings.clone(); - let instance_connections = instance_connections.clone(); - let config = config.clone(); + + let our_instance = + Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(); let runtime = runtime.clone(); - let operation_state = operation_state.clone(); - - pool.spawn_pinned(move || { - connection_wrapper( - connection, - connections_cloned, - repository_backend, - user_backend, - token_granter, - settings, - address, - Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(), - instance_connections, - config, - runtime, - operation_state, - ) - }); - - let connection = RawConnection { - task: tokio::spawn(async move { () }), - }; - connections.lock().await.connections.push(connection); + pool.spawn_pinned(move || client_wrapper(our_instance, connection, runtime)); } } diff --git a/giterated-protocol/Cargo.toml b/giterated-protocol/Cargo.toml index 39e68b6..78fc0d4 100644 --- a/giterated-protocol/Cargo.toml +++ b/giterated-protocol/Cargo.toml @@ -13,3 +13,6 @@ keywords = ["giterated"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +giterated-stack = {path = "../giterated-stack" } +serde = { version = "1.0.188", features = [ "derive" ]} +tracing = "0.1" \ No newline at end of file diff --git a/giterated-protocol/src/lib.rs b/giterated-protocol/src/lib.rs index 7d12d9a..b5d92fb 100644 --- a/giterated-protocol/src/lib.rs +++ b/giterated-protocol/src/lib.rs @@ -1,14 +1,3 @@ -pub fn add(left: usize, right: usize) -> usize { - left + right -} +mod substack; -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} +pub use substack::{NetworkedObject, NetworkedOperation, NetworkedSubstack}; diff --git a/giterated-protocol/src/substack.rs b/giterated-protocol/src/substack.rs new file mode 100644 index 0000000..1e1087e --- /dev/null +++ b/giterated-protocol/src/substack.rs @@ -0,0 +1,245 @@ +use std::{fmt::Display, str::FromStr, sync::Arc}; + +use giterated_stack::{ + models::{Error, GiteratedObject, GiteratedOperation, IntoInternalError, OperationError}, + AnyFailure, AnyObject, AnyOperation, AnySuccess, GiteratedStack, ObjectOperationPair, + StackOperationState, SubstackBuilder, +}; +use serde::{Deserialize, Serialize}; +use tracing::{trace, warn}; + +/// A Giterated substack that attempts to resolve with a remote, networked Giterated Daemon. +/// +/// # Usage +/// +/// Convert the [`NetworkedSubstack`] into a [`SubStackBuilder`] and merge it with +/// a runtime. +/// +/// ``` +/// let mut runtime = GiteratedStack::default(); +/// +/// let network_substack = NetworkedSubstack::default(); +/// +/// runtime.merge_builder(network_substack.into_substack()); +/// ``` +/// +/// To handle messages that are sourced from the network, use [`NetworkedObject`] and [`NetworkedOperation`]. +/// +/// These are wrappers around the raw payloads from the network. The return payload from handling [`NetworkedOperation`] is then +/// sent back to the requester. +/// +/// ``` +/// // Start with a network payload +/// let network_payload: AuthenticatedPayload = { todo!() }; +/// +/// let networked_object = runtime.get_object::(network_payload.object).await?; +/// let operation_name = payload.operation; +/// let networked_operation = NetworkedOperation(payload); +/// +/// // Operation state depends on the authentication in the payload, it +/// // isn't relevant here. +/// let operation_state = StackOperationState::default(); +/// +/// let result = networked_object.request(networked_operation, &operation_state); +/// +/// // `result` is Result, OperationError> which is also the type that +/// // giterated's networked protocol uses for responses, so you can send it directly. +/// ``` +/// +/// TODO: The above docs are 100% false about the network protocol type +#[derive(Clone)] +pub struct NetworkedSubstack { + home_uri: Option, +} + +impl Default for NetworkedSubstack { + fn default() -> Self { + todo!() + } +} + +impl NetworkedSubstack { + pub fn into_substack(self) -> SubstackBuilder { + let mut stack = SubstackBuilder::new(self); + + stack.operation(handle_network_operation); + + // TODO: optional + stack.dynamic_operation(try_handle_with_remote); + + stack + } +} + +pub async fn handle_network_operation( + object: NetworkedObject, + operation: NetworkedOperation, + _state: NetworkedSubstack, + operation_state: StackOperationState, + stack: Arc, +) -> Result, OperationError>> { + trace!("Handle network operation"); + let mut result = None; + + for (_, object_meta) in &stack.metadata.objects { + if let Ok(object) = (object_meta.from_str)(&object.0) { + // TODO: This is definitely going to resolve us + result = Some((object, object_meta)); + break; + } + } + + let (object, object_meta) = result.ok_or_else(|| OperationError::Unhandled)?; + + trace!( + "Resolved object type {} for network operation.", + object_meta.name + ); + + let operation_meta = stack + .metadata + .operations + .get(&ObjectOperationPair { + object_name: &object_meta.name, + operation_name: &operation.name, + }) + .ok_or_else(|| OperationError::Unhandled)?; + + trace!( + "Resolved operation {}::{} for network operation.", + object_meta.name, + operation_meta.name + ); + + let operation = (operation_meta.deserialize)(&operation.payload) + .as_internal_error_with_context(format!( + "deserializing object operation {}::{}", + object_meta.name, operation_meta.name + ))?; + + trace!( + "Deserialized operation {}::{} for network operation.", + object_meta.name, + operation_meta.name + ); + + let result = stack + .new_operation_func(object, operation, operation_state) + .await; + + match result { + Ok(success) => Ok((operation_meta.serialize_success)(success) + .as_internal_error_with_context(format!( + "serializing success for object operation {}::{}", + object_meta.name, operation_meta.name + ))?), + Err(err) => Err(match err { + OperationError::Operation(failure) => OperationError::Operation( + (operation_meta.serialize_error)(failure).as_internal_error_with_context( + format!( + "serializing error for object operation {}::{}", + object_meta.name, operation_meta.name + ), + )?, + ), + OperationError::Internal(internal) => OperationError::Internal(internal), + OperationError::Unhandled => OperationError::Unhandled, + }), + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NetworkedObject(pub String); + +impl FromStr for NetworkedObject { + type Err = (); + + fn from_str(_s: &str) -> Result { + todo!() + } +} + +impl Display for NetworkedObject { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +impl GiteratedObject for NetworkedObject { + fn object_name() -> &'static str { + todo!() + } + + fn from_object_str(_object_str: &str) -> Result { + todo!() + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NetworkedOperation { + name: String, + payload: Vec, +} + +impl NetworkedOperation { + pub fn new(_name: String, _payload: Vec) -> Self { + todo!() + } +} + +impl GiteratedOperation for NetworkedOperation { + type Success = Vec; + + type Failure = Vec; + + fn operation_name() -> &'static str { + "network_operation" + } +} + +/// Handler which will attempt to resolve any operation that doesn't resolve locally +/// against a remote instance. +pub async fn try_handle_with_remote( + object: AnyObject, + operation: AnyOperation, + state: NetworkedSubstack, + _operation_state: StackOperationState, + stack: Arc, +) -> Result> { + trace!( + "Try handling object operation {}::{} with remote", + object.kind(), + operation.kind().operation_name + ); + // TODO: + // Ideally we support pass-through on object types that aren't used locally. + // For now, we aren't worrying about that. + let object_meta = stack + .metadata + .objects + .get(object.kind()) + .ok_or_else(|| OperationError::Unhandled)?; + + let _operation_meta = stack + .metadata + .operations + .get(&operation.kind()) + .ok_or_else(|| OperationError::Unhandled)?; + + let object_home_uri = (object_meta.home_uri)(object.clone()); + + if let Some(home_uri) = state.home_uri { + if home_uri == object_home_uri { + // This isn't a remote request, requests aren't supposed to hit this layer + // if they're not remote. + warn!("Try handling object operation {}::{}, resolved object home uri as local home uri. This is a bug.", object.kind(), + operation.kind().operation_name); + + return Err(OperationError::Unhandled); + } + } + + // Blah blah connect and do the stuff + + todo!() +} diff --git a/giterated-stack/src/lib.rs b/giterated-stack/src/lib.rs index 7e5c4b3..439484c 100644 --- a/giterated-stack/src/lib.rs +++ b/giterated-stack/src/lib.rs @@ -12,6 +12,14 @@ pub use substack::*; pub mod state; pub mod update; +// Temp pub use to figure out what's important +pub mod models { + pub use anyhow::Error; + pub use giterated_models::error::{IntoInternalError, OperationError}; + pub use giterated_models::object::GiteratedObject; + pub use giterated_models::operation::GiteratedOperation; +} + use std::{any::Any, convert::Infallible, ops::Deref, sync::Arc}; use core::fmt::Debug; diff --git a/giterated-stack/src/meta/mod.rs b/giterated-stack/src/meta/mod.rs index a9033fa..eefa2c6 100644 --- a/giterated-stack/src/meta/mod.rs +++ b/giterated-stack/src/meta/mod.rs @@ -237,12 +237,14 @@ pub struct ObjectMeta { pub name: String, pub to_str: Box String + Send + Sync>, pub from_str: Box Result + Send + Sync>, + pub home_uri: fn(AnyObject) -> String, pub any_is_same: fn(&dyn Any) -> bool, } pub trait IntoObjectMeta: FromStr { fn name() -> String; fn any_is_same(other: &dyn Any) -> bool; + fn home_uri(object: AnyObject) -> String; } impl IntoObjectMeta for O { @@ -253,6 +255,10 @@ impl IntoObjectMeta for O { fn any_is_same(other: &dyn Any) -> bool { other.is::() } + + fn home_uri(_object: AnyObject) -> String { + todo!() + } } impl ObjectMeta { @@ -270,6 +276,7 @@ impl ObjectMeta { object.to_string() }), any_is_same: I::any_is_same, + home_uri: I::home_uri, } } } diff --git a/giterated-stack/src/stack.rs b/giterated-stack/src/stack.rs index c5c8a8c..b8bb016 100644 --- a/giterated-stack/src/stack.rs +++ b/giterated-stack/src/stack.rs @@ -1149,6 +1149,18 @@ impl ObjectBackend for Arc { } } +// Placeholder +impl GiteratedStack { + pub async fn new_operation_func( + &self, + _object: AnyObject, + _operation: AnyOperation, + _operation_state: StackOperationState, + ) -> Result> { + todo!() + } +} + /// Defines a type that is a valid Giterated runtime state. /// /// This allows for extraction of state in handlers, based on a diff --git a/giterated-stack/src/substack.rs b/giterated-stack/src/substack.rs index 052d3c6..d0c44ee 100644 --- a/giterated-stack/src/substack.rs +++ b/giterated-stack/src/substack.rs @@ -313,6 +313,12 @@ impl SubstackBuilder { } } +// Placeholder +impl SubstackBuilder { + pub fn dynamic_operation(&mut self, _handler: H) -> &mut Self { + todo!() + } +} #[derive(Debug, Clone, thiserror::Error)] #[error("downcast error")] pub struct DowncastError; diff --git a/migrations/20230913232554_repository_metadata.sql b/migrations/20230913232554_repository_metadata.sql deleted file mode 100644 index 8ddc1d3..0000000 --- a/migrations/20230913232554_repository_metadata.sql +++ /dev/null @@ -1 +0,0 @@ --- Add migration script here