Giterated stack changeover, refactor still incomplete
parent: tbd commit: 8d40dfe
Showing 14 changed files with 477 insertions and 374 deletions
Cargo.lock
@@ -771,6 +771,7 @@ dependencies = [ | ||
771 | 771 | "git2", |
772 | 772 | "giterated-api", |
773 | 773 | "giterated-models", |
774 | "giterated-stack", | |
774 | 775 | "jsonwebtoken", |
775 | 776 | "log", |
776 | 777 | "rand", |
@@ -816,6 +817,15 @@ dependencies = [ | ||
816 | 817 | [[package]] |
817 | 818 | name = "giterated-stack" |
818 | 819 | version = "0.1.0" |
820 | dependencies = [ | |
821 | "async-trait", | |
822 | "bincode", | |
823 | "futures-util", | |
824 | "giterated-models", | |
825 | "serde", | |
826 | "serde_json", | |
827 | "tracing", | |
828 | ] | |
819 | 829 | |
820 | 830 | [[package]] |
821 | 831 | name = "h2" |
giterated-daemon/Cargo.toml
@@ -24,6 +24,7 @@ aes-gcm = "0.10.2" | ||
24 | 24 | semver = {version = "*", features = ["serde"]} |
25 | 25 | giterated-models = { path = "../giterated-models" } |
26 | 26 | giterated-api = { path = "../../giterated-api" } |
27 | giterated-stack = { path = "../giterated-stack" } | |
27 | 28 | deadpool = "*" |
28 | 29 | bincode = "*" |
29 | 30 |
giterated-daemon/src/backend/git.rs
@@ -710,24 +710,33 @@ impl RepositoryBackend for GitBackend { | ||
710 | 710 | .await?; |
711 | 711 | |
712 | 712 | // Parse the passed object ids |
713 | let oid_old = git2::Oid::from_str(request.old_id.as_str()).map_err(|_| GitBackendError::InvalidObjectId(request.old_id.clone()))?; | |
714 | let oid_new = git2::Oid::from_str(request.new_id.as_str()).map_err(|_| GitBackendError::InvalidObjectId(request.new_id.clone()))?; | |
713 | let oid_old = git2::Oid::from_str(request.old_id.as_str()) | |
714 | .map_err(|_| GitBackendError::InvalidObjectId(request.old_id.clone()))?; | |
715 | let oid_new = git2::Oid::from_str(request.new_id.as_str()) | |
716 | .map_err(|_| GitBackendError::InvalidObjectId(request.new_id.clone()))?; | |
715 | 717 | |
716 | 718 | // Get the ids associates commits |
717 | let commit_old = git.find_commit(oid_old).map_err(|_| GitBackendError::CommitNotFound(oid_old.to_string()))?; | |
718 | let commit_new = git.find_commit(oid_new).map_err(|_| GitBackendError::CommitNotFound(oid_new.to_string()))?; | |
719 | let commit_old = git | |
720 | .find_commit(oid_old) | |
721 | .map_err(|_| GitBackendError::CommitNotFound(oid_old.to_string()))?; | |
722 | let commit_new = git | |
723 | .find_commit(oid_new) | |
724 | .map_err(|_| GitBackendError::CommitNotFound(oid_new.to_string()))?; | |
719 | 725 | |
720 | 726 | // Get the commit trees |
721 | let tree_old = commit_old.tree().map_err(|_| GitBackendError::TreeNotFound(oid_old.to_string()))?; | |
722 | let tree_new = commit_new.tree().map_err(|_| GitBackendError::TreeNotFound(oid_new.to_string()))?; | |
727 | let tree_old = commit_old | |
728 | .tree() | |
729 | .map_err(|_| GitBackendError::TreeNotFound(oid_old.to_string()))?; | |
730 | let tree_new = commit_new | |
731 | .tree() | |
732 | .map_err(|_| GitBackendError::TreeNotFound(oid_new.to_string()))?; | |
723 | 733 | |
724 | 734 | // Diff the two trees against each other |
725 | 735 | let diff = git |
726 | 736 | .diff_tree_to_tree(Some(&tree_old), Some(&tree_new), None) |
727 | .map_err(|_| GitBackendError::FailedDiffing( | |
728 | oid_old.to_string(), | |
729 | oid_new.to_string(), | |
730 | ))?; | |
737 | .map_err(|_| { | |
738 | GitBackendError::FailedDiffing(oid_old.to_string(), oid_new.to_string()) | |
739 | })?; | |
731 | 740 | |
732 | 741 | // Should be safe to unwrap? |
733 | 742 | let stats = diff.stats().unwrap(); |
giterated-daemon/src/connection/wrapper.rs
@@ -60,6 +60,8 @@ pub async fn connection_wrapper( | ||
60 | 60 | |
61 | 61 | let _handshaked = false; |
62 | 62 | |
63 | let backend = backend.into_backend(); | |
64 | ||
63 | 65 | loop { |
64 | 66 | let mut socket = connection_state.socket.lock().await; |
65 | 67 | let message = socket.next().await; |
giterated-daemon/src/database_backend/handler.rs
@@ -1,11 +1,9 @@ | ||
1 | use std::{collections::HashMap, error::Error, pin::Pin, sync::Arc}; | |
1 | use std::error::Error; | |
2 | 2 | |
3 | use futures_util::{future::BoxFuture, Future, FutureExt}; | |
3 | use futures_util::{future::BoxFuture, FutureExt}; | |
4 | 4 | use giterated_models::{ |
5 | 5 | error::{GetValueError, OperationError, RepositoryError, UserError}, |
6 | object::{AnyObject, GiteratedObject}, | |
7 | 6 | object_backend::ObjectBackend, |
8 | operation::{AnyOperation, GiteratedOperation}, | |
9 | 7 | repository::{ |
10 | 8 | Commit, DefaultBranch, Description, LatestCommit, Repository, |
11 | 9 | RepositoryCommitBeforeRequest, RepositoryDiff, RepositoryDiffRequest, RepositoryFile, |
@@ -19,113 +17,6 @@ use giterated_models::{ | ||
19 | 17 | |
20 | 18 | use super::DatabaseBackend; |
21 | 19 | |
22 | #[async_trait::async_trait] | |
23 | pub trait GiteratedOperationHandler< | |
24 | O: GiteratedObject, | |
25 | D: GiteratedOperation<O>, | |
26 | S: Send + Sync + Clone, | |
27 | > | |
28 | { | |
29 | fn operation_name(&self) -> &str; | |
30 | fn object_name(&self) -> &str; | |
31 | ||
32 | async fn handle( | |
33 | &self, | |
34 | object: &O, | |
35 | operation: D, | |
36 | state: S, | |
37 | ) -> Result<D::Success, OperationError<D::Failure>>; | |
38 | } | |
39 | ||
40 | #[async_trait::async_trait] | |
41 | impl<O, D, F, S> GiteratedOperationHandler<O, D, S> for F | |
42 | where | |
43 | F: FnMut( | |
44 | &O, | |
45 | D, | |
46 | S, | |
47 | ) -> Pin< | |
48 | Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>, | |
49 | > + Send | |
50 | + Sync | |
51 | + Clone, | |
52 | O: GiteratedObject + Send + Sync, | |
53 | D: GiteratedOperation<O> + 'static, | |
54 | <D as GiteratedOperation<O>>::Failure: Send, | |
55 | S: Send + Sync + Clone + 'static, | |
56 | { | |
57 | fn operation_name(&self) -> &str { | |
58 | D::operation_name() | |
59 | } | |
60 | ||
61 | fn object_name(&self) -> &str { | |
62 | O::object_name() | |
63 | } | |
64 | ||
65 | async fn handle( | |
66 | &self, | |
67 | object: &O, | |
68 | operation: D, | |
69 | state: S, | |
70 | ) -> Result<D::Success, OperationError<D::Failure>> { | |
71 | self.clone()(object, operation, state).await | |
72 | } | |
73 | } | |
74 | ||
75 | pub struct OperationWrapper<S: Send + Sync + Clone>( | |
76 | Box< | |
77 | dyn Fn( | |
78 | AnyObject, | |
79 | AnyOperation, | |
80 | S, | |
81 | ) | |
82 | -> Pin<Box<dyn Future<Output = Result<Vec<u8>, OperationError<Vec<u8>>>> + Send>> | |
83 | + Send | |
84 | + Sync, | |
85 | >, | |
86 | ); | |
87 | ||
88 | impl<S: Send + Sync + Clone + 'static> OperationWrapper<S> { | |
89 | pub fn new< | |
90 | O: GiteratedObject + Send + Sync, | |
91 | D: GiteratedOperation<O> + 'static, | |
92 | F: GiteratedOperationHandler<O, D, S> + Send + Sync + 'static + Clone, | |
93 | >( | |
94 | handler: F, | |
95 | ) -> Self { | |
96 | let handler = Arc::new(Box::pin(handler)); | |
97 | Self(Box::new(move |any_object, any_operation, state| { | |
98 | let handler = handler.clone(); | |
99 | async move { | |
100 | let handler = handler.clone(); | |
101 | let object: O = O::from_object_str(&any_object.0).unwrap(); | |
102 | let operation: D = serde_json::from_value(any_operation.0.clone()).unwrap(); | |
103 | ||
104 | let result = handler.handle(&object, operation, state).await; | |
105 | result | |
106 | .map(|success| serde_json::to_vec(&success).unwrap()) | |
107 | .map_err(|err| match err { | |
108 | OperationError::Operation(err) => { | |
109 | OperationError::Operation(serde_json::to_vec(&err).unwrap()) | |
110 | } | |
111 | OperationError::Internal(internal) => OperationError::Internal(internal), | |
112 | OperationError::Unhandled => OperationError::Unhandled, | |
113 | }) | |
114 | } | |
115 | .boxed() | |
116 | })) | |
117 | } | |
118 | ||
119 | async fn handle( | |
120 | &mut self, | |
121 | object: AnyObject, | |
122 | operation: AnyOperation, | |
123 | state: S, | |
124 | ) -> Result<Vec<u8>, OperationError<Vec<u8>>> { | |
125 | self.0(object, operation, state).await | |
126 | } | |
127 | } | |
128 | ||
129 | 20 | pub fn user_get_repositories( |
130 | 21 | object: &User, |
131 | 22 | _operation: UserRepositoriesRequest, |
@@ -208,9 +99,10 @@ pub fn repository_info( | ||
208 | 99 | state: DatabaseBackend, |
209 | 100 | ) -> BoxFuture<'static, Result<RepositoryView, OperationError<RepositoryError>>> { |
210 | 101 | let object = object.clone(); |
102 | let backend = state.into_backend(); | |
211 | 103 | |
212 | 104 | async move { |
213 | let mut object = state | |
105 | let mut object = backend | |
214 | 106 | .get_object::<Repository>(&object.to_string()) |
215 | 107 | .await |
216 | 108 | .unwrap(); |
@@ -259,9 +151,10 @@ pub fn repository_file_from_id( | ||
259 | 151 | state: DatabaseBackend, |
260 | 152 | ) -> BoxFuture<'static, Result<RepositoryFile, OperationError<RepositoryError>>> { |
261 | 153 | let object = object.clone(); |
154 | let backend = state.into_backend(); | |
262 | 155 | |
263 | 156 | async move { |
264 | let object = state | |
157 | let object = backend | |
265 | 158 | .get_object::<Repository>(&object.to_string()) |
266 | 159 | .await |
267 | 160 | .unwrap(); |
@@ -288,9 +181,10 @@ pub fn repository_diff( | ||
288 | 181 | state: DatabaseBackend, |
289 | 182 | ) -> BoxFuture<'static, Result<RepositoryDiff, OperationError<RepositoryError>>> { |
290 | 183 | let object = object.clone(); |
184 | let backend = state.into_backend(); | |
291 | 185 | |
292 | 186 | async move { |
293 | let object = state | |
187 | let object = backend | |
294 | 188 | .get_object::<Repository>(&object.to_string()) |
295 | 189 | .await |
296 | 190 | .unwrap(); |
@@ -313,9 +207,10 @@ pub fn repository_commit_before( | ||
313 | 207 | state: DatabaseBackend, |
314 | 208 | ) -> BoxFuture<'static, Result<Commit, OperationError<RepositoryError>>> { |
315 | 209 | let object = object.clone(); |
210 | let backend = state.into_backend(); | |
316 | 211 | |
317 | 212 | async move { |
318 | let object = state | |
213 | let object = backend | |
319 | 214 | .get_object::<Repository>(&object.to_string()) |
320 | 215 | .await |
321 | 216 | .unwrap(); |
@@ -390,53 +285,3 @@ pub fn repository_set_setting( | ||
390 | 285 | } |
391 | 286 | .boxed() |
392 | 287 | } |
393 | ||
394 | pub struct OperationHandlers<S: Send + Sync + Clone> { | |
395 | operations: HashMap<String, OperationWrapper<S>>, | |
396 | } | |
397 | ||
398 | impl<S: Send + Sync + Clone> Default for OperationHandlers<S> { | |
399 | fn default() -> Self { | |
400 | Self { | |
401 | operations: HashMap::new(), | |
402 | } | |
403 | } | |
404 | } | |
405 | ||
406 | impl<S: Send + Sync + Clone + 'static> OperationHandlers<S> { | |
407 | pub fn insert< | |
408 | O: GiteratedObject + Send + Sync, | |
409 | D: GiteratedOperation<O> + 'static, | |
410 | H: GiteratedOperationHandler<O, D, S> + Send + Sync + 'static + Clone, | |
411 | >( | |
412 | &mut self, | |
413 | handler: H, | |
414 | ) -> &mut Self { | |
415 | let operation_name = handler.operation_name().to_string(); | |
416 | ||
417 | let wrapped = OperationWrapper::new(handler); | |
418 | ||
419 | self.operations.insert(operation_name, wrapped); | |
420 | ||
421 | self | |
422 | } | |
423 | ||
424 | pub async fn handle<O: GiteratedObject>( | |
425 | &mut self, | |
426 | object: &O, | |
427 | operation_name: &str, | |
428 | operation: AnyOperation, | |
429 | state: S, | |
430 | ) -> Result<Vec<u8>, OperationError<Vec<u8>>> { | |
431 | if let Some(handler) = self.operations.get_mut(operation_name) { | |
432 | handler | |
433 | .handle(AnyObject(object.to_string()), operation, state) | |
434 | .await | |
435 | } else { | |
436 | Err(OperationError::Internal(format!( | |
437 | "unknown operation: {}", | |
438 | operation_name | |
439 | ))) | |
440 | } | |
441 | } | |
442 | } |
giterated-daemon/src/database_backend/mod.rs
@@ -1,16 +1,16 @@ | ||
1 | 1 | pub mod handler; |
2 | 2 | |
3 | use std::{str::FromStr, sync::Arc}; | |
3 | use std::sync::Arc; | |
4 | 4 | |
5 | 5 | use giterated_models::error::OperationError; |
6 | 6 | use giterated_models::instance::Instance; |
7 | use giterated_models::object::{ | |
8 | AnyObject, GiteratedObject, Object, ObjectRequest, ObjectRequestError, | |
9 | }; | |
7 | use giterated_models::object::{GiteratedObject, Object, ObjectRequestError}; | |
10 | 8 | use giterated_models::object_backend::ObjectBackend; |
11 | 9 | use giterated_models::operation::GiteratedOperation; |
12 | 10 | use giterated_models::repository::Repository; |
13 | 11 | use giterated_models::user::User; |
12 | use giterated_stack::handler::GiteratedBackend; | |
13 | use giterated_stack::OperationHandlers; | |
14 | 14 | use std::fmt::Debug; |
15 | 15 | use tokio::sync::Mutex; |
16 | 16 | |
@@ -19,7 +19,7 @@ use crate::backend::{RepositoryBackend, UserBackend}; | ||
19 | 19 | use self::handler::{ |
20 | 20 | repository_commit_before, repository_diff, repository_file_from_id, repository_get_setting, |
21 | 21 | repository_get_value, repository_info, repository_set_setting, user_get_repositories, |
22 | user_get_setting, user_get_value, user_set_setting, OperationHandlers, | |
22 | user_get_setting, user_get_value, user_set_setting, | |
23 | 23 | }; |
24 | 24 | |
25 | 25 | #[derive(Clone, Debug)] |
@@ -65,6 +65,28 @@ impl DatabaseBackend { | ||
65 | 65 | repository_backend, |
66 | 66 | } |
67 | 67 | } |
68 | ||
69 | pub fn into_backend(&self) -> GiteratedBackend<Self> { | |
70 | let mut handlers = OperationHandlers::default(); | |
71 | ||
72 | handlers | |
73 | .insert(user_get_repositories) | |
74 | .insert(user_get_value) | |
75 | .insert(user_get_setting) | |
76 | .insert(user_set_setting) | |
77 | .insert(repository_info) | |
78 | .insert(repository_file_from_id) | |
79 | .insert(repository_diff) | |
80 | .insert(repository_commit_before) | |
81 | .insert(repository_get_value) | |
82 | .insert(repository_get_setting) | |
83 | .insert(repository_set_setting) | |
84 | .register_object::<Instance>() | |
85 | .register_object::<Repository>() | |
86 | .register_object::<User>(); | |
87 | ||
88 | GiteratedBackend::new(self.clone(), handlers) | |
89 | } | |
68 | 90 | } |
69 | 91 | |
70 | 92 | impl Debug for DatabaseBackend { |
@@ -73,193 +95,7 @@ impl Debug for DatabaseBackend { | ||
73 | 95 | } |
74 | 96 | } |
75 | 97 | |
76 | #[async_trait::async_trait] | |
77 | impl ObjectBackend for DatabaseBackend { | |
78 | async fn object_operation<O: GiteratedObject + Debug, D: GiteratedOperation<O> + Debug>( | |
79 | &self, | |
80 | object: O, | |
81 | operation: &str, | |
82 | payload: D, | |
83 | ) -> Result<D::Success, OperationError<D::Failure>> { | |
84 | let serialized = | |
85 | serde_json::to_value(payload).map_err(|e| OperationError::Internal(e.to_string()))?; | |
86 | let object = object.to_string(); | |
87 | if let Ok(user) = User::from_str(&object) { | |
88 | let mut handler = OperationHandlers::default(); | |
89 | ||
90 | handler | |
91 | .insert(user_get_repositories) | |
92 | .insert(user_get_value) | |
93 | .insert(user_get_setting) | |
94 | .insert(user_set_setting); | |
95 | ||
96 | match handler | |
97 | .handle( | |
98 | &user, | |
99 | operation, | |
100 | serde_json::from_value(serialized.clone()).unwrap(), | |
101 | self.clone(), | |
102 | ) | |
103 | .await | |
104 | { | |
105 | Ok(result) => Ok(serde_json::from_slice(&result) | |
106 | .map_err(|e| OperationError::Internal(e.to_string()))?), | |
107 | Err(err) => match err { | |
108 | OperationError::Internal(internal) => { | |
109 | warn!( | |
110 | "Internal Error: {:?}", | |
111 | OperationError::<()>::Internal(internal.clone()) | |
112 | ); | |
113 | ||
114 | Err(OperationError::Internal(internal)) | |
115 | } | |
116 | OperationError::Unhandled => Err(OperationError::Unhandled), | |
117 | OperationError::Operation(err) => Err(OperationError::Operation( | |
118 | serde_json::from_slice(&err) | |
119 | .map_err(|e| OperationError::Internal(e.to_string()))?, | |
120 | )), | |
121 | }, | |
122 | } | |
123 | } else if let Ok(repository) = Repository::from_str(&object) { | |
124 | let mut handler = OperationHandlers::default(); | |
125 | ||
126 | handler | |
127 | .insert(repository_info) | |
128 | .insert(repository_file_from_id) | |
129 | .insert(repository_diff) | |
130 | .insert(repository_commit_before) | |
131 | .insert(repository_get_value) | |
132 | .insert(repository_get_setting) | |
133 | .insert(repository_set_setting); | |
134 | ||
135 | match handler | |
136 | .handle( | |
137 | &repository, | |
138 | operation, | |
139 | serde_json::from_value(serialized.clone()).unwrap(), | |
140 | self.clone(), | |
141 | ) | |
142 | .await | |
143 | { | |
144 | Ok(result) => Ok(serde_json::from_slice(&result) | |
145 | .map_err(|e| OperationError::Internal(e.to_string()))?), | |
146 | Err(err) => match err { | |
147 | OperationError::Internal(internal) => { | |
148 | warn!( | |
149 | "Internal Error: {:?}", | |
150 | OperationError::<()>::Internal(internal.clone()) | |
151 | ); | |
152 | ||
153 | Err(OperationError::Internal(internal)) | |
154 | } | |
155 | OperationError::Unhandled => Err(OperationError::Unhandled), | |
156 | OperationError::Operation(err) => Err(OperationError::Operation( | |
157 | serde_json::from_slice(&err) | |
158 | .map_err(|e| OperationError::Internal(e.to_string()))?, | |
159 | )), | |
160 | }, | |
161 | } | |
162 | } else if let Ok(instance) = Instance::from_str(&object) { | |
163 | if instance != self.our_instance { | |
164 | // We don't handle other objects currently | |
165 | return Err(OperationError::Unhandled); | |
166 | } | |
167 | ||
168 | if let Ok(object_request) = serde_json::from_value::<ObjectRequest>(serialized.clone()) | |
169 | { | |
170 | let result = self.get_object::<AnyObject>(&object_request.0).await; | |
171 | ||
172 | let result = result | |
173 | .map(|success| serde_json::to_vec(&success.object()).unwrap()) | |
174 | .map_err(|err| match err { | |
175 | OperationError::Operation(err) => { | |
176 | OperationError::Operation(serde_json::to_vec(&err).unwrap()) | |
177 | } | |
178 | OperationError::Internal(internal) => OperationError::Internal(internal), | |
179 | OperationError::Unhandled => OperationError::Unhandled, | |
180 | }); | |
181 | ||
182 | match result { | |
183 | Ok(result) => Ok(serde_json::from_slice(&result) | |
184 | .map_err(|e| OperationError::Internal(e.to_string()))?), | |
185 | Err(err) => match err { | |
186 | OperationError::Internal(internal) => { | |
187 | warn!( | |
188 | "Internal Error: {:?}", | |
189 | OperationError::<()>::Internal(internal.clone()) | |
190 | ); | |
191 | ||
192 | Err(OperationError::Internal(internal)) | |
193 | } | |
194 | OperationError::Unhandled => Err(OperationError::Unhandled), | |
195 | OperationError::Operation(err) => Err(OperationError::Operation( | |
196 | serde_json::from_slice(&err) | |
197 | .map_err(|e| OperationError::Internal(e.to_string()))?, | |
198 | )), | |
199 | }, | |
200 | } | |
201 | } else { | |
202 | Err(OperationError::Unhandled) | |
203 | } | |
204 | } else { | |
205 | Err(OperationError::Unhandled) | |
206 | } | |
207 | } | |
208 | ||
209 | async fn get_object<O: GiteratedObject + Debug>( | |
210 | &self, | |
211 | object_str: &str, | |
212 | ) -> Result<Object<O, Self>, OperationError<ObjectRequestError>> { | |
213 | if let Ok(user) = User::from_str(object_str) { | |
214 | let mut user_backend = self.user_backend.lock().await; | |
215 | ||
216 | if user_backend | |
217 | .exists(&user) | |
218 | .await | |
219 | .map_err(|e| OperationError::Internal(e.to_string()))? | |
220 | { | |
221 | Ok(unsafe { | |
222 | Object::new_unchecked( | |
223 | O::from_object_str(object_str) | |
224 | .map_err(|e| ObjectRequestError::Deserialization(e.to_string()))?, | |
225 | self.clone(), | |
226 | ) | |
227 | }) | |
228 | } else { | |
229 | return Err(OperationError::Unhandled); | |
230 | } | |
231 | } else if let Ok(repository) = Repository::from_str(object_str) { | |
232 | let mut repository_backend = self.repository_backend.lock().await; | |
233 | ||
234 | if repository_backend | |
235 | .exists(&repository) | |
236 | .await | |
237 | .map_err(|e| OperationError::Internal(e.to_string()))? | |
238 | { | |
239 | Ok(unsafe { | |
240 | Object::new_unchecked( | |
241 | O::from_object_str(object_str) | |
242 | .map_err(|e| ObjectRequestError::Deserialization(e.to_string()))?, | |
243 | self.clone(), | |
244 | ) | |
245 | }) | |
246 | } else { | |
247 | return Err(OperationError::Unhandled); | |
248 | } | |
249 | } else if let Ok(instance) = Instance::from_str(object_str) { | |
250 | if instance != self.our_instance { | |
251 | // We don't handle other objects currently | |
252 | return Err(OperationError::Unhandled); | |
253 | } | |
254 | ||
255 | return Err(OperationError::Unhandled); | |
256 | } else { | |
257 | // Invalid object type | |
258 | return Err(OperationError::Unhandled); | |
259 | } | |
260 | } | |
261 | } | |
262 | ||
98 | // TODO: These should be on the stack | |
263 | 99 | // These tests verify that the essential handling of the database backend is |
264 | 100 | // functional and correct. |
265 | 101 | #[cfg(test)] |
@@ -284,6 +120,7 @@ mod test { | ||
284 | 120 | use giterated_models::settings::AnySetting; |
285 | 121 | use giterated_models::user::{DisplayName, User}; |
286 | 122 | use giterated_models::value::{AnyValue, GiteratedObjectValue}; |
123 | use giterated_stack::handler::GiteratedBackend; | |
287 | 124 | use serde_json::Value; |
288 | 125 | use tokio::sync::Mutex; |
289 | 126 | |
@@ -428,12 +265,13 @@ mod test { | ||
428 | 265 | } |
429 | 266 | } |
430 | 267 | |
431 | fn test_backend() -> DatabaseBackend { | |
268 | fn test_backend() -> GiteratedBackend<DatabaseBackend> { | |
432 | 269 | DatabaseBackend { |
433 | 270 | our_instance: Instance::from_str("testing.giterated.dev").unwrap(), |
434 | 271 | user_backend: Arc::new(Mutex::new(TestUserDatabaseBackend)) as _, |
435 | 272 | repository_backend: Arc::new(Mutex::new(TestUserRepositoryBackend)) as _, |
436 | 273 | } |
274 | .into_backend() | |
437 | 275 | } |
438 | 276 | |
439 | 277 | #[tokio::test] |
giterated-models/src/object/operations.rs
@@ -6,7 +6,7 @@ use crate::{instance::Instance, operation::GiteratedOperation}; | ||
6 | 6 | |
7 | 7 | use super::GiteratedObject; |
8 | 8 | |
9 | #[derive(Debug, Serialize, Deserialize)] | |
9 | #[derive(Debug, Serialize, Deserialize, Clone)] | |
10 | 10 | pub struct ObjectRequest(pub String); |
11 | 11 | |
12 | 12 | #[derive(Serialize, Deserialize)] |
giterated-models/src/user/mod.rs
@@ -47,7 +47,7 @@ impl GiteratedObject for User { | ||
47 | 47 | } |
48 | 48 | |
49 | 49 | fn from_object_str(object_str: &str) -> Result<Self, anyhow::Error> { |
50 | Ok(User::from_str(object_str).unwrap()) | |
50 | Ok(User::from_str(object_str)?) | |
51 | 51 | } |
52 | 52 | } |
53 | 53 |
giterated-stack/Cargo.toml
@@ -6,3 +6,10 @@ edition = "2021" | ||
6 | 6 | # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html |
7 | 7 | |
8 | 8 | [dependencies] |
9 | giterated-models = { path = "../giterated-models" } | |
10 | async-trait = "0.1" | |
11 | serde = { version = "1.0.188", features = [ "derive" ]} | |
12 | serde_json = "1.0" | |
13 | bincode = "*" | |
14 | futures-util = "*" | |
15 | tracing = "*" | |
15 | \ No newline at end of file |
giterated-stack/README.md
@@ -48,4 +48,14 @@ impl GiteratedOperation<User> for FooOperation { | ||
48 | 48 | } |
49 | 49 | ``` |
50 | 50 | |
51 | These trait implementations further guard operation handlers from being incorrect, as a function with mismatched parameters or return type is not a valid operation handler. | |
51 | \ No newline at end of file | |
51 | These trait implementations further guard operation handlers from being incorrect, as a function with mismatched parameters or return type is not a valid operation handler. | |
52 | ||
53 | ## Routing Operations to Handlers | |
54 | ||
55 | All incoming operations are processed through the `TODO` type. This type will, at runtime, generate a LUT that matches object types to their operation handlers. | |
56 | ||
57 | At this time, it is not valid for multiple handlers to handle the same operation on the same object type. The `TODO` type also verifies this is not the case at startup to prevent conflicts at runtime. | |
58 | ||
59 | ## Defining a new Operation | |
60 | ||
61 | A new operation can be implemented on any valid `GiteratedObject` type. This means it is easy to extend the behaviour of existing Object types to add new behaviours in-tree, and custom behaviours with plugins. | |
61 | \ No newline at end of file |
giterated-stack/src/handler.rs
@@ -0,0 +1,149 @@ | ||
1 | use giterated_models::{ | |
2 | error::OperationError, | |
3 | object::{ | |
4 | AnyObject, GiteratedObject, Object, ObjectRequest, ObjectRequestError, ObjectResponse, | |
5 | }, | |
6 | object_backend::ObjectBackend, | |
7 | operation::{AnyOperation, GiteratedOperation}, | |
8 | }; | |
9 | use std::{fmt::Debug, str::FromStr, sync::Arc}; | |
10 | use tracing::warn; | |
11 | ||
12 | use crate::{state::HandlerState, OperationHandlers}; | |
13 | ||
14 | #[derive(Clone)] | |
15 | pub struct GiteratedBackend<S: HandlerState> { | |
16 | state: S, | |
17 | handlers: Arc<OperationHandlers<S>>, | |
18 | } | |
19 | ||
20 | impl<S: HandlerState> GiteratedBackend<S> { | |
21 | pub fn new(state: S, handlers: OperationHandlers<S>) -> Self { | |
22 | Self { | |
23 | state, | |
24 | handlers: Arc::new(handlers), | |
25 | } | |
26 | } | |
27 | } | |
28 | ||
29 | #[async_trait::async_trait] | |
30 | impl<S: HandlerState> ObjectBackend for GiteratedBackend<S> { | |
31 | async fn object_operation<O, D>( | |
32 | &self, | |
33 | object: O, | |
34 | operation: &str, | |
35 | payload: D, | |
36 | ) -> Result<D::Success, OperationError<D::Failure>> | |
37 | where | |
38 | O: GiteratedObject + Debug, | |
39 | D: GiteratedOperation<O> + Debug, | |
40 | { | |
41 | let serialized = | |
42 | serde_json::to_value(payload).map_err(|e| OperationError::Internal(e.to_string()))?; | |
43 | let object = object.to_string(); | |
44 | ||
45 | if operation == ObjectRequest::operation_name() { | |
46 | // We're doing an object request | |
47 | let raw_result = self | |
48 | .handlers | |
49 | .resolve_object( | |
50 | AnyObject(object.clone()), | |
51 | serde_json::from_value(serialized).unwrap(), | |
52 | self.state.clone(), | |
53 | ) | |
54 | .await; | |
55 | ||
56 | return match raw_result { | |
57 | Ok(result) => Ok(serde_json::from_slice(&result) | |
58 | .map_err(|e| OperationError::Internal(e.to_string()))?), | |
59 | Err(err) => match err { | |
60 | OperationError::Internal(internal) => { | |
61 | warn!( | |
62 | "Internal Error: {:?}", | |
63 | OperationError::<()>::Internal(internal.clone()) | |
64 | ); | |
65 | ||
66 | Err(OperationError::Internal(internal)) | |
67 | } | |
68 | OperationError::Unhandled => Err(OperationError::Unhandled), | |
69 | OperationError::Operation(err) => Err(OperationError::Operation( | |
70 | serde_json::from_slice(&err) | |
71 | .map_err(|e| OperationError::Internal(e.to_string()))?, | |
72 | )), | |
73 | }, | |
74 | }; | |
75 | } | |
76 | ||
77 | let raw_result = self | |
78 | .handlers | |
79 | .handle( | |
80 | &AnyObject(object), | |
81 | operation, | |
82 | AnyOperation(serialized), | |
83 | self.state.clone(), | |
84 | ) | |
85 | .await; | |
86 | ||
87 | match raw_result { | |
88 | Ok(result) => Ok(serde_json::from_slice(&result) | |
89 | .map_err(|e| OperationError::Internal(e.to_string()))?), | |
90 | Err(err) => match err { | |
91 | OperationError::Internal(internal) => { | |
92 | warn!( | |
93 | "Internal Error: {:?}", | |
94 | OperationError::<()>::Internal(internal.clone()) | |
95 | ); | |
96 | ||
97 | Err(OperationError::Internal(internal)) | |
98 | } | |
99 | OperationError::Unhandled => Err(OperationError::Unhandled), | |
100 | OperationError::Operation(err) => Err(OperationError::Operation( | |
101 | serde_json::from_slice(&err) | |
102 | .map_err(|e| OperationError::Internal(e.to_string()))?, | |
103 | )), | |
104 | }, | |
105 | } | |
106 | } | |
107 | ||
108 | async fn get_object<O: GiteratedObject + Debug>( | |
109 | &self, | |
110 | object_str: &str, | |
111 | ) -> Result<Object<O, Self>, OperationError<ObjectRequestError>> { | |
112 | let raw_result = self | |
113 | .handlers | |
114 | .resolve_object( | |
115 | AnyObject("giterated.dev".to_string()), | |
116 | ObjectRequest(object_str.to_string()), | |
117 | self.state.clone(), | |
118 | ) | |
119 | .await; | |
120 | ||
121 | let object: ObjectResponse = match raw_result { | |
122 | Ok(result) => Ok(serde_json::from_slice(&result) | |
123 | .map_err(|e| OperationError::Internal(e.to_string()))?), | |
124 | Err(err) => match err { | |
125 | OperationError::Internal(internal) => { | |
126 | warn!( | |
127 | "Internal Error: {:?}", | |
128 | OperationError::<()>::Internal(internal.clone()) | |
129 | ); | |
130 | ||
131 | Err(OperationError::Internal(internal)) | |
132 | } | |
133 | OperationError::Unhandled => Err(OperationError::Unhandled), | |
134 | OperationError::Operation(err) => Err(OperationError::Operation( | |
135 | serde_json::from_slice(&err) | |
136 | .map_err(|e| OperationError::Internal(e.to_string()))?, | |
137 | )), | |
138 | }, | |
139 | }?; | |
140 | ||
141 | unsafe { | |
142 | Ok(Object::new_unchecked( | |
143 | O::from_str(&object.0) | |
144 | .map_err(|_| OperationError::Internal("deserialize failure".to_string()))?, | |
145 | self.clone(), | |
146 | )) | |
147 | } | |
148 | } | |
149 | } |
giterated-stack/src/lib.rs
@@ -0,0 +1,223 @@ | ||
1 | pub mod handler; | |
2 | pub mod state; | |
3 | ||
4 | use std::{future::Future, pin::Pin, sync::Arc}; | |
5 | ||
6 | use futures_util::FutureExt; | |
7 | use giterated_models::{ | |
8 | error::OperationError, | |
9 | instance::Instance, | |
10 | object::{AnyObject, GiteratedObject, ObjectRequest, ObjectResponse}, | |
11 | operation::{AnyOperation, GiteratedOperation}, | |
12 | }; | |
13 | ||
14 | pub struct OperationHandlers<S: Send + Sync + Clone> { | |
15 | operations: Vec<OperationWrapper<S>>, | |
16 | get_object: Vec<OperationWrapper<S>>, | |
17 | } | |
18 | ||
19 | impl<S: Send + Sync + Clone> Default for OperationHandlers<S> { | |
20 | fn default() -> Self { | |
21 | Self { | |
22 | operations: Vec::new(), | |
23 | get_object: Vec::new(), | |
24 | } | |
25 | } | |
26 | } | |
27 | ||
28 | impl<S: Send + Sync + Clone + 'static> OperationHandlers<S> { | |
29 | pub fn insert< | |
30 | O: GiteratedObject + Send + Sync, | |
31 | D: GiteratedOperation<O> + 'static, | |
32 | H: GiteratedOperationHandler<O, D, S> + Send + Sync + 'static + Clone, | |
33 | >( | |
34 | &mut self, | |
35 | handler: H, | |
36 | ) -> &mut Self { | |
37 | let _operation_name = handler.operation_name().to_string(); | |
38 | ||
39 | let wrapped = OperationWrapper::new(handler); | |
40 | ||
41 | self.operations.push(wrapped); | |
42 | ||
43 | self | |
44 | } | |
45 | ||
46 | pub fn register_object<O: GiteratedObject + Send + Sync>(&mut self) -> &mut Self { | |
47 | let closure = |_: &Instance, operation: ObjectRequest, _state| { | |
48 | async move { | |
49 | if O::from_str(&operation.0).is_ok() { | |
50 | Ok(ObjectResponse(operation.0)) | |
51 | } else { | |
52 | Err(OperationError::Unhandled) | |
53 | } | |
54 | } | |
55 | .boxed() | |
56 | }; | |
57 | ||
58 | let wrapped = OperationWrapper::new(closure); | |
59 | ||
60 | self.get_object.push(wrapped); | |
61 | ||
62 | self | |
63 | } | |
64 | ||
65 | pub async fn handle<O: GiteratedObject>( | |
66 | &self, | |
67 | object: &O, | |
68 | _operation_name: &str, | |
69 | operation: AnyOperation, | |
70 | state: S, | |
71 | ) -> Result<Vec<u8>, OperationError<Vec<u8>>> { | |
72 | for handler in self.operations.iter() { | |
73 | return match handler | |
74 | .handle( | |
75 | AnyObject(object.to_string()), | |
76 | operation.clone(), | |
77 | state.clone(), | |
78 | ) | |
79 | .await | |
80 | { | |
81 | Ok(ok) => Ok(ok), | |
82 | Err(err) => Err(match err { | |
83 | OperationError::Operation(err) => OperationError::Operation(err), | |
84 | OperationError::Internal(err) => OperationError::Internal(err), | |
85 | OperationError::Unhandled => continue, | |
86 | }), | |
87 | }; | |
88 | } | |
89 | ||
90 | Err(OperationError::Unhandled) | |
91 | } | |
92 | ||
93 | pub async fn resolve_object( | |
94 | &self, | |
95 | instance: AnyObject, | |
96 | request: ObjectRequest, | |
97 | state: S, | |
98 | ) -> Result<Vec<u8>, OperationError<Vec<u8>>> { | |
99 | for handler in self.get_object.iter() { | |
100 | if let Ok(response) = handler | |
101 | .handle( | |
102 | instance.clone(), | |
103 | AnyOperation(serde_json::to_value(request.clone()).unwrap()), | |
104 | state.clone(), | |
105 | ) | |
106 | .await | |
107 | { | |
108 | return Ok(response); | |
109 | } | |
110 | } | |
111 | ||
112 | Err(OperationError::Unhandled) | |
113 | } | |
114 | } | |
115 | ||
116 | #[async_trait::async_trait] | |
117 | pub trait GiteratedOperationHandler< | |
118 | O: GiteratedObject, | |
119 | D: GiteratedOperation<O>, | |
120 | S: Send + Sync + Clone, | |
121 | > | |
122 | { | |
123 | fn operation_name(&self) -> &str; | |
124 | fn object_name(&self) -> &str; | |
125 | ||
126 | async fn handle( | |
127 | &self, | |
128 | object: &O, | |
129 | operation: D, | |
130 | state: S, | |
131 | ) -> Result<D::Success, OperationError<D::Failure>>; | |
132 | } | |
133 | ||
134 | #[async_trait::async_trait] | |
135 | impl<O, D, F, S> GiteratedOperationHandler<O, D, S> for F | |
136 | where | |
137 | F: FnMut( | |
138 | &O, | |
139 | D, | |
140 | S, | |
141 | ) -> Pin< | |
142 | Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>, | |
143 | > + Send | |
144 | + Sync | |
145 | + Clone, | |
146 | O: GiteratedObject + Send + Sync, | |
147 | D: GiteratedOperation<O> + 'static, | |
148 | <D as GiteratedOperation<O>>::Failure: Send, | |
149 | S: Send + Sync + Clone + 'static, | |
150 | { | |
151 | fn operation_name(&self) -> &str { | |
152 | D::operation_name() | |
153 | } | |
154 | ||
155 | fn object_name(&self) -> &str { | |
156 | O::object_name() | |
157 | } | |
158 | ||
159 | async fn handle( | |
160 | &self, | |
161 | object: &O, | |
162 | operation: D, | |
163 | state: S, | |
164 | ) -> Result<D::Success, OperationError<D::Failure>> { | |
165 | self.clone()(object, operation, state).await | |
166 | } | |
167 | } | |
168 | ||
169 | pub struct OperationWrapper<S: Send + Sync + Clone>( | |
170 | Box< | |
171 | dyn Fn( | |
172 | AnyObject, | |
173 | AnyOperation, | |
174 | S, | |
175 | ) | |
176 | -> Pin<Box<dyn Future<Output = Result<Vec<u8>, OperationError<Vec<u8>>>> + Send>> | |
177 | + Send | |
178 | + Sync, | |
179 | >, | |
180 | ); | |
181 | ||
182 | impl<S: Send + Sync + Clone + 'static> OperationWrapper<S> { | |
183 | pub fn new< | |
184 | O: GiteratedObject + Send + Sync, | |
185 | D: GiteratedOperation<O> + 'static, | |
186 | F: GiteratedOperationHandler<O, D, S> + Send + Sync + 'static + Clone, | |
187 | >( | |
188 | handler: F, | |
189 | ) -> Self { | |
190 | let handler = Arc::new(Box::pin(handler)); | |
191 | Self(Box::new(move |any_object, any_operation, state| { | |
192 | let handler = handler.clone(); | |
193 | async move { | |
194 | let handler = handler.clone(); | |
195 | let object: O = | |
196 | O::from_object_str(&any_object.0).map_err(|_| OperationError::Unhandled)?; | |
197 | let operation: D = serde_json::from_value(any_operation.0.clone()) | |
198 | .map_err(|_| OperationError::Unhandled)?; | |
199 | ||
200 | let result = handler.handle(&object, operation, state).await; | |
201 | result | |
202 | .map(|success| serde_json::to_vec(&success).unwrap()) | |
203 | .map_err(|err| match err { | |
204 | OperationError::Operation(err) => { | |
205 | OperationError::Operation(serde_json::to_vec(&err).unwrap()) | |
206 | } | |
207 | OperationError::Internal(internal) => OperationError::Internal(internal), | |
208 | OperationError::Unhandled => OperationError::Unhandled, | |
209 | }) | |
210 | } | |
211 | .boxed() | |
212 | })) | |
213 | } | |
214 | ||
215 | async fn handle( | |
216 | &self, | |
217 | object: AnyObject, | |
218 | operation: AnyOperation, | |
219 | state: S, | |
220 | ) -> Result<Vec<u8>, OperationError<Vec<u8>>> { | |
221 | self.0(object, operation, state).await | |
222 | } | |
223 | } |
giterated-stack/src/state.rs
@@ -0,0 +1,12 @@ | ||
1 | /// A type which can be passed into a stateful handler. | |
2 | /// | |
3 | /// # Trait Bounds | |
4 | /// This trait is bound on `Send + Sync`, as well as `Clone`. Handler states are | |
5 | /// cloned many times, and should be inexpensive to clone. | |
6 | /// | |
7 | /// # Blanket Impl | |
8 | /// This trait is blanket-impl'd on any type that meets the requirements. You do not need | |
9 | /// to manually mark your state types with it. | |
10 | pub trait HandlerState: Send + Sync + Clone + 'static {} | |
11 | ||
12 | impl<T> HandlerState for T where T: Send + Sync + Clone + 'static {} |