JavaScript is disabled, refresh for a better experience. ambee/giterated-api

ambee/giterated-api

Git repository hosting, collaboration, and discovery for the Fediverse.

begin new protocol refactor

Amber - 2 years ago

parent: tbd commit: 2091700

src/daemon_backend.rs - 3119 bytes
Raw
1 use std::fmt::Debug;
2
3 use deadpool::managed::Pool;
4 use futures_util::{SinkExt, StreamExt};
5 use giterated_models::{
6 error::OperationError,
7 model::{authenticated::Authenticated, MessageTarget},
8 operation::{
9 GiteratedMessage, GiteratedObject, GiteratedOperation, Object, ObjectBackend,
10 ObjectRequest, ObjectRequestError, ObjectResponse,
11 },
12 };
13 use serde::{de::DeserializeOwned, Deserialize, Serialize};
14 use tokio_tungstenite::tungstenite::Message;
15
16 use crate::{pool::GiteratedConnectionPool, DaemonConnectionPool, Socket};
17
18 #[async_trait::async_trait]
19 impl ObjectBackend for DaemonConnectionPool {
20 async fn object_operation<O: GiteratedObject + Debug, D: GiteratedOperation<O> + Debug>(
21 &self,
22 object: O,
23 operation: D,
24 ) -> Result<D::Success, OperationError<D::Failure>> {
25 let message = GiteratedMessage {
26 object,
27 operation: operation.operation_name().to_string(),
28 payload: operation,
29 };
30
31 let mut connection = self
32 .0
33 .get()
34 .await
35 .map_err(|e| OperationError::Internal(e.to_string()))?;
36
37 let authenticated = Authenticated::new(message);
38
39 send_expect(&mut connection, authenticated).await
40 }
41
42 async fn get_object<O: GiteratedObject + Debug>(
43 &self,
44 object_str: &str,
45 ) -> Result<Object<O, Self>, OperationError<ObjectRequestError>> {
46 let operation = ObjectRequest(object_str.to_string());
47 let message = GiteratedMessage {
48 object: self.0.manager().target_instance.clone(),
49 operation: operation.operation_name().to_string(),
50 payload: operation,
51 };
52
53 let mut connection = self
54 .0
55 .get()
56 .await
57 .map_err(|e| OperationError::Internal(e.to_string()))?;
58
59 let authenticated = Authenticated::new(message);
60
61 let object_raw: ObjectResponse = send_expect(&mut connection, authenticated).await?;
62
63 Ok(unsafe {
64 Object::new_unchecked(
65 serde_json::from_slice(&object_raw.0)
66 .map_err(|e| OperationError::Internal(e.to_string()))?,
67 self.clone(),
68 )
69 })
70 }
71 }
72
73 async fn send_expect<
74 O: GiteratedObject,
75 D: GiteratedOperation<O>,
76 B: DeserializeOwned,
77 R: DeserializeOwned,
78 >(
79 socket: &mut Socket,
80 message: Authenticated<O, D>,
81 ) -> Result<R, OperationError<B>> {
82 let payload = serde_json::to_vec(&message.into_payload()).unwrap();
83
84 socket
85 .send(Message::Binary(payload))
86 .await
87 .map_err(|e| OperationError::Internal(e.to_string()))?;
88
89 while let Some(message) = socket.next().await {
90 let payload = match message.map_err(|e| OperationError::Internal(e.to_string()))? {
91 Message::Binary(payload) => payload,
92 _ => {
93 continue;
94 }
95 };
96
97 let as_target = serde_json::from_slice::<R>(&payload)
98 .map_err(|e| OperationError::Internal(e.to_string()))?;
99 }
100
101 panic!()
102 }
103