JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Implement `FromOperationState` for `AuthenticatedUser` and `AuthenticatedInstance`

Use `AuthenticatedUser` on repository requests so we can filter by privacy. Woohoo! Attempt to filter `UserRepositoriesRequest` responses by visibility to the requester.

Amber - 2 years ago

parent: tbd commit: 75dcac3

⁨giterated-stack/src/lib.rs⁩ - ⁨19669⁩ bytes
Raw
1 pub mod handler;
2 pub mod state;
3
4 use std::{collections::HashMap, future::Future, ops::Deref, pin::Pin, str::FromStr, sync::Arc};
5
6 use futures_util::FutureExt;
7 use giterated_models::{
8 error::OperationError,
9 instance::Instance,
10 object::{
11 AnyObject, GiteratedObject, Object, ObjectRequest, ObjectRequestError, ObjectResponse,
12 },
13 object_backend::ObjectBackend,
14 operation::{AnyOperation, GiteratedOperation},
15 repository::Repository,
16 user::User,
17 };
18 use handler::GiteratedBackend;
19 use serde::{de::DeserializeOwned, Serialize};
20 use serde_json::Value;
21 use state::HandlerState;
22 use tokio::{sync::mpsc::channel, task::JoinHandle};
23 use tracing::{error, warn};
24
/// Lookup key for a registered operation handler: the name of an object type
/// paired with the name of an operation on it.
///
/// Used as the key of the `operations` map in `OperationHandlers`.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
struct ObjectOperationPair {
    /// Object type name, as reported by a handler's `object_name()`.
    object_name: String,
    /// Operation name, as reported by a handler's `operation_name()`.
    operation_name: String,
}
30
/// Registry of type-erased operation handlers sharing the handler state `S`.
pub struct OperationHandlers<S: Send + Sync + Clone> {
    /// Handlers for concrete operations, keyed by (object name, operation name).
    operations: HashMap<ObjectOperationPair, OperationWrapper<S>>,
    /// Object resolvers, one per `register_object` call; `resolve_object`
    /// tries them in registration order.
    get_object: Vec<OperationWrapper<S>>,
}
35
36 impl<S: Send + Sync + Clone> Default for OperationHandlers<S> {
37 fn default() -> Self {
38 Self {
39 operations: HashMap::new(),
40 get_object: Vec::new(),
41 }
42 }
43 }
44
45 impl<S: Send + Sync + Clone + 'static> OperationHandlers<S> {
46 pub fn insert<
47 A,
48 O: GiteratedObject + Send + Sync,
49 D: GiteratedOperation<O> + 'static,
50 H: GiteratedOperationHandler<A, O, D, S> + Send + Sync + 'static + Clone,
51 >(
52 &mut self,
53 handler: H,
54 ) -> &mut Self {
55 let object_name = handler.object_name().to_string();
56 let operation_name = handler.operation_name().to_string();
57
58 let wrapped = OperationWrapper::new(handler);
59
60 let pair = ObjectOperationPair {
61 object_name,
62 operation_name,
63 };
64
65 assert!(self.operations.insert(pair, wrapped).is_none());
66
67 self
68 }
69
70 pub fn register_object<O: GiteratedObject + Send + Sync>(&mut self) -> &mut Self {
71 let closure = |_: &Instance, operation: ObjectRequest, _state| {
72 async move {
73 if O::from_str(&operation.0).is_ok() {
74 Ok(ObjectResponse(operation.0))
75 } else {
76 Err(OperationError::Unhandled)
77 }
78 }
79 .boxed()
80 };
81
82 let wrapped = OperationWrapper::new(closure);
83
84 self.get_object.push(wrapped);
85
86 self
87 }
88
89 pub async fn handle<O: GiteratedObject>(
90 &self,
91 object: &O,
92 operation_name: &str,
93 operation: AnyOperation,
94 state: S,
95 operation_state: &StackOperationState,
96 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
97 // TODO
98 let object = object.to_string();
99
100 let object_name = {
101 if User::from_str(&object).is_ok() {
102 User::object_name()
103 } else if Repository::from_str(&object).is_ok() {
104 Repository::object_name()
105 } else if Instance::from_str(&object).is_ok() {
106 Instance::object_name()
107 } else {
108 return Err(OperationError::Unhandled);
109 }
110 }
111 .to_string();
112
113 let target_handler = ObjectOperationPair {
114 object_name,
115 operation_name: operation_name.to_string(),
116 };
117
118 if let Some(handler) = self.operations.get(&target_handler) {
119 handler
120 .handle(
121 AnyObject(object.to_string()),
122 operation.clone(),
123 state.clone(),
124 operation_state,
125 )
126 .await
127 } else {
128 Err(OperationError::Unhandled)
129 }
130 }
131
132 pub async fn resolve_object(
133 &self,
134 instance: AnyObject,
135 request: ObjectRequest,
136 state: S,
137 operation_state: &StackOperationState,
138 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
139 for handler in self.get_object.iter() {
140 if let Ok(response) = handler
141 .handle(
142 instance.clone(),
143 AnyOperation(serde_json::to_value(request.clone()).unwrap()),
144 state.clone(),
145 operation_state,
146 )
147 .await
148 {
149 return Ok(response);
150 }
151 }
152
153 Err(OperationError::Unhandled)
154 }
155 }
156
/// A handler for operation `D` on object type `O`, given handler state `S`.
///
/// `L` is a marker type parameter. The implementations in this file use it
/// to tell apart callables by the tuple of extra arguments they take: `()`,
/// `(O1,)`, `(O1, O2)`, `(O1, O2, O3)`.
#[async_trait::async_trait]
pub trait GiteratedOperationHandler<
    L,
    O: GiteratedObject,
    D: GiteratedOperation<O>,
    S: Send + Sync + Clone,
