JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Beginning of `stack-next` refactor

-Refactoring the protocol stack into something similar to a runtime. -Handles merging handler builders which is placing the ground work for plugins in. - Increased metadata generation during compilation enables less ser/de during execution. - Goal is to have an O(1) time from incoming operation to calling handlers. - Decreased penalty for using the statically typed API from within your code, now avoids some allocation. # Changes - Added `GiteratedRuntime` which is to replace the current unified stack - Added `RuntimeBuilder` which does what the current `OperationHandlers` struct does, but much better. - Added `RuntimeMetadata` to store type metadata for new `Any` based internals - Refactored serde_json out of the internal operation handling

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨708dea4

⁨giterated-stack/src/lib.rs⁩ - ⁨33808⁩ bytes
Raw
1 pub mod handler;
2 pub mod runtime;
3 pub mod state;
4 pub mod update;
5
6 use std::{
7 any::Any, collections::HashMap, future::Future, ops::Deref, pin::Pin, str::FromStr, sync::Arc,
8 };
9
10 use futures_util::FutureExt;
11 use giterated_models::{
12 error::OperationError,
13 instance::{
14 AuthenticationTokenRequest, Instance, RegisterAccountRequest, RepositoryCreateRequest,
15 },
16 object::{
17 AnyObject, GiteratedObject, Object, ObjectRequest, ObjectRequestError, ObjectResponse,
18 },
19 object_backend::ObjectBackend,
20 operation::{AnyOperation, GiteratedOperation},
21 repository::{AccessList, Repository},
22 settings::{AnySetting, GetSetting, SetSetting, Setting},
23 user::User,
24 value::{AnyValue, GetValue, GiteratedObjectValue},
25 };
26 use handler::GiteratedBackend;
27 use serde::{de::DeserializeOwned, Serialize};
28 use serde_json::Value;
29 use state::HandlerState;
30 use tokio::{
31 sync::{mpsc::channel, Mutex},
32 task::JoinHandle,
33 };
34 use tracing::{error, warn};
35 use update::{
36 HandleSettingUpdate, HandleSettingUpdatedFunction, HandleValueUpdate,
37 HandleValueUpdatedFunction, SettingUpdateKind, ValueUpdateKind,
38 };
39
40 #[derive(Clone, Debug, Hash, Eq, PartialEq)]
41 struct ObjectOperationPair {
42 object_name: String,
43 operation_name: String,
44 }
45
46 pub struct SettingMeta {
47 name: String,
48 deserialize: Box<dyn Fn(&[u8]) -> Result<Box<dyn Any>, serde_json::Error> + Send + Sync>,
49 }
50
51 pub struct ValueMeta {
52 name: String,
53 deserialize: Box<dyn Fn(&[u8]) -> Result<Box<dyn Any>, serde_json::Error> + Send + Sync>,
54 }
55
56 pub struct ObjectMeta {
57 name: String,
58 from_str: Box<dyn Fn(&str) -> Result<Box<dyn Any + Send + Sync>, ()> + Send + Sync>,
59 any_is_same: Box<dyn Fn(&dyn Any) -> bool + Send + Sync>,
60 }
61
62 pub struct OperationMeta {
63 name: String,
64 object_kind: String,
65 deserialize: Box<dyn Fn(&[u8]) -> Result<Box<dyn Any + Send + Sync>, ()> + Send + Sync>,
66 any_is_same: Box<dyn Fn(&dyn Any) -> bool + Send + Sync>,
67 serialize_success:
68 Box<dyn Fn(Box<dyn Any>) -> Result<Vec<u8>, serde_json::Error> + Send + Sync>,
69 serialize_error: Box<dyn Fn(Box<dyn Any>) -> Result<Vec<u8>, serde_json::Error> + Send + Sync>,
70 }
71
72 pub struct OperationHandlers<S: Send + Sync + Clone> {
73 operations: HashMap<ObjectOperationPair, OperationWrapper<S>>,
74 get_object: Vec<OperationWrapper<S>>,
75 value_getters: HashMap<ValueGetter, OperationWrapper<S>>,
76 settings_getter: HashMap<String, OperationWrapper<S>>,
77 settings: HashMap<String, SettingMeta>,
78 objects: HashMap<String, ObjectMeta>,
79 operations_meta: HashMap<ObjectOperationPair, OperationMeta>,
80 value_updated: HashMap<ValueUpdateKind, HandleValueUpdatedFunction>,
81 setting_updated: HashMap<SettingUpdateKind, HandleSettingUpdatedFunction>,
82 }
83
84 #[derive(Clone, Debug, Hash, Eq, PartialEq)]
85 pub struct ObjectValuePair {
86 pub object_kind: String,
87 pub value_kind: String,
88 }
89
90 impl<S: Send + Sync + Clone> Default for OperationHandlers<S> {
91 fn default() -> Self {
92 Self {
93 operations: HashMap::new(),
94 get_object: Vec::new(),
95 value_updated: HashMap::default(),
96 setting_updated: HashMap::default(),
97 value_getters: HashMap::default(),
98 settings_getter: HashMap::default(),
99 settings: HashMap::default(),
100 objects: HashMap::default(),
101 operations_meta: HashMap::default(),
102 }
103 }
104 }
105
106 impl<S: Send + Sync + Clone + 'static> OperationHandlers<S> {
107 fn insert_operation<O: GiteratedObject, D: GiteratedOperation<O> + Send + Sync + 'static>(
108 &mut self,
109 ) {
110 // let object_name = O::object_name().to_string();
111 // let operation_name = D::operation_name().to_string();
112
113 // self.operations_meta.insert(
114 // ObjectOperationPair {
115 // object_name: object_name.clone(),
116 // operation_name: operation_name.clone(),
117 // },
118 // OperationMeta {
119 // name: operation_name,
120 // object_kind: object_name,
121 // deserialize: Box::new(|bytes| {
122 // Ok(Box::new(serde_json::from_slice::<D>(bytes).unwrap())
123 // as Box<dyn Any + Send + Sync>)
124 // }),
125 // any_is_same: Box::new(|any_box| any_box.is::<D>()),
126 // },
127 // );
128 }
129
130 pub fn setting<T: Setting>(&mut self) -> &mut Self {
131 let setting_meta = SettingMeta {
132 name: T::name().to_string(),
133 deserialize: Box::new(|slice| {
134 Ok(Box::new(serde_json::from_slice(slice)?) as Box<dyn Any>)
135 }),
136 };
137
138 self.settings.insert(T::name().to_string(), setting_meta);
139
140 self
141 }
142
143 pub fn insert<
144 A,
145 O: GiteratedObject + Send + Sync + 'static,
146 D: GiteratedOperation<O> + Send + Sync + 'static,
147 H: GiteratedOperationHandler<A, O, D, S> + Send + Sync + 'static + Clone,
148 >(
149 &mut self,
150 handler: H,
151 ) -> &mut Self {
152 let object_name = handler.object_name().to_string();
153 let operation_name = handler.operation_name().to_string();
154
155 let wrapped = OperationWrapper::new(handler);
156
157 let pair = ObjectOperationPair {
158 object_name,
159 operation_name,
160 };
161
162 assert!(self.operations.insert(pair, wrapped).is_none());
163
164 self.insert_operation::<O, D>();
165
166 self
167 }
168
169 pub fn object<O: GiteratedObject + Send + Sync + 'static>(&mut self) -> &mut Self {
170 // let object_meta = ObjectMeta {
171 // name: O::object_name().to_string(),
172 // from_str: Box::new(|str| Ok(Box::new(O::from_str(&str).map_err(|_| ())?))),
173
174 // };
175
176 // self.objects
177 // .insert(O::object_name().to_string(), object_meta);
178
179 let closure = |_: &Instance, operation: ObjectRequest, _state| {
180 async move {
181 if O::from_str(&operation.0).is_ok() {
182 Ok(ObjectResponse(operation.0))
183 } else {
184 Err(OperationError::Unhandled)
185 }
186 }
187 .boxed()
188 };
189
190 let wrapped = OperationWrapper::new(closure);
191
192 self.get_object.push(wrapped);
193
194 self
195 }
196
197 pub async fn handle<O: GiteratedObject>(
198 &self,
199 object: &O,
200 operation_name: &str,
201 operation: AnyOperation,
202 state: S,
203 operation_state: &StackOperationState,
204 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
205 // TODO
206 let object = object.to_string();
207 let object_name = O::object_name().to_string();
208
209 let object_meta = self
210 .objects
211 .get(&object_name)
212 .ok_or_else(|| OperationError::Unhandled)?;
213
214 let object_box = (object_meta.from_str)(&object).map_err(|_| OperationError::Unhandled)?;
215
216 let target_handler = ObjectOperationPair {
217 object_name,
218 operation_name: operation_name.to_string(),
219 };
220
221 let operation_meta = self
222 .operations_meta
223 .get(&target_handler)
224 .ok_or_else(|| OperationError::Unhandled)?;
225
226 let operation_box =
227 (operation_meta.deserialize)(&serde_json::to_vec(&operation.0).unwrap())
228 .map_err(|_| OperationError::Unhandled)?;
229
230 if let Some(handler) = self.operations.get(&target_handler) {
231 handler
232 .handle(object_box, operation_box, state.clone(), operation_state)
233 .await
234 } else {
235 Err(OperationError::Unhandled)
236 }
237 }
238
239 pub async fn resolve_object(
240 &self,
241 instance: Instance,
242 request: ObjectRequest,
243 state: S,
244 operation_state: &StackOperationState,
245 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
246 for handler in self.get_object.iter() {
247 if let Ok(response) = handler
248 .handle(
249 Box::new(instance.clone()) as _,
250 Box::new(request.clone()) as _,
251 state.clone(),
252 operation_state,
253 )
254 .await
255 {
256 return Ok(response);
257 }
258 }
259
260 Err(OperationError::Unhandled)
261 }
262
263 pub fn insert_value_update_handler<
264 H: HandleValueUpdate<O, V> + Send + Sync + Clone + 'static,
265 O: GiteratedObject + Send + Sync,
266 V: GiteratedObjectValue<Object = O> + Send + Sync,
267 >(
268 &mut self,
269 handler: H,
270 ) -> &mut Self {
271 let wrapper = HandleValueUpdatedFunction::new(handler, V::value_name());
272
273 assert!(self
274 .value_updated
275 .insert(wrapper.target.clone(), wrapper)
276 .is_none());
277
278 self
279 }
280
281 pub fn insert_setting_update_handler<
282 H: HandleSettingUpdate<O, T> + Send + Sync + Clone + 'static,
283 O: GiteratedObject + Send + Sync,
284 T: Setting + Send + Sync,
285 >(
286 &mut self,
287 handler: H,
288 ) -> &mut Self {
289 let wrapper = HandleSettingUpdatedFunction::new(handler, T::name());
290
291 assert!(self
292 .setting_updated
293 .insert(wrapper.target.clone(), wrapper)
294 .is_none());
295
296 self
297 }
298
299 pub fn value_getter<A, O, V, F>(&mut self, handler: F) -> &mut Self
300 where
301 O: GiteratedObject + Send + Sync + 'static,
302 V: GiteratedObjectValue<Object = O> + Send + Sync + 'static,
303 F: GiteratedOperationHandler<A, O, GetValue<V>, S> + Send + Sync + Clone + 'static,
304 {
305 let object_name = handler.object_name().to_string();
306 let value_name = V::value_name().to_string();
307
308 let wrapped = OperationWrapper::new(handler);
309
310 assert!(self
311 .value_getters
312 .insert(
313 ValueGetter {
314 object_type: object_name,
315 value_type: value_name
316 },
317 wrapped
318 )
319 .is_none());
320
321 self
322 }
323
324 pub fn setting_getter<A, O, F>(&mut self, handler: F) -> &mut Self
325 where
326 O: GiteratedObject + Send + Sync + 'static,
327 F: GiteratedOperationHandler<A, O, GetSetting, S> + Send + Sync + Clone + 'static,
328 {
329 let object_name = handler.object_name().to_string();
330
331 let wrapped = OperationWrapper::new(handler);
332
333 assert!(self.settings_getter.insert(object_name, wrapped).is_none());
334
335 self
336 }
337 }
338
339 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
340 pub struct ValueGetter {
341 pub object_type: String,
342 pub value_type: String,
343 }
344
345 impl Default for ValueGetter {
346 fn default() -> Self {
347 Self {
348 object_type: AnyObject::object_name().to_string(),
349 value_type: "any".to_string(),
350 }
351 }
352 }
353
354 #[async_trait::async_trait]
355 pub trait GiteratedOperationHandler<
356 L,
357 O: GiteratedObject,
358 D: GiteratedOperation<O>,
359 S: Send + Sync + Clone,
360 >: Send + Sync
361 {
362 fn operation_name(&self) -> &str;
363 fn object_name(&self) -> &str;
364
365 async fn handle(
366 &self,
367 object: &O,
368 operation: D,
369 state: S,
370 operation_state: &StackOperationState,
371 ) -> Result<D::Success, OperationError<D::Failure>>;
372 }
373
374 #[async_trait::async_trait]
375 impl<O, D, F, S> GiteratedOperationHandler<(), O, D, S> for F
376 where
377 F: FnMut(
378 &O,
379 D,
380 S,
381 ) -> Pin<
382 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
383 > + Send
384 + Sync
385 + Clone,
386 O: GiteratedObject + Send + Sync,
387 D: GiteratedOperation<O> + 'static,
388 <D as GiteratedOperation<O>>::Failure: Send,
389 S: Send + Sync + Clone + 'static,
390 {
391 fn operation_name(&self) -> &str {
392 D::operation_name()
393 }
394
395 fn object_name(&self) -> &str {
396 O::object_name()
397 }
398
399 async fn handle(
400 &self,
401 object: &O,
402 operation: D,
403 state: S,
404 _operation_state: &StackOperationState,
405 ) -> Result<D::Success, OperationError<D::Failure>> {
406 self.clone()(object, operation, state).await
407 }
408 }
409
410 #[async_trait::async_trait]
411 impl<O, O1, D, F, S> GiteratedOperationHandler<(O1,), O, D, S> for F
412 where
413 F: FnMut(
414 &O,
415 D,
416 S,
417 O1,
418 ) -> Pin<
419 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
420 > + Send
421 + Sync
422 + Clone,
423 O: GiteratedObject + Send + Sync,
424 D: GiteratedOperation<O> + 'static + Send + Sync,
425 <D as GiteratedOperation<O>>::Failure: Send,
426 S: Send + Sync + Clone + 'static,
427 O1: FromOperationState<O, D>,
428 {
429 fn operation_name(&self) -> &str {
430 D::operation_name()
431 }
432
433 fn object_name(&self) -> &str {
434 O::object_name()
435 }
436
437 async fn handle(
438 &self,
439 object: &O,
440 operation: D,
441 state: S,
442 operation_state: &StackOperationState,
443 ) -> Result<D::Success, OperationError<D::Failure>> {
444 let o1 = O1::from_state(object, &operation, operation_state)
445 .await
446 .map_err(|e| OperationError::Internal(e.to_string()))?;
447 self.clone()(object, operation, state, o1).await
448 }
449 }
450
451 #[async_trait::async_trait]
452 impl<O, O1, O2, D, F, S> GiteratedOperationHandler<(O1, O2), O, D, S> for F
453 where
454 F: FnMut(
455 &O,
456 D,
457 S,
458 O1,
459 O2,
460 ) -> Pin<
461 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
462 > + Send
463 + Sync
464 + Clone,
465 O: GiteratedObject + Send + Sync,
466 D: GiteratedOperation<O> + 'static + Send + Sync,
467 <D as GiteratedOperation<O>>::Failure: Send,
468 S: Send + Sync + Clone + 'static,
469 O1: FromOperationState<O, D>,
470 O2: FromOperationState<O, D>,
471 {
472 fn operation_name(&self) -> &str {
473 D::operation_name()
474 }
475
476 fn object_name(&self) -> &str {
477 O::object_name()
478 }
479
480 async fn handle(
481 &self,
482 object: &O,
483 operation: D,
484 state: S,
485 operation_state: &StackOperationState,
486 ) -> Result<D::Success, OperationError<D::Failure>> {
487 let o1 = O1::from_state(object, &operation, operation_state)
488 .await
489 .map_err(|e| OperationError::Internal(e.to_string()))?;
490 let o2 = O2::from_state(object, &operation, operation_state)
491 .await
492 .map_err(|e| OperationError::Internal(e.to_string()))?;
493 self.clone()(object, operation, state, o1, o2).await
494 }
495 }
496
497 #[async_trait::async_trait]
498 impl<O, O1, O2, O3, D, F, S> GiteratedOperationHandler<(O1, O2, O3), O, D, S> for F
499 where
500 F: FnMut(
501 &O,
502 D,
503 S,
504 O1,
505 O2,
506 O3,
507 ) -> Pin<
508 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
509 > + Send
510 + Sync
511 + Clone,
512 O: GiteratedObject + Send + Sync,
513 D: GiteratedOperation<O> + 'static + Send + Sync,
514 <D as GiteratedOperation<O>>::Failure: Send,
515 S: Send + Sync + Clone + 'static,
516 O1: FromOperationState<O, D>,
517 O2: FromOperationState<O, D>,
518 O3: FromOperationState<O, D>,
519 {
520 fn operation_name(&self) -> &str {
521 D::operation_name()
522 }
523
524 fn object_name(&self) -> &str {
525 O::object_name()
526 }
527
528 async fn handle(
529 &self,
530 object: &O,
531 operation: D,
532 state: S,
533 operation_state: &StackOperationState,
534 ) -> Result<D::Success, OperationError<D::Failure>> {
535 let o1 = O1::from_state(object, &operation, operation_state)
536 .await
537 .map_err(|e| OperationError::Internal(e.to_string()))?;
538 let o2 = O2::from_state(object, &operation, operation_state)
539 .await
540 .map_err(|e| OperationError::Internal(e.to_string()))?;
541 let o3 = O3::from_state(object, &operation, operation_state)
542 .await
543 .map_err(|e| OperationError::Internal(e.to_string()))?;
544 self.clone()(object, operation, state, o1, o2, o3).await
545 }
546 }
547
548 pub struct OperationWrapper<S: Send + Sync + Clone> {
549 func: Box<
550 dyn Fn(
551 Box<dyn Any + Send + Sync>,
552 Box<dyn Any + Send + Sync>,
553 S,
554 StackOperationState,
555 )
556 -> Pin<Box<dyn Future<Output = Result<Vec<u8>, OperationError<Vec<u8>>>> + Send>>
557 + Send
558 + Sync,
559 >,
560 object_name: String,
561 }
562
563 impl<S: Send + Sync + Clone + 'static> OperationWrapper<S> {
564 pub fn new<
565 A,
566 O: GiteratedObject + Send + Sync + 'static,
567 D: GiteratedOperation<O> + 'static,
568 F: GiteratedOperationHandler<A, O, D, S> + Send + Sync + 'static + Clone,
569 >(
570 handler: F,
571 ) -> Self {
572 let handler = Arc::new(Box::pin(handler));
573 Self {
574 func: Box::new(move |object, operation, state, operation_state| {
575 let handler = handler.clone();
576 async move {
577 let handler = handler.clone();
578 let object: Box<O> = object.downcast().unwrap();
579 let operation: Box<D> = operation.downcast().unwrap();
580
581 let result = handler
582 .handle(&object, *operation, state, &operation_state)
583 .await;
584 result
585 .map(|success| serde_json::to_vec(&success).unwrap())
586 .map_err(|err| match err {
587 OperationError::Operation(err) => {
588 OperationError::Operation(serde_json::to_vec(&err).unwrap())
589 }
590 OperationError::Internal(internal) => {
591 OperationError::Internal(internal)
592 }
593 OperationError::Unhandled => OperationError::Unhandled,
594 })
595 }
596 .boxed()
597 }),
598 object_name: O::object_name().to_string(),
599 }
600 }
601
602 async fn handle(
603 &self,
604 object: Box<dyn Any + Send + Sync>,
605 operation: Box<dyn Any + Send + Sync>,
606 state: S,
607 operation_state: &StackOperationState,
608 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
609 (self.func)(object, operation, state, operation_state.clone()).await
610 }
611 }
612
613 #[async_trait::async_trait]
614 pub trait FromOperationState<O: GiteratedObject, D: GiteratedOperation<O> + Send + Sync>:
615 Sized + Clone + Send
616 {
617 type Error: Serialize + DeserializeOwned;
618
619 async fn from_state(
620 object: &O,
621 operation: &D,
622 state: &StackOperationState,
623 ) -> Result<Self, OperationError<Self::Error>>;
624 }
625
626 #[async_trait::async_trait]
627 impl<O: GiteratedObject, D: GiteratedOperation<O> + Send + Sync> FromOperationState<O, D>
628 for BackendWrapper
629 {
630 type Error = ();
631
632 async fn from_state(
633 _object: &O,
634 _operation: &D,
635 state: &StackOperationState,
636 ) -> Result<Self, OperationError<()>> {
637 Ok(state.giterated_backend.clone())
638 }
639 }
640
641 #[async_trait::async_trait]
642 impl<O: GiteratedObject, D: GiteratedOperation<O> + Send + Sync> FromOperationState<O, D>
643 for StackOperationState
644 {
645 type Error = ();
646
647 async fn from_state(
648 _object: &O,
649 _operation: &D,
650 state: &StackOperationState,
651 ) -> Result<StackOperationState, OperationError<()>> {
652 Ok(state.clone())
653 }
654 }
655
656 #[async_trait::async_trait]
657 impl<O: GiteratedObject, D: GiteratedOperation<O> + Send + Sync> FromOperationState<O, D>
658 for AuthenticatedUser
659 {
660 type Error = ();
661
662 async fn from_state(
663 _object: &O,
664 _operation: &D,
665 state: &StackOperationState,
666 ) -> Result<AuthenticatedUser, OperationError<()>> {
667 state
668 .user
669 .clone()
670 .ok_or_else(|| OperationError::Operation(()))
671 }
672 }
673
674 #[async_trait::async_trait]
675 impl<O: GiteratedObject, D: GiteratedOperation<O> + Send + Sync> FromOperationState<O, D>
676 for AuthenticatedInstance
677 {
678 type Error = ();
679
680 async fn from_state(
681 _object: &O,
682 _operation: &D,
683 state: &StackOperationState,
684 ) -> Result<AuthenticatedInstance, OperationError<()>> {
685 state
686 .instance
687 .clone()
688 .ok_or_else(|| OperationError::Operation(()))
689 }
690 }
691
692 #[async_trait::async_trait]
693 impl<
694 T: FromOperationState<O, D> + Send + Sync,
695 O: GiteratedObject + Sync,
696 D: GiteratedOperation<O> + Send + Sync,
697 > FromOperationState<O, D> for Option<T>
698 {
699 type Error = ();
700
701 async fn from_state(
702 object: &O,
703 operation: &D,
704 state: &StackOperationState,
705 ) -> Result<Option<T>, OperationError<()>> {
706 Ok(T::from_state(object, operation, state).await.ok())
707 }
708 }
709
710 #[derive(Clone)]
711 pub struct AuthorizedUser(AuthenticatedUser);
712
713 #[derive(Clone)]
714 pub struct AuthorizedInstance(AuthenticatedInstance);
715
716 #[async_trait::async_trait]
717 pub trait AuthorizedOperation<O: GiteratedObject>: GiteratedOperation<O> {
718 async fn authorize(
719 &self,
720 authorize_for: &O,
721 state: &StackOperationState,
722 ) -> Result<bool, OperationError<()>>;
723 }
724
725 #[async_trait::async_trait]
726 impl<
727 O: GiteratedObject + Send + Sync + Debug,
728 V: GiteratedObjectValue<Object = O> + Send + Sync,
729 > AuthorizedOperation<O> for GetValue<V>
730 {
731 async fn authorize(
732 &self,
733 authorize_for: &O,
734 operation_state: &StackOperationState,
735 ) -> Result<bool, OperationError<()>> {
736 Ok(operation_state
737 .giterated_backend
738 .get_object::<O>(&authorize_for.to_string(), operation_state)
739 .await
740 .is_ok())
741 }
742 }
743
744 #[async_trait::async_trait]
745 impl AuthorizedOperation<User> for SetSetting {
746 async fn authorize(
747 &self,
748 authorize_for: &User,
749 operation_state: &StackOperationState,
750 ) -> Result<bool, OperationError<()>> {
751 let authenticated_user = operation_state
752 .user
753 .as_ref()
754 .ok_or_else(|| OperationError::Operation(()))?;
755
756 Ok(authorize_for == authenticated_user.deref())
757 }
758 }
759
760 #[async_trait::async_trait]
761 impl AuthorizedOperation<User> for GetSetting {
762 async fn authorize(
763 &self,
764 authorize_for: &User,
765 operation_state: &StackOperationState,
766 ) -> Result<bool, OperationError<()>> {
767 let authenticated_user = operation_state
768 .user
769 .as_ref()
770 .ok_or_else(|| OperationError::Operation(()))?;
771
772 Ok(authorize_for == authenticated_user.deref())
773 }
774 }
775
776 #[async_trait::async_trait]
777 impl AuthorizedOperation<Repository> for SetSetting {
778 async fn authorize(
779 &self,
780 authorize_for: &Repository,
781 operation_state: &StackOperationState,
782 ) -> Result<bool, OperationError<()>> {
783 let authenticated_user = operation_state
784 .user
785 .as_ref()
786 .ok_or_else(|| OperationError::Operation(()))?;
787
788 let mut object = operation_state
789 .giterated_backend
790 .get_object::<Repository>(&authorize_for.to_string(), operation_state)
791 .await
792 .map_err(|e| OperationError::Internal(e.to_string()))?;
793
794 let access_list = object
795 .get_setting::<AccessList>(operation_state)
796 .await
797 .map_err(|e| OperationError::Internal(e.to_string()))?;
798
799 if access_list
800 .0
801 .iter()
802 .find(|user| *user == authenticated_user.deref())
803 .is_some()
804 {
805 Ok(true)
806 } else {
807 Ok(false)
808 }
809 }
810 }
811
812 #[async_trait::async_trait]
813 impl AuthorizedOperation<Repository> for GetSetting {
814 async fn authorize(
815 &self,
816 authorize_for: &Repository,
817 operation_state: &StackOperationState,
818 ) -> Result<bool, OperationError<()>> {
819 let authenticated_user = operation_state
820 .user
821 .as_ref()
822 .ok_or_else(|| OperationError::Operation(()))?;
823
824 let mut object = operation_state
825 .giterated_backend
826 .get_object::<Repository>(&authorize_for.to_string(), operation_state)
827 .await
828 .map_err(|e| OperationError::Internal(e.to_string()))?;
829
830 let access_list = object
831 .get_setting::<AccessList>(operation_state)
832 .await
833 .map_err(|e| OperationError::Internal(e.to_string()))?;
834
835 if access_list
836 .0
837 .iter()
838 .find(|user| *user == authenticated_user.deref())
839 .is_some()
840 {
841 Ok(true)
842 } else {
843 Ok(false)
844 }
845 }
846 }
847
848 #[async_trait::async_trait]
849 impl AuthorizedOperation<Instance> for RegisterAccountRequest {
850 async fn authorize(
851 &self,
852 authorize_for: &Instance,
853 state: &StackOperationState,
854 ) -> Result<bool, OperationError<()>> {
855 if state.our_instance == *authorize_for {
856 Ok(true)
857 } else {
858 Ok(false)
859 }
860 }
861 }
862
863 #[async_trait::async_trait]
864 impl AuthorizedOperation<Instance> for AuthenticationTokenRequest {
865 async fn authorize(
866 &self,
867 authorize_for: &Instance,
868 state: &StackOperationState,
869 ) -> Result<bool, OperationError<()>> {
870 if state.our_instance == *authorize_for {
871 Ok(true)
872 } else {
873 Ok(false)
874 }
875 }
876 }
877
878 #[async_trait::async_trait]
879 impl AuthorizedOperation<Instance> for RepositoryCreateRequest {
880 async fn authorize(
881 &self,
882 authorize_for: &Instance,
883 state: &StackOperationState,
884 ) -> Result<bool, OperationError<()>> {
885 if state.our_instance == *authorize_for {
886 Ok(true)
887 } else {
888 Ok(false)
889 }
890 }
891 }
892
893 #[async_trait::async_trait]
894 impl<A: AuthorizedOperation<User> + Send + Sync> FromOperationState<User, A> for AuthorizedUser {
895 type Error = ();
896
897 async fn from_state(
898 object: &User,
899 operation: &A,
900 state: &StackOperationState,
901 ) -> Result<AuthorizedUser, OperationError<()>> {
902 let authenticated = AuthenticatedUser::from_state(object, operation, state).await?;
903
904 match operation.authorize(object, state).await {
905 Ok(authorized) => {
906 assert!(authorized);
907 }
908 Err(err) => return Err(OperationError::Internal(err.to_string())),
909 };
910
911 Ok(AuthorizedUser(authenticated))
912 }
913 }
914
915 #[async_trait::async_trait]
916 impl<A: AuthorizedOperation<Instance> + Send + Sync> FromOperationState<Instance, A>
917 for AuthorizedInstance
918 {
919 type Error = ();
920
921 async fn from_state(
922 object: &Instance,
923 operation: &A,
924 state: &StackOperationState,
925 ) -> Result<AuthorizedInstance, OperationError<()>> {
926 let authenticated = AuthenticatedInstance::from_state(object, operation, state).await?;
927
928 match operation.authorize(object, state).await {
929 Ok(authorized) => {
930 assert!(authorized);
931 }
932 Err(err) => return Err(OperationError::Internal(err.to_string())),
933 };
934
935 Ok(AuthorizedInstance(authenticated))
936 }
937 }
938
939 // #[async_trait::async_trait> FromOperationState for Option<T> {
940 // type Error = ();
941
942 // async fn from_state(state: &StackOperationState) -> Result<Option<T>, OperationError<()>> {
943 // Ok(T::from_state(]
944 // impl<T: FromOperationStatestate).await.ok())
945 // }
946 // }
947
948 #[derive(Clone)]
949 pub struct StackOperationState {
950 pub our_instance: Instance,
951 pub giterated_backend: BackendWrapper,
952 pub instance: Option<AuthenticatedInstance>,
953 pub user: Option<AuthenticatedUser>,
954 }
955
956 #[derive(Clone, Debug)]
957 pub struct AuthenticatedInstance(Instance);
958
959 impl AuthenticatedInstance {
960 pub fn new(instance: Instance) -> Self {
961 AuthenticatedInstance(instance)
962 }
963 }
964
965 impl Deref for AuthenticatedInstance {
966 type Target = Instance;
967
968 fn deref(&self) -> &Self::Target {
969 &self.0
970 }
971 }
972
973 #[derive(Clone, Debug)]
974 pub struct AuthenticatedUser(User);
975
976 impl AuthenticatedUser {
977 pub fn new(user: User) -> Self {
978 AuthenticatedUser(user)
979 }
980 }
981
982 impl Deref for AuthenticatedUser {
983 type Target = User;
984
985 fn deref(&self) -> &Self::Target {
986 &self.0
987 }
988 }
989
990 #[derive(Clone)]
991 pub struct BackendWrapper {
992 sender: tokio::sync::mpsc::Sender<(
993 tokio::sync::oneshot::Sender<Result<Value, OperationError<Value>>>,
994 WrappedOperation,
995 )>,
996 task: Arc<JoinHandle<()>>,
997 }
998
999 pub struct WrappedOperation {
1000 object: AnyObject,
1001 operation_payload: AnyOperation,
1002 operation_name: String,
1003 state: StackOperationState,
1004 }
1005
1006 impl BackendWrapper {
1007 pub fn new<S: HandlerState>(backend: GiteratedBackend<S>) -> Self {
1008 // Spawn listener task
1009
1010 let (send, mut recv) = channel::<(
1011 tokio::sync::oneshot::Sender<Result<Value, OperationError<Value>>>,
1012 WrappedOperation,
1013 )>(1024);
1014
1015 let task = tokio::spawn(async move {
1016 while let Some((responder, message)) = recv.recv().await {
1017 let raw_result = backend
1018 .object_operation(
1019 message.object,
1020 &message.operation_name,
1021 message.operation_payload,
1022 &message.state,
1023 )
1024 .await;
1025
1026 responder.send(raw_result).unwrap();
1027 }
1028 error!("Error, thing's dead");
1029 });
1030
1031 Self {
1032 sender: send,
1033 task: Arc::new(task),
1034 }
1035 }
1036
1037 pub async fn call(&self, operation: WrappedOperation) -> Result<Value, OperationError<Value>> {
1038 let (sender, response) = tokio::sync::oneshot::channel();
1039
1040 self.sender
1041 .send((sender, operation))
1042 .await
1043 .map_err(|e| OperationError::Internal(e.to_string()))?;
1044
1045 match response.await {
1046 Ok(result) => Ok(result?),
1047 Err(err) => Err(OperationError::Internal(err.to_string())),
1048 }
1049 }
1050 }
1051
1052 use std::fmt::Debug;
1053
1054 #[async_trait::async_trait]
1055 impl ObjectBackend<StackOperationState> for BackendWrapper {
1056 async fn object_operation<O, D>(
1057 &self,
1058 object: O,
1059 operation: &str,
1060 payload: D,
1061 operation_state: &StackOperationState,
1062 ) -> Result<D::Success, OperationError<D::Failure>>
1063 where
1064 O: GiteratedObject + Debug,
1065 D: GiteratedOperation<O> + Debug,
1066 {
1067 let operation = WrappedOperation {
1068 object: AnyObject(object.to_string()),
1069 operation_name: operation.to_string(),
1070 operation_payload: AnyOperation(serde_json::to_value(payload).unwrap()),
1071 state: operation_state.clone(),
1072 };
1073
1074 let raw_result = self.call(operation).await;
1075
1076 match raw_result {
1077 Ok(result) => Ok(serde_json::from_value(result)
1078 .map_err(|e| OperationError::Internal(e.to_string()))?),
1079 Err(err) => match err {
1080 OperationError::Internal(internal) => {
1081 warn!(
1082 "Internal Error: {:?}",
1083 OperationError::<()>::Internal(internal.clone())
1084 );
1085
1086 Err(OperationError::Internal(internal))
1087 }
1088 OperationError::Unhandled => Err(OperationError::Unhandled),
1089 OperationError::Operation(err) => Err(OperationError::Operation(
1090 serde_json::from_value(err)
1091 .map_err(|e| OperationError::Internal(e.to_string()))?,
1092 )),
1093 },
1094 }
1095 }
1096
1097 async fn get_object<O: GiteratedObject + Debug>(
1098 &self,
1099 object_str: &str,
1100 operation_state: &StackOperationState,
1101 ) -> Result<Object<StackOperationState, O, Self>, OperationError<ObjectRequestError>> {
1102 let operation = WrappedOperation {
1103 object: AnyObject(object_str.to_string()),
1104 operation_name: ObjectRequest::operation_name().to_string(),
1105 operation_payload: AnyOperation(
1106 serde_json::to_value(ObjectRequest(object_str.to_string())).unwrap(),
1107 ),
1108 state: operation_state.clone(),
1109 };
1110
1111 let raw_result = self.call(operation).await;
1112
1113 let object: ObjectResponse = match raw_result {
1114 Ok(result) => Ok(serde_json::from_value(result)
1115 .map_err(|e| OperationError::Internal(e.to_string()))?),
1116 Err(err) => match err {
1117 OperationError::Internal(internal) => {
1118 warn!(
1119 "Internal Error: {:?}",
1120 OperationError::<()>::Internal(internal.clone())
1121 );
1122
1123 Err(OperationError::Internal(internal))
1124 }
1125 OperationError::Unhandled => Err(OperationError::Unhandled),
1126 OperationError::Operation(err) => Err(OperationError::Operation(
1127 serde_json::from_value(err)
1128 .map_err(|e| OperationError::Internal(e.to_string()))?,
1129 )),
1130 },
1131 }?;
1132
1133 unsafe {
1134 Ok(Object::new_unchecked(
1135 O::from_str(&object.0)
1136 .map_err(|_| OperationError::Internal("deserialize failure".to_string()))?,
1137 self.clone(),
1138 ))
1139 }
1140 }
1141 }
1142