JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Create `NetworkedSubstack`.

# giterated-protocol - Create `NetworkedSubstack` which will handle all networked operations giterated needs - Add support for `NetworkedSubstack` for both the daemon and client - Pipe everything through but leave APIs temp # `giterated-daemon` - Remove a bunch of random old code, dead code, and files that aren't needed. - Moved all connection handling to `client.rs`, simplified connection logic with new types

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨202bb12

Showing ⁨⁨23⁩ changed files⁩ with ⁨⁨432⁩ insertions⁩ and ⁨⁨583⁩ deletions⁩

Cargo.lock

View file
@@ -846,6 +846,7 @@ dependencies = [
846 846 "giterated-api",
847 847 "giterated-cache",
848 848 "giterated-models",
849 "giterated-protocol",
849 850 "giterated-stack",
850 851 "jsonwebtoken",
851 852 "log",
@@ -891,6 +892,15 @@ dependencies = [
891 892 ]
892 893
893 894 [[package]]
895 name = "giterated-protocol"
896 version = "0.1.0"
897 dependencies = [
898 "giterated-stack",
899 "serde",
900 "tracing",
901 ]
902
903 [[package]]
894 904 name = "giterated-stack"
895 905 version = "0.1.0"
896 906 dependencies = [

Cargo.toml

View file
@@ -3,5 +3,6 @@ members = [
3 3 "giterated-daemon",
4 4 "giterated-models",
5 5 "giterated-stack",
6 "giterated-cache"
6 "giterated-cache",
7 "giterated-protocol"
7 8 ]
7 8 \ No newline at end of file

README.md

View file
@@ -92,7 +92,9 @@ allows for easy extension of the daemon.
92 92
93 93 ### Licensing Intent
94 94
95 Our intent is to foster a strong and compatible ecosystem of giterated frontend implementations, as well as foster the growth of services that are compatible with Giterated.
95 *Our declaration of intent is __**NOT**__ a promise, guarantee, or contract we are providing. Nothing stated in this section shall take precedent over the license, license text, or any other legally binding agreement. This is a plain english description of our intents, not a contract, license, or legally binding agreement.*
96
97 Our intent is to foster a strong and compatible ecosystem of giterated frontend implementations, as well as foster the growth of services that are compatible with Giterated. We want you to be able to link any part of Giterated with your proprietary or closed source solutions without concerns over license violations.
96 98
97 99
98 100 With that in mind, we have chosen to license core parts of the Giterated Daemon with the MPL-2.0. This is because there is no expected value provided to the community through closed-source versions of the core components, as they are generic. We felt that licenses such as the GPL and AGPL may make potential adopters uneasy, and we encourage the slight bit of extra flexibility the MPL-2.0 provides for adopters who are interested in making modifications. License aside, please consider upstreaming any improvements you identify.

giterated-daemon/Cargo.toml

View file
@@ -33,6 +33,7 @@ giterated-models = { path = "../giterated-models" }
33 33 giterated-api = { path = "../../giterated-api" }
34 34 giterated-stack = { path = "../giterated-stack" }
35 35 giterated-cache = { path = "../giterated-cache" }
36 giterated-protocol = { path = "../giterated-protocol" }
36 37 deadpool = "0.9"
37 38 bincode = "1.3"
38 39 tokio-util = {version = "0.7", features = ["rt"]}

giterated-daemon/src/authentication.rs

View file
@@ -6,12 +6,11 @@ use giterated_models::instance::Instance;
6 6 use giterated_models::user::User;
7 7
8 8 use jsonwebtoken::{decode, encode, Algorithm, DecodingKey, EncodingKey, TokenData, Validation};
9 use std::collections::HashMap;
9 10 use std::{sync::Arc, time::SystemTime};
10 11 use tokio::{fs::File, io::AsyncReadExt, sync::Mutex};
11 12 use toml::Table;
12 13
13 use crate::keys::PublicKeyCache;
14
15 14 pub struct AuthenticationTokenGranter {
16 15 pub config: Table,
17 16 pub instance: Instance,
@@ -161,3 +160,25 @@ impl AuthenticationTokenGranter {
161 160 Ok(Some(UserAuthenticationToken::from(token)))
162 161 }
163 162 }
163
164 #[derive(Default)]
165 pub struct PublicKeyCache {
166 pub keys: HashMap<Instance, String>,
167 }
168
169 impl PublicKeyCache {
170 pub async fn get(&mut self, instance: &Instance) -> Result<String, Error> {
171 if let Some(key) = self.keys.get(instance) {
172 Ok(key.clone())
173 } else {
174 let key = reqwest::get(format!("https://{}/.giterated/pubkey.pem", instance))
175 .await?
176 .text()
177 .await?;
178
179 self.keys.insert(instance.clone(), key);
180
181 Ok(self.keys.get(instance).unwrap().clone())
182 }
183 }
184 }

giterated-daemon/src/client.rs

View file
@@ -0,0 +1,103 @@
1 use std::sync::Arc;
2
3 use futures_util::{SinkExt, StreamExt};
4 use giterated_models::{
5 authenticated::AuthenticatedPayload,
6 error::{IntoInternalError, OperationError},
7 instance::Instance,
8 object_backend::ObjectBackend,
9 };
10 use giterated_protocol::{NetworkedObject, NetworkedOperation};
11 use giterated_stack::{GiteratedStack, StackOperationState};
12 use tokio::net::TcpStream;
13 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
14
15 pub async fn client_wrapper(
16 our_instance: Instance,
17 mut socket: WebSocketStream<TcpStream>,
18 runtime: Arc<GiteratedStack>,
19 ) {
20 loop {
21 let message = socket.next().await;
22
23 if message.is_none() {
24 // Keep an eye out for this, I dont see why we shouldn't end the connection
25 unreachable!()
26 }
27
28 let message = message.unwrap();
29
30 let payload = match message {
31 Ok(message) => {
32 let payload = match message {
33 Message::Binary(payload) => payload,
34 Message::Ping(_) => {
35 let _ = socket.send(Message::Pong(vec![])).await;
36 continue;
37 }
38 Message::Close(_) => return,
39 _ => continue,
40 };
41
42 payload
43 }
44 Err(err) => {
45 // Connection error
46 warn!("A connection error has occured: {:?}", err);
47
48 return;
49 }
50 };
51
52 let payload = match bincode::deserialize::<AuthenticatedPayload>(&payload) {
53 Ok(payload) => payload,
54 Err(e) => {
55 warn!(
56 "A network payload deserialization failure has occurred: {:?}",
57 e
58 );
59
60 continue;
61 }
62 };
63
64 let operation_state = StackOperationState {
65 our_instance: our_instance.clone(),
66 runtime: runtime.clone(),
67 instance: None,
68 user: None,
69 };
70
71 let result = handle_client_message(payload, operation_state, runtime.clone()).await;
72
73 // Grab operation errors so we can log them, they don't make it across the network
74 if let Err(OperationError::Internal(internal_error)) = &result {
75 error!("An internal error has occurred:\n{:?}", internal_error);
76 }
77
78 // Map error to the network variant
79 let result = result.map_err(|e| e.into_network());
80
81 socket
82 .send(Message::Binary(bincode::serialize(&result).unwrap()))
83 .await
84 .expect("there was an error sending a message, this is a problem for the receiver");
85 }
86 }
87
88 pub async fn handle_client_message(
89 payload: AuthenticatedPayload,
90 operation_state: StackOperationState,
91 runtime: Arc<GiteratedStack>,
92 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
93 let mut networked_object = runtime
94 .get_object::<NetworkedObject>(&payload.object, &operation_state)
95 .await
96 .as_internal_error_with_context("handling client message")?;
97
98 let networked_operation = NetworkedOperation::new(payload.object, payload.payload);
99
100 networked_object
101 .request(networked_operation, &operation_state)
102 .await
103 }

giterated-daemon/src/lib.rs

View file
@@ -3,13 +3,9 @@ use std::str::FromStr;
3 3 use semver::{Version, VersionReq};
4 4
5 5 pub mod authentication;
6 pub mod authorization;
7 6 pub mod backend;
8 pub mod cache_backend;
9 pub mod connection;
7 pub mod client;
10 8 pub mod database_backend;
11 pub mod federation;
12 pub mod keys;
13 9
14 10 #[macro_use]
15 11 extern crate tracing;

giterated-daemon/src/main.rs

View file
@@ -1,19 +1,17 @@
1 1 use anyhow::Error;
2 use connection::{Connections, RawConnection};
3 2 use giterated_cache::CacheSubstack;
4 3 use giterated_daemon::{
5 4 authentication::AuthenticationTokenGranter,
6 5 backend::{
7 6 git::GitBackend, settings::DatabaseSettings, user::UserAuth, RepositoryBackend, UserBackend,
8 7 },
9 connection::{self, wrapper::connection_wrapper},
8 client::client_wrapper,
10 9 database_backend::DatabaseBackend,
11 federation::connections::InstanceConnections,
12 10 };
13 11
14 12 use giterated_models::instance::Instance;
15 13
16 use giterated_stack::{GiteratedStack, StackOperationState};
14 use giterated_stack::GiteratedStack;
17 15 use sqlx::{postgres::PgConnectOptions, ConnectOptions, PgPool};
18 16 use std::{net::SocketAddr, str::FromStr, sync::Arc};
19 17 use tokio::{
@@ -39,8 +37,6 @@ async fn main() -> Result<(), Error> {
39 37 text.parse()?
40 38 };
41 39 let mut listener = TcpListener::bind(config["giterated"]["bind"].as_str().unwrap()).await?;
42 let connections: Arc<Mutex<Connections>> = Arc::default();
43 let instance_connections: Arc<Mutex<InstanceConnections>> = Arc::default();
44 40 let db_conn_options = PgConnectOptions::new()
45 41 .host(config["postgres"]["host"].as_str().unwrap())
46 42 .port(config["postgres"]["port"].as_integer().unwrap() as u16)
@@ -110,16 +106,6 @@ async fn main() -> Result<(), Error> {
110 106 .set(runtime.clone())
111 107 .expect("failed to store global daemon stack");
112 108
113 let operation_state = {
114 StackOperationState {
115 our_instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap())
116 .unwrap(),
117 runtime: runtime.clone(),
118 instance: None,
119 user: None,
120 }
121 };
122
123 109 let pool = LocalPoolHandle::new(5);
124 110
125 111 loop {
@@ -150,38 +136,12 @@ async fn main() -> Result<(), Error> {
150 136 };
151 137
152 138 info!("Websocket connection established with {}", address);
153 let connections_cloned = connections.clone();
154 let repository_backend = repository_backend.clone();
155 let user_backend = user_backend.clone();
156 let token_granter = token_granter.clone();
157 let settings = settings.clone();
158 let instance_connections = instance_connections.clone();
159 let config = config.clone();
139
140 let our_instance =
141 Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap();
160 142 let runtime = runtime.clone();
161 let operation_state = operation_state.clone();
162
163 pool.spawn_pinned(move || {
164 connection_wrapper(
165 connection,
166 connections_cloned,
167 repository_backend,
168 user_backend,
169 token_granter,
170 settings,
171 address,
172 Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
173 instance_connections,
174 config,
175 runtime,
176 operation_state,
177 )
178 });
179
180 let connection = RawConnection {
181 task: tokio::spawn(async move { () }),
182 };
183 143
184 connections.lock().await.connections.push(connection);
144 pool.spawn_pinned(move || client_wrapper(our_instance, connection, runtime));
185 145 }
186 146 }
187 147

giterated-protocol/Cargo.toml

View file
@@ -13,3 +13,6 @@ keywords = ["giterated"]
13 13 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
14 14
15 15 [dependencies]
16 giterated-stack = {path = "../giterated-stack" }
17 serde = { version = "1.0.188", features = [ "derive" ]}
18 tracing = "0.1"
18 \ No newline at end of file

giterated-protocol/src/lib.rs

View file
@@ -1,14 +1,3 @@
1 pub fn add(left: usize, right: usize) -> usize {
2 left + right
3 }
1 mod substack;
4 2
5 #[cfg(test)]
6 mod tests {
7 use super::*;
8
9 #[test]
10 fn it_works() {
11 let result = add(2, 2);
12 assert_eq!(result, 4);
13 }
14 }
3 pub use substack::{NetworkedObject, NetworkedOperation, NetworkedSubstack};

giterated-protocol/src/substack.rs

View file
@@ -0,0 +1,245 @@
1 use std::{fmt::Display, str::FromStr, sync::Arc};
2
3 use giterated_stack::{
4 models::{Error, GiteratedObject, GiteratedOperation, IntoInternalError, OperationError},
5 AnyFailure, AnyObject, AnyOperation, AnySuccess, GiteratedStack, ObjectOperationPair,
6 StackOperationState, SubstackBuilder,
7 };
8 use serde::{Deserialize, Serialize};
9 use tracing::{trace, warn};
10
11 /// A Giterated substack that attempts to resolve with a remote, networked Giterated Daemon.
12 ///
13 /// # Usage
14 ///
15 /// Convert the [`NetworkedSubstack`] into a [`SubStackBuilder<NetworkedSubstack>`] and merge it with
16 /// a runtime.
17 ///
18 /// ```
19 /// let mut runtime = GiteratedStack::default();
20 ///
21 /// let network_substack = NetworkedSubstack::default();
22 ///
23 /// runtime.merge_builder(network_substack.into_substack());
24 /// ```
25 ///
26 /// To handle messages that are sourced from the network, use [`NetworkedObject`] and [`NetworkedOperation`].
27 ///
28 /// These are wrappers around the raw payloads from the network. The return payload from handling [`NetworkedOperation`] is then
29 /// sent back to the requester.
30 ///
31 /// ```
32 /// // Start with a network payload
33 /// let network_payload: AuthenticatedPayload = { todo!() };
34 ///
35 /// let networked_object = runtime.get_object::<NetworkedObject>(network_payload.object).await?;
36 /// let operation_name = payload.operation;
37 /// let networked_operation = NetworkedOperation(payload);
38 ///
39 /// // Operation state depends on the authentication in the payload, it
40 /// // isn't relevant here.
41 /// let operation_state = StackOperationState::default();
42 ///
43 /// let result = networked_object.request(networked_operation, &operation_state);
44 ///
45 /// // `result` is Result<Vec<u8>, OperationError<Vec<u8>> which is also the type that
46 /// // giterated's networked protocol uses for responses, so you can send it directly.
47 /// ```
48 ///
49 /// TODO: The above docs are 100% false about the network protocol type
50 #[derive(Clone)]
51 pub struct NetworkedSubstack {
52 home_uri: Option<String>,
53 }
54
55 impl Default for NetworkedSubstack {
56 fn default() -> Self {
57 todo!()
58 }
59 }
60
61 impl NetworkedSubstack {
62 pub fn into_substack(self) -> SubstackBuilder<Self> {
63 let mut stack = SubstackBuilder::new(self);
64
65 stack.operation(handle_network_operation);
66
67 // TODO: optional
68 stack.dynamic_operation(try_handle_with_remote);
69
70 stack
71 }
72 }
73
74 pub async fn handle_network_operation(
75 object: NetworkedObject,
76 operation: NetworkedOperation,
77 _state: NetworkedSubstack,
78 operation_state: StackOperationState,
79 stack: Arc<GiteratedStack>,
80 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
81 trace!("Handle network operation");
82 let mut result = None;
83
84 for (_, object_meta) in &stack.metadata.objects {
85 if let Ok(object) = (object_meta.from_str)(&object.0) {
86 // TODO: This is definitely going to resolve us
87 result = Some((object, object_meta));
88 break;
89 }
90 }
91
92 let (object, object_meta) = result.ok_or_else(|| OperationError::Unhandled)?;
93
94 trace!(
95 "Resolved object type {} for network operation.",
96 object_meta.name
97 );
98
99 let operation_meta = stack
100 .metadata
101 .operations
102 .get(&ObjectOperationPair {
103 object_name: &object_meta.name,
104 operation_name: &operation.name,
105 })
106 .ok_or_else(|| OperationError::Unhandled)?;
107
108 trace!(
109 "Resolved operation {}::{} for network operation.",
110 object_meta.name,
111 operation_meta.name
112 );
113
114 let operation = (operation_meta.deserialize)(&operation.payload)
115 .as_internal_error_with_context(format!(
116 "deserializing object operation {}::{}",
117 object_meta.name, operation_meta.name
118 ))?;
119
120 trace!(
121 "Deserialized operation {}::{} for network operation.",
122 object_meta.name,
123 operation_meta.name
124 );
125
126 let result = stack
127 .new_operation_func(object, operation, operation_state)
128 .await;
129
130 match result {
131 Ok(success) => Ok((operation_meta.serialize_success)(success)
132 .as_internal_error_with_context(format!(
133 "serializing success for object operation {}::{}",
134 object_meta.name, operation_meta.name
135 ))?),
136 Err(err) => Err(match err {
137 OperationError::Operation(failure) => OperationError::Operation(
138 (operation_meta.serialize_error)(failure).as_internal_error_with_context(
139 format!(
140 "serializing error for object operation {}::{}",
141 object_meta.name, operation_meta.name
142 ),
143 )?,
144 ),
145 OperationError::Internal(internal) => OperationError::Internal(internal),
146 OperationError::Unhandled => OperationError::Unhandled,
147 }),
148 }
149 }
150
151 #[derive(Clone, Debug, Serialize, Deserialize)]
152 pub struct NetworkedObject(pub String);
153
154 impl FromStr for NetworkedObject {
155 type Err = ();
156
157 fn from_str(_s: &str) -> Result<Self, Self::Err> {
158 todo!()
159 }
160 }
161
162 impl Display for NetworkedObject {
163 fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 todo!()
165 }
166 }
167
168 impl GiteratedObject for NetworkedObject {
169 fn object_name() -> &'static str {
170 todo!()
171 }
172
173 fn from_object_str(_object_str: &str) -> Result<Self, Error> {
174 todo!()
175 }
176 }
177
178 #[derive(Clone, Debug, Serialize, Deserialize)]
179 pub struct NetworkedOperation {
180 name: String,
181 payload: Vec<u8>,
182 }
183
184 impl NetworkedOperation {
185 pub fn new(_name: String, _payload: Vec<u8>) -> Self {
186 todo!()
187 }
188 }
189
190 impl GiteratedOperation<NetworkedObject> for NetworkedOperation {
191 type Success = Vec<u8>;
192
193 type Failure = Vec<u8>;
194
195 fn operation_name() -> &'static str {
196 "network_operation"
197 }
198 }
199
200 /// Handler which will attempt to resolve any operation that doesn't resolve locally
201 /// against a remote instance.
202 pub async fn try_handle_with_remote(
203 object: AnyObject,
204 operation: AnyOperation,
205 state: NetworkedSubstack,
206 _operation_state: StackOperationState,
207 stack: Arc<GiteratedStack>,
208 ) -> Result<AnySuccess, OperationError<AnyFailure>> {
209 trace!(
210 "Try handling object operation {}::{} with remote",
211 object.kind(),
212 operation.kind().operation_name
213 );
214 // TODO:
215 // Ideally we support pass-through on object types that aren't used locally.
216 // For now, we aren't worrying about that.
217 let object_meta = stack
218 .metadata
219 .objects
220 .get(object.kind())
221 .ok_or_else(|| OperationError::Unhandled)?;
222
223 let _operation_meta = stack
224 .metadata
225 .operations
226 .get(&operation.kind())
227 .ok_or_else(|| OperationError::Unhandled)?;
228
229 let object_home_uri = (object_meta.home_uri)(object.clone());
230
231 if let Some(home_uri) = state.home_uri {
232 if home_uri == object_home_uri {
233 // This isn't a remote request, requests aren't supposed to hit this layer
234 // if they're not remote.
235 warn!("Try handling object operation {}::{}, resolved object home uri as local home uri. This is a bug.", object.kind(),
236 operation.kind().operation_name);
237
238 return Err(OperationError::Unhandled);
239 }
240 }
241
242 // Blah blah connect and do the stuff
243
244 todo!()
245 }