>
{
    /// Name of the operation this handler serves.
    fn operation_name(&self) -> &str;
    /// Name of the object type this handler serves.
    fn object_name(&self) -> &str;

    /// Runs the handler for `operation` on `object`.
    ///
    /// `state` is the shared handler state; `operation_state` is the context
    /// of the operation being handled.
    async fn handle(
        &self,
        object: &O,
        operation: D,
        state: S,
        operation_state: &StackOperationState,
    ) -> Result<D::Success, OperationError<D::Failure>>;
}
176
177 #[async_trait::async_trait]
178 impl<O, D, F, S> GiteratedOperationHandler<(), O, D, S> for F
179 where
180 F: FnMut(
181 &O,
182 D,
183 S,
184 ) -> Pin<
185 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
186 > + Send
187 + Sync
188 + Clone,
189 O: GiteratedObject + Send + Sync,
190 D: GiteratedOperation<O> + 'static,
191 <D as GiteratedOperation<O>>::Failure: Send,
192 S: Send + Sync + Clone + 'static,
193 {
194 fn operation_name(&self) -> &str {
195 D::operation_name()
196 }
197
198 fn object_name(&self) -> &str {
199 O::object_name()
200 }
201
202 async fn handle(
203 &self,
204 object: &O,
205 operation: D,
206 state: S,
207 _operation_state: &StackOperationState,
208 ) -> Result<D::Success, OperationError<D::Failure>> {
209 self.clone()(object, operation, state).await
210 }
211 }
212
213 #[async_trait::async_trait]
214 impl<O, O1, D, F, S> GiteratedOperationHandler<(O1,), O, D, S> for F
215 where
216 F: FnMut(
217 &O,
218 D,
219 S,
220 O1,
221 ) -> Pin<
222 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
223 > + Send
224 + Sync
225 + Clone,
226 O: GiteratedObject + Send + Sync,
227 D: GiteratedOperation<O> + 'static,
228 <D as GiteratedOperation<O>>::Failure: Send,
229 S: Send + Sync + Clone + 'static,
230 O1: FromOperationState,
231 {
232 fn operation_name(&self) -> &str {
233 D::operation_name()
234 }
235
236 fn object_name(&self) -> &str {
237 O::object_name()
238 }
239
240 async fn handle(
241 &self,
242 object: &O,
243 operation: D,
244 state: S,
245 operation_state: &StackOperationState,
246 ) -> Result<D::Success, OperationError<D::Failure>> {
247 let o1 = O1::from_state(operation_state)
248 .await
249 .map_err(|e| OperationError::Internal(e.to_string()))?;
250 self.clone()(object, operation, state, o1).await
251 }
252 }
253
254 #[async_trait::async_trait]
255 impl<O, O1, O2, D, F, S> GiteratedOperationHandler<(O1, O2), O, D, S> for F
256 where
257 F: FnMut(
258 &O,
259 D,
260 S,
261 O1,
262 O2,
263 ) -> Pin<
264 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
265 > + Send
266 + Sync
267 + Clone,
268 O: GiteratedObject + Send + Sync,
269 D: GiteratedOperation<O> + 'static,
270 <D as GiteratedOperation<O>>::Failure: Send,
271 S: Send + Sync + Clone + 'static,
272 O1: FromOperationState,
273 O2: FromOperationState,
274 {
275 fn operation_name(&self) -> &str {
276 D::operation_name()
277 }
278
279 fn object_name(&self) -> &str {
280 O::object_name()
281 }
282
283 async fn handle(
284 &self,
285 object: &O,
286 operation: D,
287 state: S,
288 operation_state: &StackOperationState,
289 ) -> Result<D::Success, OperationError<D::Failure>> {
290 let o1 = O1::from_state(operation_state)
291 .await
292 .map_err(|e| OperationError::Internal(e.to_string()))?;
293 let o2 = O2::from_state(operation_state)
294 .await
295 .map_err(|e| OperationError::Internal(e.to_string()))?;
296 self.clone()(object, operation, state, o1, o2).await
297 }
298 }
299
300 #[async_trait::async_trait]
301 impl<O, O1, O2, O3, D, F, S> GiteratedOperationHandler<(O1, O2, O3), O, D, S> for F
302 where
303 F: FnMut(
304 &O,
305 D,
306 S,
307 O1,
308 O2,
309 O3,
310 ) -> Pin<
311 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
312 > + Send
313 + Sync
314 + Clone,
315 O: GiteratedObject + Send + Sync,
316 D: GiteratedOperation<O> + 'static,
317 <D as GiteratedOperation<O>>::Failure: Send,
318 S: Send + Sync + Clone + 'static,
319 O1: FromOperationState,
320 O2: FromOperationState,
321 O3: FromOperationState,
322 {
323 fn operation_name(&self) -> &str {
324 D::operation_name()
325 }
326
327 fn object_name(&self) -> &str {
328 O::object_name()
329 }
330
331 async fn handle(
332 &self,
333 object: &O,
334 operation: D,
335 state: S,
336 operation_state: &StackOperationState,
337 ) -> Result<D::Success, OperationError<D::Failure>> {
338 let o1 = O1::from_state(operation_state)
339 .await
340 .map_err(|e| OperationError::Internal(e.to_string()))?;
341 let o2 = O2::from_state(operation_state)
342 .await
343 .map_err(|e| OperationError::Internal(e.to_string()))?;
344 let o3 = O3::from_state(operation_state)
345 .await
346 .map_err(|e| OperationError::Internal(e.to_string()))?;
347 self.clone()(object, operation, state, o1, o2, o3).await
348 }
349 }
350
/// A type-erased operation handler.
///
/// Wraps a typed `GiteratedOperationHandler` behind a closure that takes the
/// untyped object and operation and yields serialized bytes for both the
/// success and the failure case.
pub struct OperationWrapper<S: Send + Sync + Clone> {
    /// The erased handler: (object, operation, handler state, operation
    /// state) -> future resolving to the serialized result.
    func: Box<
        dyn Fn(
                AnyObject,
                AnyOperation,
                S,
                StackOperationState,
            )
                -> Pin<Box<dyn Future<Output = Result<Vec<u8>, OperationError<Vec<u8>>>> + Send>>
            + Send
            + Sync,
    >,
    /// Name of the object type the wrapped handler was built for. Not read
    /// anywhere in the code visible here.
    object_name: String,
}
365
366 impl<S: Send + Sync + Clone + 'static> OperationWrapper<S> {
367 pub fn new<
368 A,
369 O: GiteratedObject + Send + Sync,
370 D: GiteratedOperation<O> + 'static,
371 F: GiteratedOperationHandler<A, O, D, S> + Send + Sync + 'static + Clone,
372 >(
373 handler: F,
374 ) -> Self {
375 let handler = Arc::new(Box::pin(handler));
376 Self {
377 func: Box::new(move |any_object, any_operation, state, operation_state| {
378 let handler = handler.clone();
379 async move {
380 let handler = handler.clone();
381 let object: O =
382 O::from_object_str(&any_object.0).map_err(|_| OperationError::Unhandled)?;
383 let operation: D = serde_json::from_value(any_operation.0.clone())
384 .map_err(|_| OperationError::Unhandled)?;
385
386 let result = handler
387 .handle(&object, operation, state, &operation_state)
388 .await;
389 result
390 .map(|success| serde_json::to_vec(&success).unwrap())
391 .map_err(|err| match err {
392 OperationError::Operation(err) => {
393 OperationError::Operation(serde_json::to_vec(&err).unwrap())
394 }
395 OperationError::Internal(internal) => {
396 OperationError::Internal(internal)
397 }
398 OperationError::Unhandled => OperationError::Unhandled,
399 })
400 }
401 .boxed()
402 }),
403 object_name: O::object_name().to_string(),
404 }
405 }
406
407 async fn handle(
408 &self,
409 object: AnyObject,
410 operation: AnyOperation,
411 state: S,
412 operation_state: &StackOperationState,
413 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
414 (self.func)(object, operation, state, operation_state.clone()).await
415 }
416 }
417
/// A value that can be extracted from the [`StackOperationState`] of the
/// operation being handled.
///
/// The handler implementations in this file use it to supply the extra
/// arguments (`O1`, `O2`, `O3`) of a handler callable.
#[async_trait::async_trait]
pub trait FromOperationState: Sized + Clone + Send {
    /// Operation-level error payload reported when extraction fails.
    type Error: Serialize + DeserializeOwned;

