JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Finish unified stack refactor.

Adds support for operation state, which will be used to pass authentication information around. Added generic backend that uses a channel to communicate with a typed backend.

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨d15581c

⁨giterated-stack/src/lib.rs⁩ - ⁨16545⁩ bytes
Raw
1 pub mod handler;
2 pub mod state;
3
4 use std::{collections::HashMap, future::Future, pin::Pin, str::FromStr, sync::Arc};
5
6 use futures_util::FutureExt;
7 use giterated_models::{
8 error::OperationError,
9 instance::Instance,
10 object::{
11 AnyObject, GiteratedObject, Object, ObjectRequest, ObjectRequestError, ObjectResponse,
12 },
13 object_backend::ObjectBackend,
14 operation::{AnyOperation, GiteratedOperation},
15 repository::Repository,
16 user::User,
17 };
18 use handler::GiteratedBackend;
19 use serde::{de::DeserializeOwned, Serialize};
20 use serde_json::Value;
21 use state::HandlerState;
22 use tokio::{sync::mpsc::channel, task::JoinHandle};
23 use tracing::{error, warn};
24
25 #[derive(Clone, Debug, Hash, Eq, PartialEq)]
26 struct ObjectOperationPair {
27 object_name: String,
28 operation_name: String,
29 }
30
31 pub struct OperationHandlers<S: Send + Sync + Clone> {
32 operations: HashMap<ObjectOperationPair, OperationWrapper<S>>,
33 get_object: Vec<OperationWrapper<S>>,
34 }
35
36 impl<S: Send + Sync + Clone> Default for OperationHandlers<S> {
37 fn default() -> Self {
38 Self {
39 operations: HashMap::new(),
40 get_object: Vec::new(),
41 }
42 }
43 }
44
45 impl<S: Send + Sync + Clone + 'static> OperationHandlers<S> {
46 pub fn insert<
47 A,
48 O: GiteratedObject + Send + Sync,
49 D: GiteratedOperation<O> + 'static,
50 H: GiteratedOperationHandler<A, O, D, S> + Send + Sync + 'static + Clone,
51 >(
52 &mut self,
53 handler: H,
54 ) -> &mut Self {
55 let object_name = handler.object_name().to_string();
56 let operation_name = handler.operation_name().to_string();
57
58 let wrapped = OperationWrapper::new(handler);
59
60 let pair = ObjectOperationPair {
61 object_name,
62 operation_name,
63 };
64
65 assert!(self.operations.insert(pair, wrapped).is_none());
66
67 self
68 }
69
70 pub fn register_object<O: GiteratedObject + Send + Sync>(&mut self) -> &mut Self {
71 let closure = |_: &Instance, operation: ObjectRequest, _state| {
72 async move {
73 if O::from_str(&operation.0).is_ok() {
74 Ok(ObjectResponse(operation.0))
75 } else {
76 Err(OperationError::Unhandled)
77 }
78 }
79 .boxed()
80 };
81
82 let wrapped = OperationWrapper::new(closure);
83
84 self.get_object.push(wrapped);
85
86 self
87 }
88
89 pub async fn handle<O: GiteratedObject>(
90 &self,
91 object: &O,
92 operation_name: &str,
93 operation: AnyOperation,
94 state: S,
95 operation_state: &StackOperationState,
96 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
97 // TODO
98 let object = object.to_string();
99
100 let object_name = {
101 if User::from_str(&object).is_ok() {
102 User::object_name()
103 } else if Repository::from_str(&object).is_ok() {
104 Repository::object_name()
105 } else if Instance::from_str(&object).is_ok() {
106 Instance::object_name()
107 } else {
108 return Err(OperationError::Unhandled);
109 }
110 }
111 .to_string();
112
113 let target_handler = ObjectOperationPair {
114 object_name,
115 operation_name: operation_name.to_string(),
116 };
117
118 if let Some(handler) = self.operations.get(&target_handler) {
119 handler
120 .handle(
121 AnyObject(object.to_string()),
122 operation.clone(),
123 state.clone(),
124 operation_state,
125 )
126 .await
127 } else {
128 Err(OperationError::Unhandled)
129 }
130 }
131
132 pub async fn resolve_object(
133 &self,
134 instance: AnyObject,
135 request: ObjectRequest,
136 state: S,
137 operation_state: &StackOperationState,
138 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
139 for handler in self.get_object.iter() {
140 if let Ok(response) = handler
141 .handle(
142 instance.clone(),
143 AnyOperation(serde_json::to_value(request.clone()).unwrap()),
144 state.clone(),
145 operation_state,
146 )
147 .await
148 {
149 return Ok(response);
150 }
151 }
152
153 Err(OperationError::Unhandled)
154 }
155 }
156
157 #[async_trait::async_trait]
158 pub trait GiteratedOperationHandler<
159 L,
160 O: GiteratedObject,
161 D: GiteratedOperation<O>,
162 S: Send + Sync + Clone,
163 >
164 {
165 fn operation_name(&self) -> &str;
166 fn object_name(&self) -> &str;
167
168 async fn handle(
169 &self,
170 object: &O,
171 operation: D,
172 state: S,
173 operation_state: &StackOperationState,
174 ) -> Result<D::Success, OperationError<D::Failure>>;
175 }
176
177 #[async_trait::async_trait]
178 impl<O, D, F, S> GiteratedOperationHandler<(), O, D, S> for F
179 where
180 F: FnMut(
181 &O,
182 D,
183 S,
184 ) -> Pin<
185 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
186 > + Send
187 + Sync
188 + Clone,
189 O: GiteratedObject + Send + Sync,
190 D: GiteratedOperation<O> + 'static,
191 <D as GiteratedOperation<O>>::Failure: Send,
192 S: Send + Sync + Clone + 'static,
193 {
194 fn operation_name(&self) -> &str {
195 D::operation_name()
196 }
197
198 fn object_name(&self) -> &str {
199 O::object_name()
200 }
201
202 async fn handle(
203 &self,
204 object: &O,
205 operation: D,
206 state: S,
207 _operation_state: &StackOperationState,
208 ) -> Result<D::Success, OperationError<D::Failure>> {
209 self.clone()(object, operation, state).await
210 }
211 }
212
213 #[async_trait::async_trait]
214 impl<O, O1, D, F, S> GiteratedOperationHandler<(O1,), O, D, S> for F
215 where
216 F: FnMut(
217 &O,
218 D,
219 S,
220 O1,
221 ) -> Pin<
222 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
223 > + Send
224 + Sync
225 + Clone,
226 O: GiteratedObject + Send + Sync,
227 D: GiteratedOperation<O> + 'static,
228 <D as GiteratedOperation<O>>::Failure: Send,
229 S: Send + Sync + Clone + 'static,
230 O1: FromOperationState,
231 {
232 fn operation_name(&self) -> &str {
233 D::operation_name()
234 }
235
236 fn object_name(&self) -> &str {
237 O::object_name()
238 }
239
240 async fn handle(
241 &self,
242 object: &O,
243 operation: D,
244 state: S,
245 operation_state: &StackOperationState,
246 ) -> Result<D::Success, OperationError<D::Failure>> {
247 let o1 = O1::from_state(operation_state)
248 .await
249 .map_err(|e| OperationError::Internal(e.to_string()))?;
250 self.clone()(object, operation, state, o1).await
251 }
252 }
253
254 #[async_trait::async_trait]
255 impl<O, O1, O2, D, F, S> GiteratedOperationHandler<(O1, O2), O, D, S> for F
256 where
257 F: FnMut(
258 &O,
259 D,
260 S,
261 O1,
262 O2,
263 ) -> Pin<
264 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
265 > + Send
266 + Sync
267 + Clone,
268 O: GiteratedObject + Send + Sync,
269 D: GiteratedOperation<O> + 'static,
270 <D as GiteratedOperation<O>>::Failure: Send,
271 S: Send + Sync + Clone + 'static,
272 O1: FromOperationState,
273 O2: FromOperationState,
274 {
275 fn operation_name(&self) -> &str {
276 D::operation_name()
277 }
278
279 fn object_name(&self) -> &str {
280 O::object_name()
281 }
282
283 async fn handle(
284 &self,
285 object: &O,
286 operation: D,
287 state: S,
288 operation_state: &StackOperationState,
289 ) -> Result<D::Success, OperationError<D::Failure>> {
290 let o1 = O1::from_state(operation_state)
291 .await
292 .map_err(|e| OperationError::Internal(e.to_string()))?;
293 let o2 = O2::from_state(operation_state)
294 .await
295 .map_err(|e| OperationError::Internal(e.to_string()))?;
296 self.clone()(object, operation, state, o1, o2).await
297 }
298 }
299
300 pub struct OperationWrapper<S: Send + Sync + Clone> {
301 func: Box<
302 dyn Fn(
303 AnyObject,
304 AnyOperation,
305 S,
306 StackOperationState,
307 )
308 -> Pin<Box<dyn Future<Output = Result<Vec<u8>, OperationError<Vec<u8>>>> + Send>>
309 + Send
310 + Sync,
311 >,
312 object_name: String,
313 }
314
315 impl<S: Send + Sync + Clone + 'static> OperationWrapper<S> {
316 pub fn new<
317 A,
318 O: GiteratedObject + Send + Sync,
319 D: GiteratedOperation<O> + 'static,
320 F: GiteratedOperationHandler<A, O, D, S> + Send + Sync + 'static + Clone,
321 >(
322 handler: F,
323 ) -> Self {
324 let handler = Arc::new(Box::pin(handler));
325 Self {
326 func: Box::new(move |any_object, any_operation, state, operation_state| {
327 let handler = handler.clone();
328 async move {
329 let handler = handler.clone();
330 let object: O =
331 O::from_object_str(&any_object.0).map_err(|_| OperationError::Unhandled)?;
332 let operation: D = serde_json::from_value(any_operation.0.clone())
333 .map_err(|_| OperationError::Unhandled)?;
334
335 let result = handler
336 .handle(&object, operation, state, &operation_state)
337 .await;
338 result
339 .map(|success| serde_json::to_vec(&success).unwrap())
340 .map_err(|err| match err {
341 OperationError::Operation(err) => {
342 OperationError::Operation(serde_json::to_vec(&err).unwrap())
343 }
344 OperationError::Internal(internal) => {
345 OperationError::Internal(internal)
346 }
347 OperationError::Unhandled => OperationError::Unhandled,
348 })
349 }
350 .boxed()
351 }),
352 object_name: O::object_name().to_string(),
353 }
354 }
355
356 async fn handle(
357 &self,
358 object: AnyObject,
359 operation: AnyOperation,
360 state: S,
361 operation_state: &StackOperationState,
362 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
363 (self.func)(object, operation, state, operation_state.clone()).await
364 }
365 }
366
367 #[async_trait::async_trait]
368 pub trait FromOperationState: Sized + Clone + Send {
369 type Error: Serialize + DeserializeOwned;
370
371 async fn from_state(state: &StackOperationState) -> Result<Self, OperationError<Self::Error>>;
372 }
373
374 #[async_trait::async_trait]
375 impl FromOperationState for BackendWrapper {
376 type Error = ();
377
378 async fn from_state(state: &StackOperationState) -> Result<Self, OperationError<()>> {
379 Ok(state.giterated_backend.clone())
380 }
381 }
382
383 #[async_trait::async_trait]
384 impl FromOperationState for StackOperationState {
385 type Error = ();
386
387 async fn from_state(
388 state: &StackOperationState,
389 ) -> Result<StackOperationState, OperationError<()>> {
390 Ok(state.clone())
391 }
392 }
393
394 #[derive(Clone)]
395 pub struct StackOperationState {
396 pub giterated_backend: BackendWrapper,
397 }
398
399 #[derive(Clone)]
400 pub struct BackendWrapper {
401 sender: tokio::sync::mpsc::Sender<(
402 tokio::sync::oneshot::Sender<Result<Value, OperationError<Value>>>,
403 WrappedOperation,
404 )>,
405 task: Arc<JoinHandle<()>>,
406 }
407
408 pub struct WrappedOperation {
409 object: AnyObject,
410 operation_payload: AnyOperation,
411 operation_name: String,
412 state: StackOperationState,
413 }
414
415 impl BackendWrapper {
416 pub fn new<S: HandlerState>(backend: GiteratedBackend<S>) -> Self {
417 // Spawn listener task
418
419 let (send, mut recv) = channel::<(
420 tokio::sync::oneshot::Sender<Result<Value, OperationError<Value>>>,
421 WrappedOperation,
422 )>(1024);
423
424 let task = tokio::spawn(async move {
425 while let Some((responder, message)) = recv.recv().await {
426 let raw_result = backend
427 .object_operation(
428 message.object,
429 &message.operation_name,
430 message.operation_payload,
431 &message.state,
432 )
433 .await;
434
435 responder.send(raw_result).unwrap();
436 }
437 error!("Error, thing's dead");
438 });
439
440 Self {
441 sender: send,
442 task: Arc::new(task),
443 }
444 }
445
446 pub async fn call(&self, operation: WrappedOperation) -> Result<Value, OperationError<Value>> {
447 let (sender, response) = tokio::sync::oneshot::channel();
448
449 self.sender
450 .send((sender, operation))
451 .await
452 .map_err(|e| OperationError::Internal(e.to_string()))?;
453
454 match response.await {
455 Ok(result) => Ok(result?),
456 Err(err) => Err(OperationError::Internal(err.to_string())),
457 }
458 }
459 }
460
461 use std::fmt::Debug;
462
463 #[async_trait::async_trait]
464 impl ObjectBackend<StackOperationState> for BackendWrapper {
465 async fn object_operation<O, D>(
466 &self,
467 object: O,
468 operation: &str,
469 payload: D,
470 operation_state: &StackOperationState,
471 ) -> Result<D::Success, OperationError<D::Failure>>
472 where
473 O: GiteratedObject + Debug,
474 D: GiteratedOperation<O> + Debug,
475 {
476 let operation = WrappedOperation {
477 object: AnyObject(object.to_string()),
478 operation_name: operation.to_string(),
479 operation_payload: AnyOperation(serde_json::to_value(payload).unwrap()),
480 state: operation_state.clone(),
481 };
482
483 let raw_result = self.call(operation).await;
484
485 match raw_result {
486 Ok(result) => Ok(serde_json::from_value(result)
487 .map_err(|e| OperationError::Internal(e.to_string()))?),
488 Err(err) => match err {
489 OperationError::Internal(internal) => {
490 warn!(
491 "Internal Error: {:?}",
492 OperationError::<()>::Internal(internal.clone())
493 );
494
495 Err(OperationError::Internal(internal))
496 }
497 OperationError::Unhandled => Err(OperationError::Unhandled),
498 OperationError::Operation(err) => Err(OperationError::Operation(
499 serde_json::from_value(err)
500 .map_err(|e| OperationError::Internal(e.to_string()))?,
501 )),
502 },
503 }
504 }
505
506 async fn get_object<O: GiteratedObject + Debug>(
507 &self,
508 object_str: &str,
509 operation_state: &StackOperationState,
510 ) -> Result<Object<StackOperationState, O, Self>, OperationError<ObjectRequestError>> {
511 let operation = WrappedOperation {
512 object: AnyObject(object_str.to_string()),
513 operation_name: ObjectRequest::operation_name().to_string(),
514 operation_payload: AnyOperation(
515 serde_json::to_value(ObjectRequest(object_str.to_string())).unwrap(),
516 ),
517 state: operation_state.clone(),
518 };
519
520 let raw_result = self.call(operation).await;
521
522 let object: ObjectResponse = match raw_result {
523 Ok(result) => Ok(serde_json::from_value(result)
524 .map_err(|e| OperationError::Internal(e.to_string()))?),
525 Err(err) => match err {
526 OperationError::Internal(internal) => {
527 warn!(
528 "Internal Error: {:?}",
529 OperationError::<()>::Internal(internal.clone())
530 );
531
532 Err(OperationError::Internal(internal))
533 }
534 OperationError::Unhandled => Err(OperationError::Unhandled),
535 OperationError::Operation(err) => Err(OperationError::Operation(
536 serde_json::from_value(err)
537 .map_err(|e| OperationError::Internal(e.to_string()))?,
538 )),
539 },
540 }?;
541
542 unsafe {
543 Ok(Object::new_unchecked(
544 O::from_str(&object.0)
545 .map_err(|_| OperationError::Internal("deserialize failure".to_string()))?,
546 self.clone(),
547 ))
548 }
549 }
550 }
551