giterated-stack/src/lib.rs

View file
@@ -12,6 +12,14 @@ pub use substack::*;
12 12 pub mod state;
13 13 pub mod update;
14 14
15 // Temp pub use to figure out what's important
16 pub mod models {
17 pub use anyhow::Error;
18 pub use giterated_models::error::{IntoInternalError, OperationError};
19 pub use giterated_models::object::GiteratedObject;
20 pub use giterated_models::operation::GiteratedOperation;
21 }
22
15 23 use std::{any::Any, convert::Infallible, ops::Deref, sync::Arc};
16 24
17 25 use core::fmt::Debug;

giterated-stack/src/meta/mod.rs

View file
@@ -237,12 +237,14 @@ pub struct ObjectMeta {
237 237 pub name: String,
238 238 pub to_str: Box<dyn Fn(AnyObject) -> String + Send + Sync>,
239 239 pub from_str: Box<dyn Fn(&str) -> Result<AnyObject, ()> + Send + Sync>,
240 pub home_uri: fn(AnyObject) -> String,
240 241 pub any_is_same: fn(&dyn Any) -> bool,
241 242 }
242 243
243 244 pub trait IntoObjectMeta: FromStr {
244 245 fn name() -> String;
245 246 fn any_is_same(other: &dyn Any) -> bool;
247 fn home_uri(object: AnyObject) -> String;
246 248 }
247 249
248 250 impl<O: GiteratedObject + 'static> IntoObjectMeta for O {
@@ -253,6 +255,10 @@ impl<O: GiteratedObject + 'static> IntoObjectMeta for O {
253 255 fn any_is_same(other: &dyn Any) -> bool {
254 256 other.is::<O>()
255 257 }
258
259 fn home_uri(_object: AnyObject) -> String {
260 todo!()
261 }
256 262 }
257 263
258 264 impl ObjectMeta {
@@ -270,6 +276,7 @@ impl ObjectMeta {
270 276 object.to_string()
271 277 }),
272 278 any_is_same: I::any_is_same,
279 home_uri: I::home_uri,
273 280 }
274 281 }
275 282 }