    /// Attempts to build `Self` from `state`.
    async fn from_state(state: &StackOperationState) -> Result<Self, OperationError<Self::Error>>;
}
424
425 #[async_trait::async_trait]
426 impl FromOperationState for BackendWrapper {
427 type Error = ();
428
429 async fn from_state(state: &StackOperationState) -> Result<Self, OperationError<()>> {
430 Ok(state.giterated_backend.clone())
431 }
432 }
433
434 #[async_trait::async_trait]
435 impl FromOperationState for StackOperationState {
436 type Error = ();
437
438 async fn from_state(
439 state: &StackOperationState,
440 ) -> Result<StackOperationState, OperationError<()>> {
441 Ok(state.clone())
442 }
443 }
444
445 #[async_trait::async_trait]
446 impl FromOperationState for AuthenticatedUser {
447 type Error = ();
448
449 async fn from_state(
450 state: &StackOperationState,
451 ) -> Result<AuthenticatedUser, OperationError<()>> {
452 state
453 .user
454 .clone()
455 .ok_or_else(|| OperationError::Operation(()))
456 }
457 }
458
459 #[async_trait::async_trait]
460 impl FromOperationState for AuthenticatedInstance {
461 type Error = ();
462
463 async fn from_state(
464 state: &StackOperationState,
465 ) -> Result<AuthenticatedInstance, OperationError<()>> {
466 state
467 .instance
468 .clone()
469 .ok_or_else(|| OperationError::Operation(()))
470 }
471 }
472
473 #[async_trait::async_trait]
474 impl<T: FromOperationState> FromOperationState for Option<T> {
475 type Error = ();
476
477 async fn from_state(state: &StackOperationState) -> Result<Option<T>, OperationError<()>> {
478 Ok(T::from_state(state).await.ok())
479 }
480 }
481
/// Context passed along with every operation being handled.
#[derive(Clone)]
pub struct StackOperationState {
    /// Handle used to issue further operations against the backend.
    pub giterated_backend: BackendWrapper,
    /// The instance associated with this operation, if any. When `None`,
    /// extracting an `AuthenticatedInstance` from this state fails.
    pub instance: Option<AuthenticatedInstance>,
    /// The user associated with this operation, if any. When `None`,
    /// extracting an `AuthenticatedUser` from this state fails.
    pub user: Option<AuthenticatedUser>,
}
488
/// Newtype around an [`Instance`]; dereferences to the wrapped value.
///
/// NOTE(review): the name implies the instance has been authenticated, but
/// `new` performs no check of its own — presumably callers only construct
/// this after authenticating; confirm at the call sites.
#[derive(Clone, Debug)]
pub struct AuthenticatedInstance(Instance);
491
492 impl AuthenticatedInstance {
493 pub fn new(instance: Instance) -> Self {
494 AuthenticatedInstance(instance)
495 }
496 }
497
498 impl Deref for AuthenticatedInstance {
499 type Target = Instance;
500
501 fn deref(&self) -> &Self::Target {
502 &self.0
503 }
504 }
505
/// Newtype around a [`User`]; dereferences to the wrapped value.
///
/// NOTE(review): the name implies the user has been authenticated, but `new`
/// performs no check of its own — presumably callers only construct this
/// after authenticating; confirm at the call sites.
#[derive(Clone, Debug)]
pub struct AuthenticatedUser(User);
508
509 impl AuthenticatedUser {
510 pub fn new(user: User) -> Self {
511 AuthenticatedUser(user)
512 }
513 }
514
515 impl Deref for AuthenticatedUser {
516 type Target = User;
517
518 fn deref(&self) -> &Self::Target {
519 &self.0
520 }
521 }
522
/// Cloneable handle to a backend that runs on a spawned listener task.
///
/// Each request travels over an mpsc channel together with a oneshot sender
/// on which the listener task sends back the result.
#[derive(Clone)]
pub struct BackendWrapper {
    /// Request channel to the listener task: (reply sender, operation).
    sender: tokio::sync::mpsc::Sender<(
        tokio::sync::oneshot::Sender<Result<Value, OperationError<Value>>>,
        WrappedOperation,
    )>,
    /// Join handle of the listener task spawned in `new`, shared between
    /// clones. Not read anywhere in the code visible here.
    task: Arc<JoinHandle<()>>,
}
531
/// An untyped operation request as sent to the `BackendWrapper` listener task.
pub struct WrappedOperation {
    /// String form of the object the operation targets.
    object: AnyObject,
    /// The operation's payload as a JSON value.
    operation_payload: AnyOperation,
    /// Name of the operation to run.
    operation_name: String,
    /// Operation state handed to the backend along with the request.
    state: StackOperationState,
}
538
539 impl BackendWrapper {
540 pub fn new<S: HandlerState>(backend: GiteratedBackend<S>) -> Self {
541 // Spawn listener task
542
543 let (send, mut recv) = channel::<(
544 tokio::sync::oneshot::Sender<Result<Value, OperationError<Value>>>,
545 WrappedOperation,
546 )>(1024);
547
548 let task = tokio::spawn(async move {
549 while let Some((responder, message)) = recv.recv().await {
550 let raw_result = backend
551 .object_operation(
552 message.object,
553 &message.operation_name,
554 message.operation_payload,
555 &message.state,
556 )
557 .await;
558
559 responder.send(raw_result).unwrap();
560 }
561 error!("Error, thing's dead");
562 });
563
564 Self {
565 sender: send,
566 task: Arc::new(task),
567 }
568 }
569
570 pub async fn call(&self, operation: WrappedOperation) -> Result<Value, OperationError<Value>> {
571 let (sender, response) = tokio::sync::oneshot::channel();
572
573 self.sender
574 .send((sender, operation))
575 .await
576 .map_err(|e| OperationError::Internal(e.to_string()))?;
577
578 match response.await {
579 Ok(result) => Ok(result?),
580 Err(err) => Err(OperationError::Internal(err.to_string())),
581 }
582 }
583 }
584
585 use std::fmt::Debug;
586
587 #[async_trait::async_trait]
588 impl ObjectBackend<StackOperationState> for BackendWrapper {
589 async fn object_operation<O, D>(
590 &self,
591 object: O,
592 operation: &str,
593 payload: D,
594 operation_state: &StackOperationState,
595 ) -> Result<D::Success, OperationError<D::Failure>>
596 where
597 O: GiteratedObject + Debug,
598 D: GiteratedOperation<O> + Debug,
599 {
600 let operation = WrappedOperation {
601 object: AnyObject(object.to_string()),
602 operation_name: operation.to_string(),
603 operation_payload: AnyOperation(serde_json::to_value(payload).unwrap()),
604 state: operation_state.clone(),
605 };
606
607 let raw_result = self.call(operation).await;
608
609 match raw_result {
610 Ok(result) => Ok(serde_json::from_value(result)
611 .map_err(|e| OperationError::Internal(e.to_string()))?),
612 Err(err) => match err {
613 OperationError::Internal(internal) => {
614 warn!(
615 "Internal Error: {:?}",
616 OperationError::<()>::Internal(internal.clone())
617 );
618
619 Err(OperationError::Internal(internal))
620 }
621 OperationError::Unhandled => Err(OperationError::Unhandled),
622 OperationError::Operation(err) => Err(OperationError::Operation(
623 serde_json::from_value(err)
624 .map_err(|e| OperationError::Internal(e.to_string()))?,
625 )),
626 },
627 }
628 }
629
630 async fn get_object<O: GiteratedObject + Debug>(
631 &self,
632 object_str: &str,
633 operation_state: &StackOperationState,
634 ) -> Result<Object<StackOperationState, O, Self>, OperationError<ObjectRequestError>> {
635 let operation = WrappedOperation {
636 object: AnyObject(object_str.to_string()),
637 operation_name: ObjectRequest::operation_name().to_string(),
638 operation_payload: AnyOperation(
639 serde_json::to_value(ObjectRequest(object_str.to_string())).unwrap(),
640 ),
641 state: operation_state.clone(),
642 };
643
644 let raw_result = self.call(operation).await;
645
646 let object: ObjectResponse = match raw_result {
647 Ok(result) => Ok(serde_json::from_value(result)
648 .map_err(|e| OperationError::Internal(e.to_string()))?),
649 Err(err) => match err {
650 OperationError::Internal(internal) => {
651 warn!(
652 "Internal Error: {:?}",
653 OperationError::<()>::Internal(internal.clone())
654 );
655
656 Err(OperationError::Internal(internal))
657 }
658 OperationError::Unhandled => Err(OperationError::Unhandled),
659 OperationError::Operation(err) => Err(OperationError::Operation(
660 serde_json::from_value(err)
661 .map_err(|e| OperationError::Internal(e.to_string()))?,
662 )),
663 },
664 }?;
665
666 unsafe {
667 Ok(Object::new_unchecked(
668 O::from_str(&object.0)
669 .map_err(|_| OperationError::Internal("deserialize failure".to_string()))?,
670 self.clone(),
671 ))
672 }
673 }
674 }
675