JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Base protocol refactor complete

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨079d544

⁨giterated-daemon/src/database_backend/handler.rs⁩ - ⁨8199⁩ bytes
Raw
1 use std::{collections::HashMap, pin::Pin, sync::Arc};
2
3 use futures_util::{future::BoxFuture, Future, FutureExt};
4 use giterated_models::{
5 error::{GetValueError, OperationError},
6 model::{repository::Repository, settings::AnySetting, user::User},
7 operation::{AnyObject, AnyOperation, GetValue, GiteratedObject, GiteratedOperation},
8 values::{AnyValue, GetSetting, GetSettingError, SetSetting, SetSettingError},
9 };
10
11 use super::DatabaseBackend;
12
13 #[async_trait::async_trait]
14 pub trait GiteratedOperationHandler<
15 O: GiteratedObject,
16 D: GiteratedOperation<O>,
17 S: Send + Sync + Clone,
18 >
19 {
20 fn operation_name(&self) -> &str;
21 fn object_name(&self) -> &str;
22
23 async fn handle(
24 &self,
25 object: &O,
26 operation: D,
27 state: S,
28 ) -> Result<D::Success, OperationError<D::Failure>>;
29 }
30
31 #[async_trait::async_trait]
32 impl<O, D, F, S> GiteratedOperationHandler<O, D, S> for F
33 where
34 F: FnMut(
35 &O,
36 D,
37 S,
38 ) -> Pin<
39 Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>,
40 > + Send
41 + Sync
42 + Clone,
43 O: GiteratedObject + Send + Sync,
44 D: GiteratedOperation<O> + 'static,
45 <D as GiteratedOperation<O>>::Failure: Send,
46 S: Send + Sync + Clone + 'static,
47 {
48 fn operation_name(&self) -> &str {
49 D::operation_name()
50 }
51
52 fn object_name(&self) -> &str {
53 O::object_name()
54 }
55
56 async fn handle(
57 &self,
58 object: &O,
59 operation: D,
60 state: S,
61 ) -> Result<D::Success, OperationError<D::Failure>> {
62 self.clone()(object, operation, state).await
63 }
64 }
65
66 pub struct OperationWrapper<S: Send + Sync + Clone>(
67 Box<
68 dyn Fn(
69 AnyObject,
70 AnyOperation,
71 S,
72 )
73 -> Pin<Box<dyn Future<Output = Result<Vec<u8>, OperationError<Vec<u8>>>> + Send>>
74 + Send
75 + Sync,
76 >,
77 );
78
79 impl<S: Send + Sync + Clone + 'static> OperationWrapper<S> {
80 pub fn new<
81 O: GiteratedObject + Send + Sync,
82 D: GiteratedOperation<O> + 'static,
83 F: GiteratedOperationHandler<O, D, S> + Send + Sync + 'static + Clone,
84 >(
85 handler: F,
86 ) -> Self {
87 let handler = Arc::new(Box::pin(handler));
88 Self(Box::new(move |any_object, any_operation, state| {
89 let handler = handler.clone();
90 async move {
91 let handler = handler.clone();
92 let object: O = O::from_object_str(&any_object.0).unwrap();
93 let operation: D = serde_json::from_value(any_operation.0.clone()).unwrap();
94
95 let result = handler.handle(&object, operation, state).await;
96 result
97 .map(|success| serde_json::to_vec(&success).unwrap())
98 .map_err(|err| match err {
99 OperationError::Operation(err) => {
100 OperationError::Operation(serde_json::to_vec(&err).unwrap())
101 }
102 OperationError::Internal(internal) => OperationError::Internal(internal),
103 OperationError::Unhandled => OperationError::Unhandled,
104 })
105 }
106 .boxed()
107 }))
108 }
109
110 async fn handle(
111 &mut self,
112 object: AnyObject,
113 operation: AnyOperation,
114 state: S,
115 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
116 self.0(object, operation, state).await
117 }
118 }
119
120 pub fn user_get_value(
121 object: &User,
122 operation: GetValue<AnyValue<User>>,
123 state: DatabaseBackend,
124 ) -> BoxFuture<'static, Result<AnyValue<User>, OperationError<GetValueError>>> {
125 let object = object.clone();
126
127 async move {
128 let mut user_backend = state.user_backend.lock().await;
129 let value = user_backend
130 .get_value(&object, &operation.value_name)
131 .await
132 .map_err(|e| OperationError::Internal(e.to_string()))?;
133
134 Ok(value)
135 }
136 .boxed()
137 }
138
139 pub fn user_get_setting(
140 object: &User,
141 operation: GetSetting<AnySetting>,
142 state: DatabaseBackend,
143 ) -> BoxFuture<'static, Result<AnySetting, OperationError<GetSettingError>>> {
144 let object = object.clone();
145
146 async move {
147 let mut user_backend = state.user_backend.lock().await;
148 let value = user_backend
149 .get_setting(&object, &operation.setting_name)
150 .await
151 .map_err(|e| OperationError::Internal(e.to_string()))?;
152
153 Ok(value)
154 }
155 .boxed()
156 }
157
158 pub fn user_set_setting(
159 object: &User,
160 operation: SetSetting<AnySetting>,
161 state: DatabaseBackend,
162 ) -> BoxFuture<'static, Result<(), OperationError<SetSettingError>>> {
163 let object = object.clone();
164
165 async move {
166 let mut user_backend = state.user_backend.lock().await;
167 let value = user_backend
168 .write_setting(&object, &operation.setting_name, &operation.value.0)
169 .await
170 .map_err(|e| OperationError::Internal(e.to_string()))?;
171
172 Ok(value)
173 }
174 .boxed()
175 }
176
177 pub fn repository_get_value(
178 object: &Repository,
179 operation: GetValue<AnyValue<Repository>>,
180 state: DatabaseBackend,
181 ) -> BoxFuture<'static, Result<AnyValue<Repository>, OperationError<GetValueError>>> {
182 let object = object.clone();
183
184 async move {
185 let mut repository_backend = state.repository_backend.lock().await;
186 let value = repository_backend
187 .get_value(&object, &operation.value_name)
188 .await
189 .map_err(|e| OperationError::Internal(e.to_string()))?;
190
191 Ok(value)
192 }
193 .boxed()
194 }
195
196 pub fn repository_get_setting(
197 object: &Repository,
198 operation: GetSetting<AnySetting>,
199 state: DatabaseBackend,
200 ) -> BoxFuture<'static, Result<AnySetting, OperationError<GetSettingError>>> {
201 let object = object.clone();
202
203 async move {
204 let mut repository_backend = state.repository_backend.lock().await;
205 let value = repository_backend
206 .get_setting(&object, &operation.setting_name)
207 .await
208 .map_err(|e| OperationError::Internal(e.to_string()))?;
209
210 Ok(value)
211 }
212 .boxed()
213 }
214
215 pub fn repository_set_setting(
216 object: &Repository,
217 operation: SetSetting<AnySetting>,
218 state: DatabaseBackend,
219 ) -> BoxFuture<'static, Result<(), OperationError<SetSettingError>>> {
220 let object = object.clone();
221
222 async move {
223 let mut repository_backend = state.repository_backend.lock().await;
224 let value = repository_backend
225 .write_setting(&object, &operation.setting_name, &operation.value.0)
226 .await
227 .map_err(|e| OperationError::Internal(e.to_string()))?;
228
229 Ok(value)
230 }
231 .boxed()
232 }
233
234 pub struct OperationHandlers<S: Send + Sync + Clone> {
235 operations: HashMap<String, OperationWrapper<S>>,
236 }
237
238 impl<S: Send + Sync + Clone> Default for OperationHandlers<S> {
239 fn default() -> Self {
240 Self {
241 operations: HashMap::new(),
242 }
243 }
244 }
245
246 impl<S: Send + Sync + Clone + 'static> OperationHandlers<S> {
247 pub fn insert<
248 O: GiteratedObject + Send + Sync,
249 D: GiteratedOperation<O> + 'static,
250 H: GiteratedOperationHandler<O, D, S> + Send + Sync + 'static + Clone,
251 >(
252 &mut self,
253 handler: H,
254 ) -> &mut Self {
255 let operation_name = handler.operation_name().to_string();
256
257 let wrapped = OperationWrapper::new(handler);
258
259 self.operations.insert(operation_name, wrapped);
260
261 self
262 }
263
264 pub async fn handle<O: GiteratedObject>(
265 &mut self,
266 object: &O,
267 operation_name: &str,
268 operation: AnyOperation,
269 state: S,
270 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
271 if let Some(handler) = self.operations.get_mut(operation_name) {
272 handler
273 .handle(AnyObject(object.to_string()), operation, state)
274 .await
275 } else {
276 Err(OperationError::Internal(format!(
277 "unknown operation: {}",
278 operation_name
279 )))
280 }
281 }
282 }
283