giterated-stack/src/stack.rs

View file
@@ -1149,6 +1149,18 @@ impl ObjectBackend<StackOperationState> for Arc<GiteratedStack> {
1149 1149 }
1150 1150 }
1151 1151
1152 // Placeholder
1153 impl GiteratedStack {
1154 pub async fn new_operation_func(
1155 &self,
1156 _object: AnyObject,
1157 _operation: AnyOperation,
1158 _operation_state: StackOperationState,
1159 ) -> Result<AnySuccess, OperationError<AnyFailure>> {
1160 todo!()
1161 }
1162 }
1163
1152 1164 /// Defines a type that is a valid Giterated runtime state.
1153 1165 ///
1154 1166 /// This allows for extraction of state in handlers, based on a

giterated-stack/src/substack.rs

View file
@@ -313,6 +313,12 @@ impl<S: Send + Sync + Clone + 'static> SubstackBuilder<S> {
313 313 }
314 314 }
315 315
316 // Placeholder
317 impl<S: Send + Sync + Clone + 'static> SubstackBuilder<S> {
318 pub fn dynamic_operation<H>(&mut self, _handler: H) -> &mut Self {
319 todo!()
320 }
321 }
316 322 #[derive(Debug, Clone, thiserror::Error)]
317 323 #[error("downcast error")]
318 324 pub struct DowncastError;