JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Add authentication back into the operation states

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨97a26fd

⁨giterated-stack/src/lib.rs⁩ - ⁨17259⁩ bytes
Raw
1 pub mod handler;
2 pub mod state;
3
4 use std::{collections::HashMap, future::Future, ops::Deref, pin::Pin, str::FromStr, sync::Arc};
5
6 use futures_util::FutureExt;
7 use giterated_models::{
8 error::OperationError,
9 instance::Instance,
10 object::{
11 AnyObject, GiteratedObject, Object, ObjectRequest, ObjectRequestError, ObjectResponse,
12 },
13 object_backend::ObjectBackend,
14 operation::{AnyOperation, GiteratedOperation},
15 repository::Repository,
16 user::User,
17 };
18 use handler::GiteratedBackend;
19 use serde::{de::DeserializeOwned, Serialize};
20 use serde_json::Value;
21 use state::HandlerState;
22 use tokio::{sync::mpsc::channel, task::JoinHandle};
23 use tracing::{error, warn};
24
25 #[derive(Clone, Debug, Hash, Eq, PartialEq)]
26 struct ObjectOperationPair {
27 object_name: String,
28 operation_name: String,
29 }
30
31 pub struct OperationHandlers<S: Send + Sync + Clone> {
32 operations: HashMap<ObjectOperationPair, OperationWrapper<S>>,
33 get_object: Vec<OperationWrapper<S>>,
34 }
35
36 impl<S: Send + Sync + Clone> Default for OperationHandlers<S> {
37 fn default() -> Self {
38 Self {
39 operations: HashMap::new(),
40 get_object: Vec::new(),
41 }
42 }
43 }
44
45 impl<S: Send + Sync + Clone + 'static> OperationHandlers<S> {
46 pub fn insert<
47 A,
48 O: GiteratedObject + Send + Sync,
49 D: GiteratedOperation<O> + 'static,
50 H: GiteratedOperationHandler<A, O, D, S> + Send + Sync + 'static + Clone,
51 >(
52 &mut self,
53 handler: H,
54 ) -> &mut Self {
55 let object_name = handler.object_name().to_string();
56 let operation_name = handler.operation_name().to_string();
57
58 let wrapped = OperationWrapper::new(handler);
59
60 let pair = ObjectOperationPair {
61 object_name,
62 operation_name,
63 };
64
65 assert!(self.operations.insert(pair, wrapped).is_none());
66
67 self
68 }
69
70 pub fn register_object<O: GiteratedObject + Send + Sync>(&mut self) -> &mut Self {
71 let closure = |_: &Instance, operation: ObjectRequest, _state| {
72 async move {
73 if O::from_str(&operation.0).is_ok() {
74 Ok(ObjectResponse(operation.0))
75 } else {
76 Err(OperationError::Unhandled)
77 }
78 }
79 .boxed()
80 };
81
82 let wrapped = OperationWrapper::new(closure);
83
84 self.get_object.push(wrapped);
85
86 self
87 }
88
89 pub async fn handle<O: GiteratedObject>(
90 &self,
91 object: &O,
92 operation_name: &str,
93 operation: AnyOperation,
94 state: S,
95 operation_state: &StackOperationState,
96 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
97 // TODO
98 let object = object.to_string();
99
100 let object_name = {
101 if User::from_str(&object).is_ok() {
102 User::object_name()
103 } else if Repository::from_str(&object).is_ok() {
104 Repository::object_name()
105 } else if Instance::from_str(&object).is_ok() {
106 Instance::object_name()
107 } else {
108 return Err(OperationError::Unhandled);
109 }
110 }
111 .to_string();
112
113 let target_handler = ObjectOperationPair {
114 object_name,
115 operation_name: operation_name.to_string(),
116 };
117
118 if let Some(handler) = self.operations.get(&target_handler) {
119 handler
120 .handle(
121 AnyObject(object.to_string()),
122 operation.clone(),
123 state.clone(),
124 operation_state,
125 )
126 .await
127 } else {
128 Err(OperationError::Unhandled)
129 }
130 }
131
132 pub async fn resolve_object(
133 &self,
134 instance: AnyObject,
135 request: ObjectRequest,
136 state: S,
137 operation_state: &StackOperationState,
138 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
139 for handler in self.get_object.iter() {
140 if let Ok(response) = handler
141 .handle(
142 instance.clone(),
143 AnyOperation(serde_json::to_value(request.clone()).unwrap()),
144 state.clone(),
145 operation_state,
146 )
147 .await
148 {
149 return Ok(response);
150 }
151 }
152
153 Err(OperationError::Unhandled)
154 }
155 }
156
157 #[async_trait::async_trait]
158 pub trait GiteratedOperationHandler<
159 L,
160 O: GiteratedObject,
161 D: GiteratedOperation<O>,
162 S: Send + Sync + Clone,
163 >
164 {
165 fn operation_name(&self) -> &str;
166 fn object_name(&self) -> &str;
167
168 async fn handle(
169 &self,
170 object: &O,
171 operation: D,
172 state: S,
173 operation_state: &StackOperationState,
174 ) -> Result<D::Success, OperationError<D::Failure>>;
175 }
176
177 #[async_trait::async_trait]
178 impl<O, D, F, S> GiteratedOperationHandler<(), O, D, S> for F
179 where
180 F: FnMut(
181 &O,
182 D,
183 S,
184 ) -> Pin<
185 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
186 > + Send
187 + Sync
188 + Clone,
189 O: GiteratedObject + Send + Sync,
190 D: GiteratedOperation<O> + 'static,
191 <D as GiteratedOperation<O>>::Failure: Send,
192 S: Send + Sync + Clone + 'static,
193 {
194 fn operation_name(&self) -> &str {
195 D::operation_name()
196 }
197
198 fn object_name(&self) -> &str {
199 O::object_name()
200 }
201
202 async fn handle(
203 &self,
204 object: &O,
205 operation: D,
206 state: S,
207 _operation_state: &StackOperationState,
208 ) -> Result<D::Success, OperationError<D::Failure>> {
209 self.clone()(object, operation, state).await
210 }
211 }
212
213 #[async_trait::async_trait]
214 impl<O, O1, D, F, S> GiteratedOperationHandler<(O1,), O, D, S> for F
215 where
216 F: FnMut(
217 &O,
218 D,
219 S,
220 O1,
221 ) -> Pin<
222 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
223 > + Send
224 + Sync
225 + Clone,
226 O: GiteratedObject + Send + Sync,
227 D: GiteratedOperation<O> + 'static,
228 <D as GiteratedOperation<O>>::Failure: Send,
229 S: Send + Sync + Clone + 'static,
230 O1: FromOperationState,
231 {
232 fn operation_name(&self) -> &str {
233 D::operation_name()
234 }
235
236 fn object_name(&self) -> &str {
237 O::object_name()
238 }
239
240 async fn handle(
241 &self,
242 object: &O,
243 operation: D,
244 state: S,
245 operation_state: &StackOperationState,
246 ) -> Result<D::Success, OperationError<D::Failure>> {
247 let o1 = O1::from_state(operation_state)
248 .await
249 .map_err(|e| OperationError::Internal(e.to_string()))?;
250 self.clone()(object, operation, state, o1).await
251 }
252 }
253
254 #[async_trait::async_trait]
255 impl<O, O1, O2, D, F, S> GiteratedOperationHandler<(O1, O2), O, D, S> for F
256 where
257 F: FnMut(
258 &O,
259 D,
260 S,
261 O1,
262 O2,
263 ) -> Pin<
264 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
265 > + Send
266 + Sync
267 + Clone,
268 O: GiteratedObject + Send + Sync,
269 D: GiteratedOperation<O> + 'static,
270 <D as GiteratedOperation<O>>::Failure: Send,
271 S: Send + Sync + Clone + 'static,
272 O1: FromOperationState,
273 O2: FromOperationState,
274 {
275 fn operation_name(&self) -> &str {
276 D::operation_name()
277 }
278
279 fn object_name(&self) -> &str {
280 O::object_name()
281 }
282
283 async fn handle(
284 &self,
285 object: &O,
286 operation: D,
287 state: S,
288 operation_state: &StackOperationState,
289 ) -> Result<D::Success, OperationError<D::Failure>> {
290 let o1 = O1::from_state(operation_state)
291 .await
292 .map_err(|e| OperationError::Internal(e.to_string()))?;
293 let o2 = O2::from_state(operation_state)
294 .await
295 .map_err(|e| OperationError::Internal(e.to_string()))?;
296 self.clone()(object, operation, state, o1, o2).await
297 }
298 }
299
300 pub struct OperationWrapper<S: Send + Sync + Clone> {
301 func: Box<
302 dyn Fn(
303 AnyObject,
304 AnyOperation,
305 S,
306 StackOperationState,
307 )
308 -> Pin<Box<dyn Future<Output = Result<Vec<u8>, OperationError<Vec<u8>>>> + Send>>
309 + Send
310 + Sync,
311 >,
312 object_name: String,
313 }
314
315 impl<S: Send + Sync + Clone + 'static> OperationWrapper<S> {
316 pub fn new<
317 A,
318 O: GiteratedObject + Send + Sync,
319 D: GiteratedOperation<O> + 'static,
320 F: GiteratedOperationHandler<A, O, D, S> + Send + Sync + 'static + Clone,
321 >(
322 handler: F,
323 ) -> Self {
324 let handler = Arc::new(Box::pin(handler));
325 Self {
326 func: Box::new(move |any_object, any_operation, state, operation_state| {
327 let handler = handler.clone();
328 async move {
329 let handler = handler.clone();
330 let object: O =
331 O::from_object_str(&any_object.0).map_err(|_| OperationError::Unhandled)?;
332 let operation: D = serde_json::from_value(any_operation.0.clone())
333 .map_err(|_| OperationError::Unhandled)?;
334
335 let result = handler
336 .handle(&object, operation, state, &operation_state)
337 .await;
338 result
339 .map(|success| serde_json::to_vec(&success).unwrap())
340 .map_err(|err| match err {
341 OperationError::Operation(err) => {
342 OperationError::Operation(serde_json::to_vec(&err).unwrap())
343 }
344 OperationError::Internal(internal) => {
345 OperationError::Internal(internal)
346 }
347 OperationError::Unhandled => OperationError::Unhandled,
348 })
349 }
350 .boxed()
351 }),
352 object_name: O::object_name().to_string(),
353 }
354 }
355
356 async fn handle(
357 &self,
358 object: AnyObject,
359 operation: AnyOperation,
360 state: S,
361 operation_state: &StackOperationState,
362 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
363 (self.func)(object, operation, state, operation_state.clone()).await
364 }
365 }
366
367 #[async_trait::async_trait]
368 pub trait FromOperationState: Sized + Clone + Send {
369 type Error: Serialize + DeserializeOwned;
370
371 async fn from_state(state: &StackOperationState) -> Result<Self, OperationError<Self::Error>>;
372 }
373
374 #[async_trait::async_trait]
375 impl FromOperationState for BackendWrapper {
376 type Error = ();
377
378 async fn from_state(state: &StackOperationState) -> Result<Self, OperationError<()>> {
379 Ok(state.giterated_backend.clone())
380 }
381 }
382
383 #[async_trait::async_trait]
384 impl FromOperationState for StackOperationState {
385 type Error = ();
386
387 async fn from_state(
388 state: &StackOperationState,
389 ) -> Result<StackOperationState, OperationError<()>> {
390 Ok(state.clone())
391 }
392 }
393
394 #[derive(Clone)]
395 pub struct StackOperationState {
396 pub giterated_backend: BackendWrapper,
397 pub instance: Option<AuthenticatedInstance>,
398 pub user: Option<AuthenticatedUser>,
399 }
400
401 #[derive(Clone, Debug)]
402 pub struct AuthenticatedInstance(Instance);
403
404 impl AuthenticatedInstance {
405 pub fn new(instance: Instance) -> Self {
406 AuthenticatedInstance(instance)
407 }
408 }
409
410 impl Deref for AuthenticatedInstance {
411 type Target = Instance;
412
413 fn deref(&self) -> &Self::Target {
414 &self.0
415 }
416 }
417
418 #[derive(Clone, Debug)]
419 pub struct AuthenticatedUser(User);
420
421 impl AuthenticatedUser {
422 pub fn new(user: User) -> Self {
423 AuthenticatedUser(user)
424 }
425 }
426
427 impl Deref for AuthenticatedUser {
428 type Target = User;
429
430 fn deref(&self) -> &Self::Target {
431 &self.0
432 }
433 }
434
435 #[derive(Clone)]
436 pub struct BackendWrapper {
437 sender: tokio::sync::mpsc::Sender<(
438 tokio::sync::oneshot::Sender<Result<Value, OperationError<Value>>>,
439 WrappedOperation,
440 )>,
441 task: Arc<JoinHandle<()>>,
442 }
443
444 pub struct WrappedOperation {
445 object: AnyObject,
446 operation_payload: AnyOperation,
447 operation_name: String,
448 state: StackOperationState,
449 }
450
451 impl BackendWrapper {
452 pub fn new<S: HandlerState>(backend: GiteratedBackend<S>) -> Self {
453 // Spawn listener task
454
455 let (send, mut recv) = channel::<(
456 tokio::sync::oneshot::Sender<Result<Value, OperationError<Value>>>,
457 WrappedOperation,
458 )>(1024);
459
460 let task = tokio::spawn(async move {
461 while let Some((responder, message)) = recv.recv().await {
462 let raw_result = backend
463 .object_operation(
464 message.object,
465 &message.operation_name,
466 message.operation_payload,
467 &message.state,
468 )
469 .await;
470
471 responder.send(raw_result).unwrap();
472 }
473 error!("Error, thing's dead");
474 });
475
476 Self {
477 sender: send,
478 task: Arc::new(task),
479 }
480 }
481
482 pub async fn call(&self, operation: WrappedOperation) -> Result<Value, OperationError<Value>> {
483 let (sender, response) = tokio::sync::oneshot::channel();
484
485 self.sender
486 .send((sender, operation))
487 .await
488 .map_err(|e| OperationError::Internal(e.to_string()))?;
489
490 match response.await {
491 Ok(result) => Ok(result?),
492 Err(err) => Err(OperationError::Internal(err.to_string())),
493 }
494 }
495 }
496
497 use std::fmt::Debug;
498
499 #[async_trait::async_trait]
500 impl ObjectBackend<StackOperationState> for BackendWrapper {
501 async fn object_operation<O, D>(
502 &self,
503 object: O,
504 operation: &str,
505 payload: D,
506 operation_state: &StackOperationState,
507 ) -> Result<D::Success, OperationError<D::Failure>>
508 where
509 O: GiteratedObject + Debug,
510 D: GiteratedOperation<O> + Debug,
511 {
512 let operation = WrappedOperation {
513 object: AnyObject(object.to_string()),
514 operation_name: operation.to_string(),
515 operation_payload: AnyOperation(serde_json::to_value(payload).unwrap()),
516 state: operation_state.clone(),
517 };
518
519 let raw_result = self.call(operation).await;
520
521 match raw_result {
522 Ok(result) => Ok(serde_json::from_value(result)
523 .map_err(|e| OperationError::Internal(e.to_string()))?),
524 Err(err) => match err {
525 OperationError::Internal(internal) => {
526 warn!(
527 "Internal Error: {:?}",
528 OperationError::<()>::Internal(internal.clone())
529 );
530
531 Err(OperationError::Internal(internal))
532 }
533 OperationError::Unhandled => Err(OperationError::Unhandled),
534 OperationError::Operation(err) => Err(OperationError::Operation(
535 serde_json::from_value(err)
536 .map_err(|e| OperationError::Internal(e.to_string()))?,
537 )),
538 },
539 }
540 }
541
542 async fn get_object<O: GiteratedObject + Debug>(
543 &self,
544 object_str: &str,
545 operation_state: &StackOperationState,
546 ) -> Result<Object<StackOperationState, O, Self>, OperationError<ObjectRequestError>> {
547 let operation = WrappedOperation {
548 object: AnyObject(object_str.to_string()),
549 operation_name: ObjectRequest::operation_name().to_string(),
550 operation_payload: AnyOperation(
551 serde_json::to_value(ObjectRequest(object_str.to_string())).unwrap(),
552 ),
553 state: operation_state.clone(),
554 };
555
556 let raw_result = self.call(operation).await;
557
558 let object: ObjectResponse = match raw_result {
559 Ok(result) => Ok(serde_json::from_value(result)
560 .map_err(|e| OperationError::Internal(e.to_string()))?),
561 Err(err) => match err {
562 OperationError::Internal(internal) => {
563 warn!(
564 "Internal Error: {:?}",
565 OperationError::<()>::Internal(internal.clone())
566 );
567
568 Err(OperationError::Internal(internal))
569 }
570 OperationError::Unhandled => Err(OperationError::Unhandled),
571 OperationError::Operation(err) => Err(OperationError::Operation(
572 serde_json::from_value(err)
573 .map_err(|e| OperationError::Internal(e.to_string()))?,
574 )),
575 },
576 }?;
577
578 unsafe {
579 Ok(Object::new_unchecked(
580 O::from_str(&object.0)
581 .map_err(|_| OperationError::Internal("deserialize failure".to_string()))?,
582 self.clone(),
583 ))
584 }
585 }
586 }
587