JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

The long awaited, exhalted huge networking stack change.

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨21b6a72

Showing ⁨⁨17⁩ changed files⁩ with ⁨⁨830⁩ insertions⁩ and ⁨⁨218⁩ deletions⁩

Cargo.lock

View file
@@ -820,11 +820,17 @@ dependencies = [
820 820 name = "giterated-protocol"
821 821 version = "0.1.0"
822 822 dependencies = [
823 "async-trait",
824 "bincode",
825 "futures-util",
823 826 "giterated-stack",
824 827 "rand",
825 828 "rsa",
826 829 "serde",
827 830 "serde_json",
831 "thiserror",
832 "tokio",
833 "tokio-tungstenite",
828 834 "tracing",
829 835 ]
830 836

giterated-daemon/src/client.rs

View file
@@ -46,6 +46,8 @@ pub async fn client_wrapper(
46 46 }
47 47 };
48 48
49 trace!("Read payload from client");
50
49 51 let payload = match bincode::deserialize::<AuthenticatedPayload>(&payload) {
50 52 Ok(payload) => payload,
51 53 Err(e) => {
@@ -58,6 +60,11 @@ pub async fn client_wrapper(
58 60 }
59 61 };
60 62
63 trace!(
64 "Deserialized payload for operation {} from client",
65 payload.operation
66 );
67
61 68 let operation_state = StackOperationState {
62 69 our_instance: our_instance.clone(),
63 70 runtime: runtime.clone(),
@@ -92,7 +99,15 @@ pub async fn handle_client_message(
92 99 .await
93 100 .as_internal_error_with_context("handling client message")?;
94 101
95 let networked_operation = NetworkedOperation::new(payload.object, payload.payload);
102 let message: giterated_protocol::GiteratedMessage<NetworkedObject, NetworkedOperation> =
103 payload.into_message();
104
105 let networked_operation = NetworkedOperation::new(
106 message.payload.name.clone(),
107 message.payload.payload.clone(),
108 );
109
110 trace!("Calling handler for networked operation");
96 111
97 112 networked_object
98 113 .request(networked_operation, &operation_state)

giterated-daemon/src/main.rs

View file
@@ -11,6 +11,7 @@ use giterated_daemon::{
11 11
12 12 use giterated_models::instance::Instance;
13 13
14 use giterated_protocol::NetworkedSubstack;
14 15 use giterated_stack::GiteratedStackBuilder;
15 16 use sqlx::{postgres::PgConnectOptions, ConnectOptions, PgPool};
16 17 use std::{net::SocketAddr, str::FromStr, sync::Arc};
@@ -100,6 +101,15 @@ async fn main() -> Result<(), Error> {
100 101 let cache_backend = CacheSubstack::default();
101 102 runtime.merge_builder(cache_backend.into_substack());
102 103
104 let networked_stack = NetworkedSubstack {
105 home_uri: Some(
106 Instance::from_str(config["giterated"]["instance"].as_str().unwrap())
107 .unwrap()
108 .0,
109 ),
110 };
111 runtime.merge_builder(networked_stack.into_server_substack());
112
103 113 let runtime = runtime.finish();
104 114
105 115 stack_cell

giterated-models/src/instance/mod.rs

View file
@@ -45,6 +45,10 @@ impl GiteratedObject for Instance {
45 45 fn from_object_str(object_str: &str) -> Result<Self, anyhow::Error> {
46 46 Ok(Instance::from_str(object_str).unwrap())
47 47 }
48
49 fn home_uri(&self) -> String {
50 self.0.clone()
51 }
48 52 }
49 53
50 54 impl Display for Instance {

giterated-models/src/object.rs

View file
@@ -58,6 +58,7 @@ impl<
58 58
59 59 pub trait GiteratedObject: Send + Display + FromStr + Sync + Clone {
60 60 fn object_name() -> &'static str;
61 fn home_uri(&self) -> String;
61 62
62 63 fn from_object_str(object_str: &str) -> Result<Self, Error>;
63 64 }

giterated-models/src/object/operations.rs

View file
@@ -37,6 +37,10 @@ impl GiteratedObject for NetworkAnyObject {
37 37 fn from_object_str(object_str: &str) -> Result<Self, anyhow::Error> {
38 38 Ok(Self(object_str.to_string()))
39 39 }
40
41 fn home_uri(&self) -> String {
42 todo!()
43 }
40 44 }
41 45
42 46 impl Display for NetworkAnyObject {

giterated-models/src/repository/mod.rs

View file
@@ -63,6 +63,10 @@ impl GiteratedObject for Repository {
63 63 fn from_object_str(object_str: &str) -> Result<Self, anyhow::Error> {
64 64 Ok(Repository::from_str(object_str)?)
65 65 }
66
67 fn home_uri(&self) -> String {
68 self.instance.home_uri()
69 }
66 70 }
67 71
68 72 impl TryFrom<String> for Repository {

giterated-models/src/user/mod.rs

View file
@@ -49,6 +49,10 @@ impl GiteratedObject for User {
49 49 fn from_object_str(object_str: &str) -> Result<Self, anyhow::Error> {
50 50 Ok(User::from_str(object_str)?)
51 51 }
52
53 fn home_uri(&self) -> String {
54 self.instance.home_uri()
55 }
52 56 }
53 57
54 58 impl Display for User {

giterated-protocol/Cargo.toml

View file
@@ -18,4 +18,10 @@ serde = { version = "1.0.188", features = [ "derive" ]}
18 18 tracing = "0.1"
19 19 rand = "0.8"
20 20 rsa = {version = "0.9", features = ["sha2"]}
21 serde_json = "1.0"
21 \ No newline at end of file
21 serde_json = "1.0"
22 async-trait = "0.1"
23 tokio-tungstenite = { version = "0.20" }
24 tokio = { version = "1.32.0", features = ["full"] }
25 thiserror = "1"
26 bincode = "1.3"
27 futures-util = "0.3"

giterated-protocol/src/lib.rs

View file
@@ -9,6 +9,7 @@ use giterated_stack::models::{
9 9 Error, GiteratedObject, GiteratedOperation, Instance, InstanceSignature, User,
10 10 UserAuthenticationToken,
11 11 };
12 use giterated_stack::{GiteratedStack, HandlerResolvable, MissingValue};
12 13 use rsa::pkcs1::DecodeRsaPrivateKey;
13 14 use rsa::pkcs1v15::SigningKey;
14 15 use rsa::sha2::Sha256;
@@ -17,12 +18,22 @@ use rsa::RsaPrivateKey;
17 18 use serde::{Deserialize, Serialize};
18 19 pub use substack::{NetworkedObject, NetworkedOperation, NetworkedSubstack};
19 20
20 #[derive(Clone, Default)]
21 #[derive(Clone)]
21 22 pub struct NetworkOperationState {
23 runtime: GiteratedStack<NetworkOperationState>,
22 24 authentication: Vec<Arc<dyn AuthenticationSourceProviders + Send + Sync>>,
23 25 }
24 26
25 27 impl NetworkOperationState {
28 pub fn new(stack: GiteratedStack<NetworkOperationState>) -> Self {
29 Self {
30 runtime: stack,
31 authentication: vec![],
32 }
33 }
34 }
35
36 impl NetworkOperationState {
26 37 pub fn authenticate(
27 38 &mut self,
28 39 provider: impl AuthenticationSourceProviders + Send + Sync + 'static,
@@ -40,11 +51,11 @@ pub struct AuthenticatedPayload {
40 51 }
41 52
42 53 impl AuthenticatedPayload {
43 pub fn into_message(self) -> GiteratedMessage<NetworkAnyObject, NetworkAnyOperation> {
54 pub fn into_message(self) -> GiteratedMessage<NetworkedObject, NetworkedOperation> {
44 55 GiteratedMessage {
45 object: NetworkAnyObject(self.object),
56 object: NetworkedObject::from_str(&self.object).unwrap(),
46 57 operation: self.operation,
47 payload: NetworkAnyOperation(self.payload),
58 payload: serde_json::from_slice(&self.payload).unwrap(),
48 59 }
49 60 }
50 61 }
@@ -199,12 +210,16 @@ pub struct NetworkAnyObject(pub String);
199 210
200 211 impl GiteratedObject for NetworkAnyObject {
201 212 fn object_name() -> &'static str {
202 "any"
213 "network_object"
203 214 }
204 215
205 216 fn from_object_str(object_str: &str) -> Result<Self, Error> {
206 217 Ok(Self(object_str.to_string()))
207 218 }
219
220 fn home_uri(&self) -> String {
221 todo!()
222 }
208 223 }
209 224
210 225 impl Display for NetworkAnyObject {
@@ -231,3 +246,29 @@ impl<O: GiteratedObject> GiteratedOperation<O> for NetworkAnyOperation {
231 246
232 247 type Failure = Vec<u8>;
233 248 }
249
250 #[async_trait::async_trait(?Send)]
251 impl<R1> HandlerResolvable<(R1,), NetworkOperationState> for GiteratedStack<NetworkOperationState> {
252 type Error = MissingValue;
253
254 async fn from_handler_state(
255 _required_parameters: &(R1,),
256 operation_state: &NetworkOperationState,
257 ) -> Result<Self, Self::Error> {
258 Ok(operation_state.runtime.clone())
259 }
260 }
261
262 #[async_trait::async_trait(?Send)]
263 impl<R1, R2> HandlerResolvable<(R1, R2), NetworkOperationState>
264 for GiteratedStack<NetworkOperationState>
265 {
266 type Error = MissingValue;
267
268 async fn from_handler_state(
269 _required_parameters: &(R1, R2),
270 operation_state: &NetworkOperationState,
271 ) -> Result<Self, Self::Error> {
272 Ok(operation_state.runtime.clone())
273 }
274 }

giterated-protocol/src/substack.rs

View file
@@ -1,14 +1,22 @@
1 use std::{fmt::Display, str::FromStr, sync::Arc};
1 use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc};
2 2
3 use futures_util::{sink::SinkExt, StreamExt};
3 4 use giterated_stack::{
4 models::{Error, GiteratedObject, GiteratedOperation, IntoInternalError, OperationError},
5 AnyFailure, AnyObject, AnyOperation, AnySuccess, GiteratedStack, GiteratedStackInner,
6 ObjectOperationPair, OperationState, StackOperationState, SubstackBuilder,
5 models::{
6 Error, GiteratedObject, GiteratedOperation, Instance, IntoInternalError,
7 NetworkOperationError, OperationError,
8 },
9 AnyFailure, AnyObject, AnyOperation, AnySuccess, GiteratedStack, ObjectOperationPair,
10 OperationState, StackOperationState, SubstackBuilder,
7 11 };
8 12 use serde::{Deserialize, Serialize};
9 use tracing::{trace, warn};
13 use tokio::net::TcpStream;
14 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
15 use tracing::{info, trace, warn};
10 16
11 use crate::NetworkOperationState;
17 use crate::{
18 AuthenticatedPayload, AuthenticationSourceProviders, GiteratedMessage, NetworkOperationState,
19 };
12 20
13 21 /// A Giterated substack that attempts to resolve with a remote, networked Giterated Daemon.
14 22 ///
@@ -51,7 +59,7 @@ use crate::NetworkOperationState;
51 59 /// TODO: The above docs are 100% false about the network protocol type
52 60 #[derive(Clone)]
53 61 pub struct NetworkedSubstack {
54 home_uri: Option<String>,
62 pub home_uri: Option<String>,
55 63 }
56 64
57 65 impl Default for NetworkedSubstack {
@@ -64,7 +72,8 @@ impl NetworkedSubstack {
64 72 pub fn into_server_substack(self) -> SubstackBuilder<Self, StackOperationState> {
65 73 let mut stack = SubstackBuilder::new(self);
66 74
67 stack.operation(handle_network_operation::<StackOperationState>);
75 stack.object::<NetworkedObject>();
76 stack.operation(handle_network_operation);
68 77
69 78 // TODO: optional
70 79 stack.dynamic_operation(try_handle_with_remote);
@@ -73,7 +82,10 @@ impl NetworkedSubstack {
73 82 }
74 83
75 84 pub fn into_client_substack(self) -> SubstackBuilder<Self, NetworkOperationState> {
76 let mut stack = SubstackBuilder::new(self);
85 let mut stack: SubstackBuilder<NetworkedSubstack, NetworkOperationState> =
86 SubstackBuilder::new(self);
87
88 stack.object::<NetworkedObject>();
77 89
78 90 // TODO: optional
79 91 stack.dynamic_operation(try_handle_with_remote);
@@ -89,12 +101,15 @@ pub async fn handle_network_operation<OS: Send + Sync + Clone + 'static>(
89 101 OperationState(operation_state): OperationState<OS>,
90 102 stack: GiteratedStack<OS>,
91 103 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
92 trace!("Handle network operation");
104 trace!("Handle network operation {}", operation.name);
93 105 let mut result = None;
94 106
95 107 for (_, object_meta) in &stack.inner.metadata.objects {
108 if object_meta.name == NetworkedObject::object_name() {
109 continue;
110 }
111
96 112 if let Ok(object) = (object_meta.from_str)(&object.0) {
97 // TODO: This is definitely going to resolve us
98 113 result = Some((object, object_meta));
99 114 break;
100 115 }
@@ -103,8 +118,9 @@ pub async fn handle_network_operation<OS: Send + Sync + Clone + 'static>(
103 118 let (object, object_meta) = result.ok_or_else(|| OperationError::Unhandled)?;
104 119
105 120 trace!(
106 "Resolved object type {} for network operation.",
107 object_meta.name
121 "Resolved object type {} for network operation {}.",
122 object_meta.name,
123 operation.name
108 124 );
109 125
110 126 let operation_meta = stack
@@ -123,6 +139,7 @@ pub async fn handle_network_operation<OS: Send + Sync + Clone + 'static>(
123 139 operation_meta.name
124 140 );
125 141
142 info!("Operation: {:?}", operation.payload);
126 143 let operation = (operation_meta.deserialize)(&operation.payload)
127 144 .as_internal_error_with_context(format!(
128 145 "deserializing object operation {}::{}",
@@ -140,23 +157,48 @@ pub async fn handle_network_operation<OS: Send + Sync + Clone + 'static>(
140 157 .await;
141 158
142 159 match result {
143 Ok(success) => Ok((operation_meta.serialize_success)(success)
144 .as_internal_error_with_context(format!(
145 "serializing success for object operation {}::{}",
146 object_meta.name, operation_meta.name
147 ))?),
148 Err(err) => Err(match err {
149 OperationError::Operation(failure) => OperationError::Operation(
150 (operation_meta.serialize_error)(failure).as_internal_error_with_context(
160 Ok(success) => {
161 trace!(
162 "Network operation {}::{} was successful",
163 object_meta.name,
164 operation_meta.name
165 );
166
167 Ok(
168 (operation_meta.serialize_success)(success).as_internal_error_with_context(
151 169 format!(
152 "serializing error for object operation {}::{}",
170 "serializing success for object operation {}::{}",
153 171 object_meta.name, operation_meta.name
154 172 ),
155 173 )?,
156 ),
157 OperationError::Internal(internal) => OperationError::Internal(internal),
158 OperationError::Unhandled => OperationError::Unhandled,
159 }),
174 )
175 }
176 Err(err) => {
177 trace!(
178 "Network operation {}::{} failed",
179 object_meta.name,
180 operation_meta.name
181 );
182 Err(match err {
183 OperationError::Operation(failure) => OperationError::Operation(
184 (operation_meta.serialize_error)(failure).as_internal_error_with_context(
185 format!(
186 "serializing error for object operation {}::{}",
187 object_meta.name, operation_meta.name
188 ),
189 )?,
190 ),
191 OperationError::Internal(internal) => {
192 warn!(
193 "A networked operation encountered an internal error: {:#?}",
194 internal
195 );
196
197 OperationError::Internal(internal)
198 }
199 OperationError::Unhandled => OperationError::Unhandled,
200 })
201 }
160 202 }
161 203 }
162 204
@@ -166,36 +208,40 @@ pub struct NetworkedObject(pub String);
166 208 impl FromStr for NetworkedObject {
167 209 type Err = ();
168 210
169 fn from_str(_s: &str) -> Result<Self, Self::Err> {
170 todo!()
211 fn from_str(s: &str) -> Result<Self, Self::Err> {
212 Ok(NetworkedObject(s.to_string()))
171 213 }
172 214 }
173 215
174 216 impl Display for NetworkedObject {
175 fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 todo!()
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 f.write_str(&self.0)
177 219 }
178 220 }
179 221
180 222 impl GiteratedObject for NetworkedObject {
181 223 fn object_name() -> &'static str {
182 todo!()
224 "networked_object"
183 225 }
184 226
185 227 fn from_object_str(_object_str: &str) -> Result<Self, Error> {
186 228 todo!()
187 229 }
230
231 fn home_uri(&self) -> String {
232 todo!()
233 }
188 234 }
189 235
190 236 #[derive(Clone, Debug, Serialize, Deserialize)]
191 237 pub struct NetworkedOperation {
192 name: String,
193 payload: Vec<u8>,
238 pub name: String,
239 pub payload: Vec<u8>,
194 240 }
195 241
196 242 impl NetworkedOperation {
197 pub fn new(_name: String, _payload: Vec<u8>) -> Self {
198 todo!()
243 pub fn new(name: String, payload: Vec<u8>) -> Self {
244 Self { name, payload }
199 245 }
200 246 }
201 247
@@ -205,19 +251,21 @@ impl GiteratedOperation<NetworkedObject> for NetworkedOperation {
205 251 type Failure = Vec<u8>;
206 252
207 253 fn operation_name() -> &'static str {
208 "network_operation"
254 "networked_operation"
209 255 }
210 256 }
211 257
212 258 /// Handler which will attempt to resolve any operation that doesn't resolve locally
213 259 /// against a remote instance.
214 pub async fn try_handle_with_remote(
260 pub async fn try_handle_with_remote<OS>(
215 261 object: AnyObject,
216 262 operation: AnyOperation,
217 263 state: NetworkedSubstack,
218 _operation_state: StackOperationState,
219 stack: Arc<GiteratedStackInner>,
264 _stack: GiteratedStack<OS>,
220 265 ) -> Result<AnySuccess, OperationError<AnyFailure>> {
266 if object.is::<NetworkedObject>() {
267 return Err(OperationError::Unhandled);
268 }
221 269 trace!(
222 270 "Try handling object operation {}::{} with remote",
223 271 object.kind(),
@@ -226,17 +274,15 @@ pub async fn try_handle_with_remote(
226 274 // TODO:
227 275 // Ideally we support pass-through on object types that aren't used locally.
228 276 // For now, we aren't worrying about that.
229 let object_meta = stack
230 .metadata
231 .objects
232 .get(object.kind())
233 .ok_or_else(|| OperationError::Unhandled)?;
277 let object_meta = object.meta().clone();
234 278
235 let _operation_meta = stack
236 .metadata
237 .operations
238 .get(&operation.kind())
239 .ok_or_else(|| OperationError::Unhandled)?;
279 let operation_meta = operation.meta().clone();
280
281 trace!(
282 "Serializing with {}::{}",
283 operation.kind().object_name,
284 operation.kind().operation_name
285 );
240 286
241 287 let object_home_uri = (object_meta.home_uri)(object.clone());
242 288
@@ -251,7 +297,191 @@ pub async fn try_handle_with_remote(
251 297 }
252 298 }
253 299
254 // Blah blah connect and do the stuff
300 trace!(
301 "Handling object operation {}::{} sending payload",
302 object.kind(),
303 operation.kind().operation_name
304 );
305
306 let object = NetworkedObject((object_meta.to_str)(object).to_string());
307 let operation = NetworkedOperation::new(
308 operation_meta.name.clone(),
309 (operation_meta.serialize)(operation.clone()).as_internal_error_with_context(format!(
310 "try serializing object operation {}::{} for remote",
311 object_meta.name, operation_meta.name
312 ))?,
313 );
314
315 // let authenticated = Authenticated::new(object, operation);
316
317 let message = GiteratedMessage {
318 object,
319 operation: NetworkedOperation::operation_name().to_string(),
320 payload: operation,
321 };
322
323 let authenticated = Authenticated::new(message);
324
325 let mut socket: WebSocketStream<MaybeTlsStream<TcpStream>> = connect_to(
326 &Instance::from_str(&object_home_uri).unwrap(),
327 &Some(("127.0.0.1:1111").parse().unwrap()),
328 )
329 .await
330 .as_internal_error()?;
331
332 // TODO AUTH
255 333
256 todo!()
334 let result: Result<Vec<u8>, OperationError<Vec<u8>>> =
335 send_expect(&mut socket, authenticated).await;
336
337 match result {
338 Ok(success) => {
339 let success = (operation_meta.deserialize_success)(success)
340 .as_internal_error_with_context(format!(
341 "try deserializing object operation success {}::{} for remote",
342 object_meta.name, operation_meta.name
343 ))?;
344
345 Ok(success)
346 }
347 Err(err) => Err(match err {
348 OperationError::Operation(failure) => {
349 let failure = (operation_meta.deserialize_failure)(failure)
350 .as_internal_error_with_context(format!(
351 "try deserializing object operation failure {}::{} for remote",
352 object_meta.name, operation_meta.name
353 ))?;
354
355 OperationError::Operation(failure)
356 }
357 OperationError::Internal(internal) => OperationError::Internal(internal),
358 OperationError::Unhandled => OperationError::Unhandled,
359 }),
360 }
361 }
362
363 type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>;
364
365 async fn connect_to(
366 instance: &Instance,
367
368 socket_addr: &Option<SocketAddr>,
369 ) -> Result<Socket, Error> {
370 if let Some(addr) = socket_addr {
371 info!(
372 "Connecting to {}",
373 format!("ws://{}/.giterated/daemon/", addr)
374 );
375
376 let (websocket, _response) =
377 connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?;
378
379 info!("Connection established with {}", addr);
380
381 Ok(websocket)
382 } else {
383 info!(
384 "Connecting to {}",
385 format!("wss://{}/.giterated/daemon/", instance.0)
386 );
387
388 let (websocket, _response) =
389 connect_async(&format!("wss://{}/.giterated/daemon/", instance.0)).await?;
390
391 info!("Connection established with {}", instance.0);
392
393 Ok(websocket)
394 }
395 }
396
397 async fn send_expect<O: GiteratedObject, D: GiteratedOperation<O>>(
398 socket: &mut Socket,
399 message: Authenticated<O, D>,
400 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
401 let payload = bincode::serialize(&message.into_payload()).unwrap();
402
403 socket
404 .send(Message::Binary(payload))
405 .await
406 .as_internal_error()?;
407
408 while let Some(message) = socket.next().await {
409 let payload = match message.as_internal_error()? {
410 Message::Binary(payload) => payload,
411
412 _ => {
413 continue;
414 }
415 };
416
417 let raw_result =
418 bincode::deserialize::<Result<Vec<u8>, NetworkOperationError<Vec<u8>>>>(&payload)
419 .map_err(|e| OperationError::Internal(Error::from(e)))?;
420
421 trace!(
422 "Received response for networked operation {}::{}.",
423 O::object_name(),
424 D::operation_name()
425 );
426
427 return match raw_result {
428 Ok(success) => Ok(success),
429 Err(err) => Err(match err {
430 NetworkOperationError::Operation(operation_error) => {
431 OperationError::Operation(operation_error)
432 }
433 NetworkOperationError::Internal => OperationError::Internal(RemoteError.into()),
434 NetworkOperationError::Unhandled => OperationError::Unhandled,
435 }),
436 };
437 }
438
439 panic!()
440 }
441
442 #[derive(Debug, Clone, thiserror::Error)]
443 #[error("a remote internal error occurred")]
444 pub struct RemoteError;
445
446 #[derive(Debug, thiserror::Error)]
447 #[error("a remote internal error occurred")]
448
449 pub struct NetworkError;
450
451 #[derive(Debug)]
452 pub struct Authenticated<O: GiteratedObject, D: GiteratedOperation<O>> {
453 pub source: Vec<Arc<dyn AuthenticationSourceProviders + Send + Sync>>,
454 pub message: GiteratedMessage<O, D>,
455 }
456
457 impl<O: GiteratedObject, D: GiteratedOperation<O>> Authenticated<O, D> {
458 pub fn new(message: GiteratedMessage<O, D>) -> Self {
459 Self {
460 source: vec![],
461 message,
462 }
463 }
464
465 pub fn append_authentication(
466 &mut self,
467 authentication: Arc<dyn AuthenticationSourceProviders + Send + Sync>,
468 ) {
469 self.source.push(authentication);
470 }
471
472 pub fn into_payload(mut self) -> AuthenticatedPayload {
473 let payload = serde_json::to_vec(&self.message.payload).unwrap();
474
475 AuthenticatedPayload {
476 object: self.message.object.to_string(),
477 operation: self.message.operation,
478 source: self
479 .source
480 .drain(..)
481 .map(|provider| provider.as_ref().authenticate_all(&payload))
482 .flatten()
483 .collect::<Vec<_>>(),
484 payload,
485 }
486 }
257 487 }

giterated-stack/src/dynamic.rs

View file
@@ -5,11 +5,14 @@ use giterated_models::{
5 5 value::GiteratedObjectValue,
6 6 };
7 7
8 use crate::{ObjectOperationPair, ObjectSettingPair, ObjectValuePair, ValueMeta};
8 use crate::{
9 ObjectMeta, ObjectOperationPair, ObjectSettingPair, ObjectValuePair, OperationMeta, ValueMeta,
10 };
9 11
10 12 #[derive(Clone)]
11 13 pub struct AnyObject {
12 14 inner: Arc<dyn Any + Send + Sync>,
15 meta: Arc<ObjectMeta>,
13 16 kind: &'static str,
14 17 }
15 18
@@ -17,6 +20,7 @@ impl AnyObject {
17 20 pub fn new<O: GiteratedObject + 'static>(object: O) -> Self {
18 21 Self {
19 22 inner: Arc::new(object) as _,
23 meta: Arc::new(ObjectMeta::new::<O>()),
20 24 kind: O::object_name(),
21 25 }
22 26 }
@@ -28,6 +32,10 @@ impl AnyObject {
28 32 pub fn kind(&self) -> &'static str {
29 33 self.kind
30 34 }
35
36 pub fn meta(&self) -> &Arc<ObjectMeta> {
37 &self.meta
38 }
31 39 }
32 40
33 41 impl Deref for AnyObject {
@@ -41,13 +49,17 @@ impl Deref for AnyObject {
41 49 #[derive(Clone)]
42 50 pub struct AnyOperation {
43 51 inner: Arc<dyn Any + Send + Sync>,
52 meta: Arc<OperationMeta>,
44 53 kind: ObjectOperationPair<'static>,
45 54 }
46 55
47 56 impl AnyOperation {
48 pub fn new<O: GiteratedObject, D: GiteratedOperation<O> + 'static>(operation: D) -> Self {
57 pub fn new<O: GiteratedObject + 'static, D: GiteratedOperation<O> + 'static>(
58 operation: D,
59 ) -> Self {
49 60 Self {
50 61 inner: Arc::new(operation) as _,
62 meta: Arc::new(OperationMeta::new::<O, D>()),
51 63 kind: ObjectOperationPair::from_types::<O, D>(),
52 64 }
53 65 }
@@ -62,6 +74,10 @@ impl AnyOperation {
62 74 pub fn kind(&self) -> ObjectOperationPair<'static> {
63 75 self.kind
64 76 }
77
78 pub fn meta(&self) -> &Arc<OperationMeta> {
79 &self.meta
80 }
65 81 }
66 82
67 83 impl Deref for AnyOperation {

giterated-stack/src/handler/mod.rs

View file
@@ -213,7 +213,10 @@ impl<R> HandlerResolvable<R, StackOperationState> for AuthenticatedUser {
213 213 _required_parameters: &R,
214 214 operation_state: &StackOperationState,
215 215 ) -> Result<Self, Self::Error> {
216 operation_state.user.clone().ok_or_else(|| MissingValue)
216 operation_state
217 .user
218 .clone()
219 .ok_or_else(|| MissingValue("AuthenticatedUser"))
217 220 }
218 221 }
219 222
@@ -225,6 +228,9 @@ impl<R> HandlerResolvable<R, StackOperationState> for AuthenticatedInstance {
225 228 _required_parameters: &R,
226 229 operation_state: &StackOperationState,
227 230 ) -> Result<Self, Self::Error> {
228 operation_state.instance.clone().ok_or_else(|| MissingValue)
231 operation_state
232 .instance
233 .clone()
234 .ok_or_else(|| MissingValue("AuthenticatedInstance"))
229 235 }
230 236 }

giterated-stack/src/lib.rs

View file
@@ -17,7 +17,7 @@ pub mod update;
17 17 pub mod models {
18 18 pub use anyhow::Error;
19 19 pub use giterated_models::authenticated::*;
20 pub use giterated_models::error::{IntoInternalError, OperationError};
20 pub use giterated_models::error::{IntoInternalError, NetworkOperationError, OperationError};
21 21 pub use giterated_models::instance::Instance;
22 22 pub use giterated_models::object::GiteratedObject;
23 23 pub use giterated_models::operation::GiteratedOperation;
@@ -114,8 +114,8 @@ impl<O: GiteratedObject, D: GiteratedOperation<O>> FromOperationState<O, D>
114 114 }
115 115
116 116 #[derive(Debug, thiserror::Error, Serialize, Deserialize)]
117 #[error("missing value")]
118 pub struct MissingValue;
117 #[error("missing value {0}")]
118 pub struct MissingValue(&'static str);
119 119
120 120 #[async_trait::async_trait(?Send)]
121 121 impl<O: GiteratedObject, D: GiteratedOperation<O> + Send + Sync> FromOperationState<O, D>
@@ -128,7 +128,10 @@ impl<O: GiteratedObject, D: GiteratedOperation<O> + Send + Sync> FromOperationSt
128 128 _operation: &D,
129 129 state: &StackOperationState,
130 130 ) -> Result<AuthenticatedUser, ExtractorError<MissingValue>> {
131 state.user.clone().ok_or(ExtractorError(MissingValue))
131 state
132 .user
133 .clone()
134 .ok_or(ExtractorError(MissingValue("AuthenticatedUser")))
132 135 }
133 136 }
134 137
@@ -143,7 +146,10 @@ impl<O: GiteratedObject, D: GiteratedOperation<O> + Send + Sync> FromOperationSt
143 146 _operation: &D,
144 147 state: &StackOperationState,
145 148 ) -> Result<AuthenticatedInstance, ExtractorError<MissingValue>> {
146 state.instance.clone().ok_or(ExtractorError(MissingValue))
149 state
150 .instance
151 .clone()
152 .ok_or(ExtractorError(MissingValue("AuthenticatedInstance")))
147 153 }
148 154 }
149 155
@@ -208,7 +214,10 @@ impl AuthorizedOperation<User> for SetSetting {
208 214 authorize_for: &User,
209 215 operation_state: &StackOperationState,
210 216 ) -> Result<bool, ExtractorError<MissingValue>> {
211 let authenticated_user = operation_state.user.as_ref().ok_or(MissingValue)?;
217 let authenticated_user = operation_state
218 .user
219 .as_ref()
220 .ok_or(MissingValue("AuthenticatedUser"))?;
212 221
213 222 Ok(authorize_for == authenticated_user.deref())
214 223 }
@@ -223,7 +232,10 @@ impl AuthorizedOperation<User> for GetSetting {
223 232 authorize_for: &User,
224 233 operation_state: &StackOperationState,
225 234 ) -> Result<bool, ExtractorError<MissingValue>> {
226 let authenticated_user = operation_state.user.as_ref().ok_or(MissingValue)?;
235 let authenticated_user = operation_state
236 .user
237 .as_ref()
238 .ok_or(MissingValue("AuthenticatedUser"))?;
227 239
228 240 Ok(authorize_for == authenticated_user.deref())
229 241 }
@@ -241,7 +253,7 @@ impl AuthorizedOperation<Repository> for SetSetting {
241 253 let authenticated_user = operation_state
242 254 .user
243 255 .as_ref()
244 .ok_or_else(|| anyhow::Error::from(MissingValue))?;
256 .ok_or_else(|| anyhow::Error::from(MissingValue("AuthenticatedUser")))?;
245 257
246 258 let mut object = operation_state
247 259 .runtime
@@ -278,7 +290,7 @@ impl AuthorizedOperation<Repository> for GetSetting {
278 290 let authenticated_user = operation_state
279 291 .user
280 292 .as_ref()
281 .ok_or_else(|| anyhow::Error::from(MissingValue))?;
293 .ok_or_else(|| anyhow::Error::from(MissingValue("AuthenticatedUser")))?;
282 294
283 295 let mut object = operation_state
284 296 .runtime
@@ -466,13 +478,13 @@ where
466 478 pub struct OperationState<OS>(pub OS);
467 479
468 480 #[async_trait::async_trait(?Send)]
469 impl<P, OS> HandlerResolvable<P, OS> for OperationState<OS> {
481 impl<P, OS: Clone> HandlerResolvable<P, OS> for OperationState<OS> {
470 482 type Error = MissingValue;
471 483
472 484 async fn from_handler_state(
473 485 _required_parameters: &P,
474 _operation_state: &OS,
486 operation_state: &OS,
475 487 ) -> Result<Self, Self::Error> {
476 todo!()
488 Ok(Self(operation_state.clone()))
477 489 }
478 490 }

giterated-stack/src/meta/mod.rs

View file
@@ -1,11 +1,11 @@
1 use std::{any::Any, collections::HashMap, str::FromStr};
1 use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc};
2 2
3 3 use futures_util::{future::LocalBoxFuture, FutureExt};
4 4 use giterated_models::{
5 5 object::GiteratedObject,
6 6 operation::GiteratedOperation,
7 settings::Setting,
8 value::{GetValueTyped, GiteratedObjectValue},
7 settings::{SetSetting, Setting},
8 value::{GetValue, GetValueTyped, GiteratedObjectValue},
9 9 };
10 10 use serde_json::Value;
11 11 use tracing::trace;
@@ -91,6 +91,11 @@ impl RuntimeMetadata {
91 91 V::value_name()
92 92 );
93 93 }
94
95 self.operations.insert(
96 ObjectOperationPair::from_types::<O, GetValue>(),
97 OperationMeta::new::<O, GetValue>(),
98 );
94 99 }
95 100
96 101 pub fn register_setting<O: GiteratedObject + 'static, S: Setting + 'static + Clone>(&mut self) {
@@ -109,6 +114,11 @@ impl RuntimeMetadata {
109 114 } else {
110 115 trace!("Registration of setting {}.", S::name());
111 116 }
117
118 self.operations.insert(
119 ObjectOperationPair::from_types::<O, SetSetting>(),
120 OperationMeta::new::<O, SetSetting>(),
121 );
112 122 }
113 123
114 124 pub fn append(&mut self, other: Self) {
@@ -176,10 +186,13 @@ impl ValueMeta {
176 186 pub struct OperationMeta {
177 187 pub name: String,
178 188 pub object_kind: String,
189 pub serialize: fn(AnyOperation) -> Result<Vec<u8>, serde_json::Error>,
179 190 pub deserialize: fn(&[u8]) -> Result<AnyOperation, serde_json::Error>,
180 191 pub any_is_same: fn(&dyn Any) -> bool,
181 192 pub serialize_success: fn(AnySuccess) -> Result<Vec<u8>, serde_json::Error>,
182 193 pub serialize_error: fn(AnyFailure) -> Result<Vec<u8>, serde_json::Error>,
194 pub deserialize_success: fn(Vec<u8>) -> Result<AnySuccess, serde_json::Error>,
195 pub deserialize_failure: fn(Vec<u8>) -> Result<AnyFailure, serde_json::Error>,
183 196 }
184 197
185 198 pub trait IntoOperationMeta<O> {
@@ -187,14 +200,17 @@ pub trait IntoOperationMeta<O> {
187 200 fn deserialize(buffer: &[u8]) -> Result<AnyOperation, serde_json::Error>;
188 201 fn serialize_success(success: AnySuccess) -> Result<Vec<u8>, serde_json::Error>;
189 202 fn serialize_failure(failure: AnyFailure) -> Result<Vec<u8>, serde_json::Error>;
203 fn deserialize_success(success: Vec<u8>) -> Result<AnySuccess, serde_json::Error>;
204 fn deserialize_failure(failure: Vec<u8>) -> Result<AnyFailure, serde_json::Error>;
190 205 fn any_is_same(other: &dyn Any) -> bool;
206 fn serialize(operation: AnyOperation) -> Result<Vec<u8>, serde_json::Error>;
191 207 }
192 208
193 209 impl<O, D> IntoOperationMeta<O> for D
194 210 where
195 211 D::Failure: 'static,
196 212 D::Success: 'static,
197 O: GiteratedObject,
213 O: GiteratedObject + 'static,
198 214 D: GiteratedOperation<O> + 'static,
199 215 {
200 216 fn name() -> String {
@@ -218,6 +234,24 @@ where
218 234 fn any_is_same(other: &dyn Any) -> bool {
219 235 other.is::<D>()
220 236 }
237
238 fn serialize(operation: AnyOperation) -> Result<Vec<u8>, serde_json::Error> {
239 let operation: &D = operation.downcast_ref().unwrap();
240
241 serde_json::to_vec(operation)
242 }
243
244 fn deserialize_success(success: Vec<u8>) -> Result<AnySuccess, serde_json::Error> {
245 let success: D::Success = serde_json::from_slice(&success)?;
246
247 Ok(AnySuccess(Arc::new(success)))
248 }
249
250 fn deserialize_failure(failure: Vec<u8>) -> Result<AnyFailure, serde_json::Error> {
251 let failure: D::Failure = serde_json::from_slice(&failure)?;
252
253 Ok(AnyFailure(Arc::new(failure)))
254 }
221 255 }
222 256
223 257 impl OperationMeta {
@@ -229,6 +263,9 @@ impl OperationMeta {
229 263 serialize_error: I::serialize_failure,
230 264 object_kind: O::object_name().to_string(),
231 265 any_is_same: I::any_is_same,
266 serialize: I::serialize,
267 deserialize_success: I::deserialize_success,
268 deserialize_failure: I::deserialize_failure,
232 269 }
233 270 }
234 271 }
@@ -256,8 +293,10 @@ impl<O: GiteratedObject + 'static> IntoObjectMeta for O {
256 293 other.is::<O>()
257 294 }
258 295
259 fn home_uri(_object: AnyObject) -> String {
260 todo!()
296 fn home_uri(object: AnyObject) -> String {
297 let object: &O = object.downcast_ref().unwrap();
298
299 object.home_uri()
261 300 }
262 301 }
263 302
@@ -276,7 +315,7 @@ impl ObjectMeta {
276 315 object.to_string()
277 316 }),
278 317 any_is_same: I::any_is_same,
279 home_uri: I::home_uri,
318 home_uri: <I as IntoObjectMeta>::home_uri,
280 319 }
281 320 }
282 321 }

giterated-stack/src/stack.rs

View file
@@ -3,10 +3,15 @@ use std::any::Any;
3 3 use std::fmt::Debug;
4 4 use std::marker::PhantomData;
5 5 use std::ops::Deref;
6 use std::str::FromStr;
6 7 use std::{collections::HashMap, sync::Arc};
7 8
9 use anyhow::Error;
8 10 use giterated_models::error::IntoInternalError;
9 use giterated_models::settings::Setting;
11 use giterated_models::instance::Instance;
12 use giterated_models::object::ObjectResponse;
13 use giterated_models::settings::{SetSettingError, Setting};
14
10 15 use giterated_models::value::GetValue;
11 16 use giterated_models::{
12 17 error::OperationError,
@@ -86,11 +91,27 @@ pub struct GiteratedStackBuilder<OS> {
86 91 inner: GiteratedStackInner<OS>,
87 92 }
88 93
89 impl<OS> Default for GiteratedStackBuilder<OS> {
94 impl<OS: Clone + Send + Sync + 'static> Default for GiteratedStackBuilder<OS>
95 where
96 GiteratedStack<OS>: HandlerResolvable<(Instance, ObjectRequest), OS>,
97 <GiteratedStack<OS> as HandlerResolvable<(Instance, ObjectRequest), OS>>::Error: Into<Error>,
98 {
90 99 fn default() -> Self {
91 Self {
100 let mut stack = Self {
92 101 inner: Default::default(),
93 }
102 };
103
104 #[derive(Clone)]
105 struct InstanceResolver;
106
107 let mut builder: SubstackBuilder<InstanceResolver, OS> =
108 SubstackBuilder::new(InstanceResolver);
109
110 builder.object::<Instance>();
111
112 stack.merge_builder(builder);
113
114 stack
94 115 }
95 116 }
96 117
@@ -383,7 +404,7 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
383 404 async fn object_operation<O, D>(
384 405 &self,
385 406 in_object: O,
386 operation_name: &str,
407 _operation_name: &str,
387 408 payload: D,
388 409 operation_state: &OS,
389 410 ) -> Result<D::Success, OperationError<D::Failure>>
@@ -393,10 +414,130 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
393 414 D::Success: Clone,
394 415 D::Failure: Clone,
395 416 {
417 trace!(
418 "Object operation for {}::{}",
419 O::object_name(),
420 D::operation_name()
421 );
396 422 // Erase object and operation types.
397 423 let object = AnyObject::new(in_object.clone());
398 424 let operation = AnyOperation::new(payload);
399 425
426 let raw_result = self
427 .new_operation_func(object, operation, operation_state.clone())
428 .await;
429
430 // Convert the dynamic result back into its concrete type
431 match raw_result {
432 Ok(result) => Ok(result.0.downcast_ref::<D::Success>().unwrap().clone()),
433 Err(err) => Err(match err {
434 OperationError::Internal(internal) => {
435 OperationError::Internal(internal.context(format!(
436 "operation {}::{} handler outcome",
437 O::object_name(),
438 D::operation_name()
439 )))
440 }
441 OperationError::Operation(boxed_error) => OperationError::Operation(
442 boxed_error.0.downcast_ref::<D::Failure>().unwrap().clone(),
443 ),
444 OperationError::Unhandled => OperationError::Unhandled,
445 }),
446 }
447 }
448
449 async fn get_object<O>(
450 &self,
451 object_str: &str,
452 operation_state: &OS,
453 ) -> Result<Object<OS, O, Self>, OperationError<ObjectRequestError>>
454 where
455 O: GiteratedObject + Debug + 'static,
456 {
457 // TODO: Authorization?
458 for (object_name, object_meta) in self.inner.metadata.objects.iter() {
459 if object_name != O::object_name() {
460 continue;
461 }
462
463 if let Ok(object) = (object_meta.from_str)(object_str) {
464 return Ok(unsafe {
465 Object::new_unchecked(object.downcast_ref::<O>().unwrap().clone(), self.clone())
466 });
467 }
468 }
469
470 if let Some(handler) = self.operation_handlers.get(&ObjectOperationPair {
471 object_name: "any",
472 operation_name: "any",
473 }) {
474 let result = handler
475 .handle(
476 (
477 AnyObject::new(Instance::from_str("giterated.dev").unwrap()),
478 AnyOperation::new(ObjectRequest(object_str.to_string())),
479 ),
480 operation_state.clone(),
481 )
482 .await;
483
484 match result {
485 Ok(success) => {
486 let object: &ObjectResponse = success.0.downcast_ref().unwrap();
487
488 return Ok(unsafe {
489 Object::new_unchecked(O::from_object_str(&object.0).unwrap(), self.clone())
490 });
491 }
492 Err(err) => match err {
493 OperationError::Operation(failure) => {
494 let failure: &ObjectRequestError = failure.0.downcast_ref().unwrap();
495
496 return Err(OperationError::Operation(failure.clone()));
497 }
498 OperationError::Internal(internal) => {
499 return Err(OperationError::Internal(internal))
500 }
501 OperationError::Unhandled => {}
502 },
503 };
504 }
505
506 Err(OperationError::Unhandled)
507 }
508 }
509
510 // Placeholder
511 impl<OS: Clone + 'static> GiteratedStack<OS> {
512 pub async fn new_operation_func(
513 &self,
514 object: AnyObject,
515 operation: AnyOperation,
516 operation_state: OS,
517 ) -> Result<AnySuccess, OperationError<AnyFailure>> {
518 let operation_name = operation.kind().operation_name;
519
520 if let Some(handler_tree) = self.inner.operation_handlers.get(&ObjectOperationPair {
521 object_name: "any",
522 operation_name: "any",
523 }) {
524 match handler_tree
525 .handle((object.clone(), operation.clone()), operation_state.clone())
526 .await
527 {
528 Ok(success) => return Ok(success),
529 Err(err) => match err {
530 OperationError::Operation(operation) => {
531 return Err(OperationError::Operation(operation))
532 }
533 OperationError::Internal(internal) => {
534 return Err(OperationError::Internal(internal))
535 }
536 OperationError::Unhandled => {}
537 },
538 }
539 }
540
400 541 // We need to hijack get_value, set_setting, and get_setting.
401 542 if operation_name == "get_value" {
402 543 let get_value = operation
@@ -408,17 +549,13 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
408 549 .metadata
409 550 .values
410 551 .get(&ObjectValuePair {
411 object_kind: O::object_name(),
552 object_kind: object.kind(),
412 553 value_kind: &get_value.value_name,
413 554 })
414 555 .ok_or_else(|| OperationError::Unhandled)?;
415 556 let value_name = value_meta.name.clone();
416 557
417 trace!(
418 "Handling get_value for {}::{}",
419 O::object_name(),
420 value_name
421 );
558 trace!("Handling get_value for {}::{}", object.kind(), value_name);
422 559
423 560 if let Some(handler) = self.inner.value_getters.get(&ObjectValuePair {
424 561 object_kind: "any",
@@ -432,8 +569,8 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
432 569 .await
433 570 {
434 571 Ok(success) => {
435 self.value_update(in_object, success.clone(), operation_state)
436 .await;
572 // self.value_update(in_object, success.clone(), operation_state)
573 // .await;
437 574
438 575 return Ok(*(Box::new((value_meta.serialize)(success).unwrap())
439 576 as Box<dyn Any>)
@@ -443,13 +580,7 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
443 580 Err(err) => {
444 581 match err {
445 582 OperationError::Operation(operation_error) => {
446 return Err(OperationError::Operation(
447 operation_error
448 .0
449 .downcast_ref::<D::Failure>()
450 .unwrap()
451 .clone(),
452 ));
583 return Err(OperationError::Operation(operation_error));
453 584 }
454 585 OperationError::Internal(internal) => {
455 586 // This DOES NOT result in an early return
@@ -464,7 +595,7 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
464 595 }
465 596
466 597 if let Some(handler) = self.inner.value_getters.get(&ObjectValuePair {
467 object_kind: O::object_name(),
598 object_kind: object.kind(),
468 599 value_kind: "any",
469 600 }) {
470 601 match handler
@@ -475,8 +606,8 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
475 606 .await
476 607 {
477 608 Ok(success) => {
478 self.value_update(in_object, success.clone(), operation_state)
479 .await;
609 // self.value_update(in_object, success.clone(), operation_state)
610 // .await;
480 611
481 612 return Ok(*(Box::new((value_meta.serialize)(success).unwrap())
482 613 as Box<dyn Any>)
@@ -486,13 +617,7 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
486 617 Err(err) => {
487 618 match err {
488 619 OperationError::Operation(operation_error) => {
489 return Err(OperationError::Operation(
490 operation_error
491 .0
492 .downcast_ref::<D::Failure>()
493 .unwrap()
494 .clone(),
495 ));
620 return Err(OperationError::Operation(operation_error));
496 621 }
497 622 OperationError::Internal(internal) => {
498 623 // This DOES NOT result in an early return
@@ -518,8 +643,8 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
518 643 .await
519 644 {
520 645 Ok(success) => {
521 self.value_update(in_object, success.clone(), operation_state)
522 .await;
646 // self.value_update(in_object, success.clone(), operation_state)
647 // .await;
523 648
524 649 return Ok(*(Box::new((value_meta.serialize)(success).unwrap())
525 650 as Box<dyn Any>)
@@ -529,13 +654,7 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
529 654 Err(err) => {
530 655 match err {
531 656 OperationError::Operation(operation_error) => {
532 return Err(OperationError::Operation(
533 operation_error
534 .0
535 .downcast_ref::<D::Failure>()
536 .unwrap()
537 .clone(),
538 ));
657 return Err(OperationError::Operation(operation_error));
539 658 }
540 659 OperationError::Internal(internal) => {
541 660 // This DOES NOT result in an early return
@@ -550,7 +669,7 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
550 669 }
551 670
552 671 if let Some(handler) = self.inner.value_getters.get(&ObjectValuePair {
553 object_kind: O::object_name(),
672 object_kind: object.kind(),
554 673 value_kind: &get_value.value_name,
555 674 }) {
556 675 match handler
@@ -561,24 +680,18 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
561 680 .await
562 681 {
563 682 Ok(success) => {
564 self.value_update(in_object, success.clone(), operation_state)
565 .await;
683 // self.value_update(in_object, success.clone(), operation_state)
684 // .await;
566 685
567 return Ok(*(Box::new((value_meta.serialize)(success).unwrap())
568 as Box<dyn Any>)
569 .downcast()
570 .unwrap());
686 return Ok(AnySuccess(Arc::new(
687 (value_meta.serialize)(success)
688 .as_internal_error_with_context("serializing value")?,
689 )));
571 690 }
572 691 Err(err) => {
573 692 match err {
574 693 OperationError::Operation(operation_error) => {
575 return Err(OperationError::Operation(
576 operation_error
577 .0
578 .downcast_ref::<D::Failure>()
579 .unwrap()
580 .clone(),
581 ));
694 return Err(OperationError::Operation(operation_error));
582 695 }
583 696 OperationError::Internal(internal) => {
584 697 // This DOES NOT result in an early return
@@ -597,20 +710,32 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
597 710
598 711 let raw_result = self
599 712 .get_setting(
600 object,
601 O::object_name().to_string(),
713 object.clone(),
714 object.kind().to_string(),
602 715 get_setting.clone(),
603 operation_state,
716 &operation_state,
604 717 )
605 718 .await;
606 719
720 let setting_meta = self
721 .metadata
722 .settings
723 .get(&ObjectSettingPair {
724 object_kind: object.kind(),
725 setting_name: &get_setting.setting_name,
726 })
727 .ok_or_else(|| OperationError::Unhandled)?;
728
607 729 return match raw_result {
608 730 Ok(success) => {
609 731 // Success is the setting type, serialize it
610 732 // let serialized = (setting_meta.serialize)(success).unwrap();
611 733
612 734 // Ok(serde_json::to_vec(&serialized).unwrap())
613 Ok(success.downcast_ref::<D::Success>().unwrap().clone())
735 // Ok(success.downcast_ref::<D::Success>().unwrap().clone())
736 Ok(AnySuccess(Arc::new(
737 (setting_meta.serialize)(success).unwrap(),
738 )))
614 739 }
615 740 Err(err) => Err(match err {
616 741 OperationError::Operation(failure) => {
@@ -620,7 +745,7 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
620 745 OperationError::Internal(internal) => {
621 746 OperationError::Internal(internal.context(format!(
622 747 "{}::get_setting::<{}> handler outcome",
623 O::object_name(),
748 object.kind(),
624 749 setting_name
625 750 )))
626 751 }
@@ -628,94 +753,157 @@ impl<OS: Send + Sync + Clone + 'static> ObjectBackend<OS> for GiteratedStack<OS>
628 753 }),
629 754 };
630 755 } else if operation.is::<SetSetting>() {
631 todo!()
632 } else if operation.is::<ObjectRequest>() {
633 todo!()
634 }
756 let operation: &SetSetting = operation.downcast_ref().unwrap();
757 let object_type = object.kind();
635 758
636 // Resolve the operation from the known operations table.
637 let operation_type = {
638 let mut operation_type = None;
759 trace!(
760 "Handling {}::set_setting for {}",
761 object_type,
762 operation.setting_name
763 );
639 764
640 for (target, operation_meta) in self.inner.metadata.operations.iter() {
641 // Skip elements that we know will not match
642 if target.object_name != O::object_name() {
643 continue;
765 let setting_meta = self
766 .metadata
767 .settings
768 .get(&ObjectSettingPair {
769 object_kind: &object_type,
770 setting_name: &operation.setting_name,
771 })
772 // TODO: Check this
773 .ok_or(OperationError::Operation(AnyFailure(Arc::new(
774 SetSettingError::InvalidSetting(
775 operation.setting_name.clone(),
776 object_type.to_string().clone(),
777 ),
778 ))))?;
779
780 let setting = (setting_meta.deserialize)(operation.value.clone())
781 .as_internal_error_with_context(format!(
782 "deserializing setting {} for object {}",
783 operation.setting_name, object_type
784 ))?;
785
786 trace!(
787 "Deserialized setting {} for object {}",
788 operation.setting_name,
789 object_type,
790 );
791
792 for provider in self.metadata_providers.iter() {
793 if provider.provides_for(object.deref()) {
794 trace!(
795 "Resolved setting provider for setting {} for object {}",
796 operation.setting_name,
797 object_type,
798 );
799
800 let object_meta = self
801 .metadata
802 .objects
803 .get(object_type)
804 .ok_or_else(|| OperationError::Unhandled)?;
805
806 let raw_result = provider
807 .write(object.clone(), object_meta, setting.clone(), setting_meta)
808 .await;
809
810 return match raw_result {
811 Ok(_) => {
812 warn!("Setting updated not implemented");
813 // (setting_meta.setting_updated)(
814 // object,
815 // setting,
816 // self.clone(),
817 // operation_state,
818 // )
819 // .await;
820
821 Ok(AnySuccess(Arc::new(())))
822 }
823 Err(e) => Err(OperationError::Internal(e.context(format!(
824 "writing object {} setting {}",
825 object_type, operation.setting_name
826 )))),
827 };
644 828 }
645 829
646 if target.operation_name != operation_name {
830 trace!(
831 "Failed to resolve setting provider for setting {} for object {}",
832 operation.setting_name,
833 object_type,
834 );
835 }
836 } else if operation.is::<ObjectRequest>() {
837 let object_request: &ObjectRequest = operation.downcast_ref().unwrap();
838 trace!("handling object request for {}", object_request.0);
839
840 // TODO: Authorization?
841 for (_object_name, object_meta) in self.inner.metadata.objects.iter() {
842 if object_meta.name == "networked_object" {
843 // TODO: HACK
647 844 continue;
648 845 }
649 846
650 if (operation_meta.any_is_same)(&operation) {
651 operation_type = Some(target.clone());
652 break;
847 if let Ok(object) = (object_meta.from_str)(&object_request.0) {
848 trace!("object request resolved as type {}", object_meta.name);
849
850 return Ok(AnySuccess(Arc::new(ObjectResponse((object_meta.to_str)(
851 object,
852 )))));
853 }
854 }
855
856 if let Some(handler) = self.operation_handlers.get(&ObjectOperationPair {
857 object_name: "any",
858 operation_name: "any",
859 }) {
860 let result = handler
861 .handle(
862 (
863 AnyObject::new(Instance::from_str("giterated.dev").unwrap()),
864 AnyOperation::new(ObjectRequest(object_request.0.to_string())),
865 ),
866 operation_state.clone(),
867 )
868 .await;
869
870 match result {
871 Ok(success) => {
872 let object: &ObjectResponse = success.0.downcast_ref().unwrap();
873
874 return Ok(AnySuccess(Arc::new(object.clone())));
875 }
876 Err(_err) => {
877 todo!()
878 }
653 879 }
654 880 }
655 881
656 operation_type
882 return Err(OperationError::Unhandled);
657 883 }
658 .ok_or_else(|| OperationError::Unhandled)?;
884
885 trace!(
886 "Object operation for {}::{} is not special case",
887 object.kind(),
888 operation_name
889 );
659 890
660 891 // Resolve the handler from our handler tree
661 892 let handler_tree = self
662 893 .inner
663 894 .operation_handlers
664 .get(&operation_type)
895 .get(&operation.kind())
665 896 .ok_or_else(|| OperationError::Unhandled)?;
666 897
667 let raw_result = handler_tree
668 .handle((object, operation), operation_state.clone())
669 .await;
670
671 // Convert the dynamic result back into its concrete type
672 match raw_result {
673 Ok(result) => Ok(result.0.downcast_ref::<D::Success>().unwrap().clone()),
674 Err(err) => Err(match err {
675 OperationError::Internal(internal) => {
676 OperationError::Internal(internal.context(format!(
677 "operation {}::{} handler outcome",
678 operation_type.object_name, operation_type.operation_name
679 )))
680 }
681 OperationError::Operation(boxed_error) => OperationError::Operation(
682 boxed_error.0.downcast_ref::<D::Failure>().unwrap().clone(),
683 ),
684 OperationError::Unhandled => OperationError::Unhandled,
685 }),
686 }
687 }
688
689 async fn get_object<O>(
690 &self,
691 object_str: &str,
692 _operation_state: &OS,
693 ) -> Result<Object<OS, O, Self>, OperationError<ObjectRequestError>>
694 where
695 O: GiteratedObject + Debug + 'static,
696 {
697 // TODO: Authorization?
698 for (_object_name, object_meta) in self.inner.metadata.objects.iter() {
699 if let Ok(object) = (object_meta.from_str)(object_str) {
700 return Ok(unsafe {
701 Object::new_unchecked(object.downcast_ref::<O>().unwrap().clone(), self.clone())
702 });
703 }
704 }
705
706 Err(OperationError::Unhandled)
707 }
708 }
898 trace!(
899 "Object operation for {}::{} handler tree resolved",
900 object.kind(),
901 operation_name
902 );
709 903
710 // Placeholder
711 impl<OS> GiteratedStack<OS> {
712 pub async fn new_operation_func(
713 &self,
714 _object: AnyObject,
715 _operation: AnyOperation,
716 _operation_state: OS,
717 ) -> Result<AnySuccess, OperationError<AnyFailure>> {
718 todo!()
904 handler_tree
905 .handle((object, operation), operation_state.clone())
906 .await
719 907 }
720 908 }
721 909

giterated-stack/src/substack.rs

View file
@@ -1,5 +1,6 @@
1 1 use std::{collections::HashMap, marker::PhantomData, sync::Arc};
2 2
3 use anyhow::Error;
3 4 use futures_util::FutureExt;
4 5 use giterated_models::{
5 6 error::OperationError,
@@ -13,10 +14,10 @@ use tracing::{info, trace};
13 14
14 15 use crate::{
15 16 handler::HandlerWrapper, provider::MetadataProvider, AnyFailure, AnyObject, AnyOperation,
16 AnySetting, AnySuccess, AnyValue, GiteratedStack, GiteratedStackState, IntoGiteratedHandler,
17 MaybeDynamicObject, MaybeDynamicValue, ObjectOperationPair, ObjectSettingPair, ObjectValuePair,
18 OperationHandler, OperationState, RuntimeMetadata, SettingChange, SettingGetter,
19 StackOperationState, ValueChange, ValueGetter,
17 AnySetting, AnySuccess, AnyValue, GiteratedStack, GiteratedStackState, HandlerResolvable,
18 IntoGiteratedHandler, MaybeDynamicObject, MaybeDynamicValue, ObjectOperationPair,
19 ObjectSettingPair, ObjectValuePair, OperationHandler, OperationState, RuntimeMetadata,
20 SettingChange, SettingGetter, StackOperationState, ValueChange, ValueGetter,
20 21 };
21 22
22 23 pub struct SubstackBuilder<S: GiteratedStackState, OS> {
@@ -184,14 +185,17 @@ impl<S: Send + Sync + Clone + 'static, OS: Clone + 'static> SubstackBuilder<S, O
184 185
185 186 self
186 187 }
187 }
188 188
189 impl<S: Send + Sync + Clone + 'static> SubstackBuilder<S, StackOperationState> {
190 189 /// Register a [`GiteratedObject`] type with the runtime.
191 190 ///
192 191 /// # Type Registration
193 192 /// This will register the provided object type.
194 pub fn object<O: GiteratedObject + 'static>(&mut self) -> &mut Self {
193 pub fn object<O: GiteratedObject + 'static>(&mut self) -> &mut Self
194 where
195 GiteratedStack<OS>: HandlerResolvable<(Instance, ObjectRequest), OS>,
196 <GiteratedStack<OS> as HandlerResolvable<(Instance, ObjectRequest), OS>>::Error:
197 Into<Error>,
198 {
195 199 self.metadata.register_object::<O>();
196 200
197 201 // Insert handler so ObjectRequest is handled properly
@@ -200,7 +204,7 @@ impl<S: Send + Sync + Clone + 'static> SubstackBuilder<S, StackOperationState> {
200 204 move |_object: Instance,
201 205 operation: ObjectRequest,
202 206 _state: S,
203 stack: GiteratedStack<StackOperationState>| {
207 stack: GiteratedStack<OS>| {
204 208 let operation = operation.clone();
205 209 async move {
206 210 for (_object_name, object_meta) in stack.inner.metadata.objects.iter() {
@@ -217,7 +221,9 @@ impl<S: Send + Sync + Clone + 'static> SubstackBuilder<S, StackOperationState> {
217 221
218 222 self
219 223 }
224 }
220 225
226 impl<S: Send + Sync + Clone + 'static> SubstackBuilder<S, StackOperationState> {
221 227 /// Register a [`Setting`] type with the runtime.
222 228 ///
223 229 /// # Type Registration
@@ -305,8 +311,28 @@ impl<S: Send + Sync + Clone + 'static> SubstackBuilder<S, StackOperationState> {
305 311
306 312 // Placeholder
307 313 impl<S: Send + Sync + Clone + 'static, OS> SubstackBuilder<S, OS> {
308 pub fn dynamic_operation<H>(&mut self, _handler: H) -> &mut Self {
309 tracing::error!("Dynamic unimplemented");
314 pub fn dynamic_operation<A, H>(&mut self, handler: H) -> &mut Self
315 where
316 H: IntoGiteratedHandler<
317 (AnyObject, AnyOperation),
318 A,
319 S,
320 OS,
321 Result<AnySuccess, OperationError<AnyFailure>>,
322 > + Send
323 + Sync
324 + 'static,
325 OS: Clone + Send + Sync + 'static,
326 {
327 let wrapped = HandlerWrapper::new(self.state.clone(), handler);
328
329 self.operation_handlers.insert(
330 ObjectOperationPair {
331 object_name: "any",
332 operation_name: "any",
333 },
334 wrapped,
335 );
310 336
311 337 self
312 338 }