Finish unified stack refactor.
Adds support for operation state, which will be used to pass authentication information around. Added generic backend that uses a channel to communicate with a typed backend.
parent: tbd commit: d15581c
Showing 16 changed files with 552 insertions and 146 deletions
Cargo.lock
@@ -824,6 +824,7 @@ dependencies = [ | ||
824 | 824 | "giterated-models", |
825 | 825 | "serde", |
826 | 826 | "serde_json", |
827 | "tokio", | |
827 | 828 | "tracing", |
828 | 829 | ] |
829 | 830 |
giterated-daemon/src/cache_backend.rs
@@ -1,30 +1 @@ | ||
1 | use giterated_models::error::OperationError; | |
2 | 1 | |
3 | use giterated_models::object::{GiteratedObject, Object, ObjectRequestError}; | |
4 | use giterated_models::object_backend::ObjectBackend; | |
5 | use giterated_models::operation::GiteratedOperation; | |
6 | ||
7 | use std::fmt::Debug; | |
8 | ||
9 | #[derive(Clone, Debug)] | |
10 | pub struct CacheBackend; | |
11 | ||
12 | #[async_trait::async_trait] | |
13 | impl ObjectBackend for CacheBackend { | |
14 | async fn object_operation<O: GiteratedObject + Debug, D: GiteratedOperation<O> + Debug>( | |
15 | &self, | |
16 | _object: O, | |
17 | _operation: &str, | |
18 | _payload: D, | |
19 | ) -> Result<D::Success, OperationError<D::Failure>> { | |
20 | // We don't handle operations with this backend | |
21 | Err(OperationError::Unhandled) | |
22 | } | |
23 | ||
24 | async fn get_object<O: GiteratedObject + Debug>( | |
25 | &self, | |
26 | _object_str: &str, | |
27 | ) -> Result<Object<O, Self>, OperationError<ObjectRequestError>> { | |
28 | Err(OperationError::Unhandled) | |
29 | } | |
30 | } |
giterated-daemon/src/connection/wrapper.rs
@@ -14,6 +14,7 @@ use giterated_models::{ | ||
14 | 14 | authenticated::AuthenticatedPayload, message::GiteratedMessage, object::AnyObject, |
15 | 15 | operation::AnyOperation, |
16 | 16 | }; |
17 | use giterated_stack::{handler::GiteratedBackend, StackOperationState}; | |
17 | 18 | use serde::Serialize; |
18 | 19 | |
19 | 20 | use tokio::{net::TcpStream, sync::Mutex}; |
@@ -41,7 +42,8 @@ pub async fn connection_wrapper( | ||
41 | 42 | instance: impl ToOwned<Owned = Instance>, |
42 | 43 | instance_connections: Arc<Mutex<InstanceConnections>>, |
43 | 44 | config: Table, |
44 | backend: DatabaseBackend, | |
45 | backend: GiteratedBackend<DatabaseBackend>, | |
46 | operation_state: StackOperationState, | |
45 | 47 | ) { |
46 | 48 | let connection_state = ConnectionState { |
47 | 49 | socket: Arc::new(Mutex::new(socket)), |
@@ -60,8 +62,6 @@ pub async fn connection_wrapper( | ||
60 | 62 | |
61 | 63 | let _handshaked = false; |
62 | 64 | |
63 | let backend = backend.into_backend(); | |
64 | ||
65 | 65 | loop { |
66 | 66 | let mut socket = connection_state.socket.lock().await; |
67 | 67 | let message = socket.next().await; |
@@ -86,7 +86,12 @@ pub async fn connection_wrapper( | ||
86 | 86 | let message: GiteratedMessage<AnyObject, AnyOperation> = message.into_message(); |
87 | 87 | |
88 | 88 | let result = backend |
89 | .object_operation(message.object, &message.operation, message.payload) | |
89 | .object_operation( | |
90 | message.object, | |
91 | &message.operation, | |
92 | message.payload, | |
93 | &operation_state, | |
94 | ) | |
90 | 95 | .await; |
91 | 96 | |
92 | 97 | // Map result to Vec<u8> on both |
giterated-daemon/src/database_backend/handler.rs
@@ -14,6 +14,7 @@ use giterated_models::{ | ||
14 | 14 | user::{User, UserRepositoriesRequest}, |
15 | 15 | value::{AnyValue, GetValue}, |
16 | 16 | }; |
17 | use giterated_stack::{BackendWrapper, StackOperationState}; | |
17 | 18 | |
18 | 19 | use super::DatabaseBackend; |
19 | 20 | |
@@ -97,13 +98,14 @@ pub fn repository_info( | ||
97 | 98 | object: &Repository, |
98 | 99 | operation: RepositoryInfoRequest, |
99 | 100 | state: DatabaseBackend, |
101 | operation_state: StackOperationState, | |
102 | backend: BackendWrapper, | |
100 | 103 | ) -> BoxFuture<'static, Result<RepositoryView, OperationError<RepositoryError>>> { |
101 | 104 | let object = object.clone(); |
102 | let backend = state.into_backend(); | |
103 | 105 | |
104 | 106 | async move { |
105 | 107 | let mut object = backend |
106 | .get_object::<Repository>(&object.to_string()) | |
108 | .get_object::<Repository>(&object.to_string(), &operation_state) | |
107 | 109 | .await |
108 | 110 | .unwrap(); |
109 | 111 | |
@@ -125,17 +127,17 @@ pub fn repository_info( | ||
125 | 127 | let info = RepositoryView { |
126 | 128 | name: object.object().name.clone(), |
127 | 129 | owner: object.object().owner.clone(), |
128 | description: object.get::<Description>().await.ok(), | |
130 | description: object.get::<Description>(&operation_state).await.ok(), | |
129 | 131 | visibility: object |
130 | .get::<Visibility>() | |
132 | .get::<Visibility>(&operation_state) | |
131 | 133 | .await |
132 | 134 | .map_err(|e| OperationError::Internal(format!("{:?}: {}", e.source(), e)))?, |
133 | 135 | default_branch: object |
134 | .get::<DefaultBranch>() | |
136 | .get::<DefaultBranch>(&operation_state) | |
135 | 137 | .await |
136 | 138 | .map_err(|e| OperationError::Internal(format!("{:?}: {}", e.source(), e)))?, |
137 | 139 | // TODO: Can't be a simple get function, this needs to be returned alongside the tree as this differs depending on the rev and path. |
138 | latest_commit: object.get::<LatestCommit>().await.ok(), | |
140 | latest_commit: object.get::<LatestCommit>(&operation_state).await.ok(), | |
139 | 141 | tree_rev: operation.rev, |
140 | 142 | tree, |
141 | 143 | }; |
@@ -149,13 +151,14 @@ pub fn repository_file_from_id( | ||
149 | 151 | object: &Repository, |
150 | 152 | operation: RepositoryFileFromIdRequest, |
151 | 153 | state: DatabaseBackend, |
154 | operation_state: StackOperationState, | |
155 | backend: BackendWrapper, | |
152 | 156 | ) -> BoxFuture<'static, Result<RepositoryFile, OperationError<RepositoryError>>> { |
153 | 157 | let object = object.clone(); |
154 | let backend = state.into_backend(); | |
155 | 158 | |
156 | 159 | async move { |
157 | 160 | let object = backend |
158 | .get_object::<Repository>(&object.to_string()) | |
161 | .get_object::<Repository>(&object.to_string(), &operation_state) | |
159 | 162 | .await |
160 | 163 | .unwrap(); |
161 | 164 | |
@@ -179,13 +182,14 @@ pub fn repository_diff( | ||
179 | 182 | object: &Repository, |
180 | 183 | operation: RepositoryDiffRequest, |
181 | 184 | state: DatabaseBackend, |
185 | operation_state: StackOperationState, | |
186 | backend: BackendWrapper, | |
182 | 187 | ) -> BoxFuture<'static, Result<RepositoryDiff, OperationError<RepositoryError>>> { |
183 | 188 | let object = object.clone(); |
184 | let backend = state.into_backend(); | |
185 | 189 | |
186 | 190 | async move { |
187 | 191 | let object = backend |
188 | .get_object::<Repository>(&object.to_string()) | |
192 | .get_object::<Repository>(&object.to_string(), &operation_state) | |
189 | 193 | .await |
190 | 194 | .unwrap(); |
191 | 195 | |
@@ -205,13 +209,14 @@ pub fn repository_commit_before( | ||
205 | 209 | object: &Repository, |
206 | 210 | operation: RepositoryCommitBeforeRequest, |
207 | 211 | state: DatabaseBackend, |
212 | operation_state: StackOperationState, | |
213 | backend: BackendWrapper, | |
208 | 214 | ) -> BoxFuture<'static, Result<Commit, OperationError<RepositoryError>>> { |
209 | 215 | let object = object.clone(); |
210 | let backend = state.into_backend(); | |
211 | 216 | |
212 | 217 | async move { |
213 | 218 | let object = backend |
214 | .get_object::<Repository>(&object.to_string()) | |
219 | .get_object::<Repository>(&object.to_string(), &operation_state) | |
215 | 220 | .await |
216 | 221 | .unwrap(); |
217 | 222 |
giterated-daemon/src/database_backend/mod.rs
@@ -10,7 +10,7 @@ use giterated_models::operation::GiteratedOperation; | ||
10 | 10 | use giterated_models::repository::Repository; |
11 | 11 | use giterated_models::user::User; |
12 | 12 | use giterated_stack::handler::GiteratedBackend; |
13 | use giterated_stack::OperationHandlers; | |
13 | use giterated_stack::{OperationHandlers, StackOperationState}; | |
14 | 14 | use std::fmt::Debug; |
15 | 15 | use tokio::sync::Mutex; |
16 | 16 | |
@@ -26,12 +26,13 @@ use self::handler::{ | ||
26 | 26 | pub struct Foobackend {} |
27 | 27 | |
28 | 28 | #[async_trait::async_trait] |
29 | impl ObjectBackend for Foobackend { | |
29 | impl ObjectBackend<StackOperationState> for Foobackend { | |
30 | 30 | async fn object_operation<O: GiteratedObject + Debug, D: GiteratedOperation<O> + Debug>( |
31 | 31 | &self, |
32 | 32 | _object: O, |
33 | 33 | _operation: &str, |
34 | 34 | _payload: D, |
35 | _operation_state: &StackOperationState, | |
35 | 36 | ) -> Result<D::Success, OperationError<D::Failure>> { |
36 | 37 | // We don't handle operations with this backend |
37 | 38 | Err(OperationError::Unhandled) |
@@ -40,7 +41,8 @@ impl ObjectBackend for Foobackend { | ||
40 | 41 | async fn get_object<O: GiteratedObject + Debug>( |
41 | 42 | &self, |
42 | 43 | _object_str: &str, |
43 | ) -> Result<Object<O, Self>, OperationError<ObjectRequestError>> { | |
44 | _operation_state: &StackOperationState, | |
45 | ) -> Result<Object<StackOperationState, O, Self>, OperationError<ObjectRequestError>> { | |
44 | 46 | Err(OperationError::Unhandled) |
45 | 47 | } |
46 | 48 | } |
@@ -121,6 +123,7 @@ mod test { | ||
121 | 123 | use giterated_models::user::{DisplayName, User}; |
122 | 124 | use giterated_models::value::{AnyValue, GiteratedObjectValue}; |
123 | 125 | use giterated_stack::handler::GiteratedBackend; |
126 | use giterated_stack::StackOperationState; | |
124 | 127 | use serde_json::Value; |
125 | 128 | use tokio::sync::Mutex; |
126 | 129 | |
@@ -274,16 +277,21 @@ mod test { | ||
274 | 277 | .into_backend() |
275 | 278 | } |
276 | 279 | |
280 | fn operation_state() -> StackOperationState { | |
281 | todo!() | |
282 | } | |
283 | ||
277 | 284 | #[tokio::test] |
278 | 285 | async fn test_user_get() { |
279 | 286 | let backend = test_backend(); |
287 | let operation_state = operation_state(); | |
280 | 288 | |
281 | 289 | let mut user = backend |
282 | .get_object::<User>("test_user:test.giterated.dev") | |
290 | .get_object::<User>("test_user:test.giterated.dev", &operation_state) | |
283 | 291 | .await |
284 | 292 | .expect("object should have been returned"); |
285 | 293 | |
286 | user.get::<DisplayName>() | |
294 | user.get::<DisplayName>(&operation_state) | |
287 | 295 | .await |
288 | 296 | .expect("object value should have been returned"); |
289 | 297 | } |
@@ -291,13 +299,14 @@ mod test { | ||
291 | 299 | #[tokio::test] |
292 | 300 | async fn test_user_get_setting() { |
293 | 301 | let backend = test_backend(); |
302 | let operation_state = operation_state(); | |
294 | 303 | |
295 | 304 | let mut user = backend |
296 | .get_object::<User>("test_user:test.giterated.dev") | |
305 | .get_object::<User>("test_user:test.giterated.dev", &operation_state) | |
297 | 306 | .await |
298 | 307 | .expect("object should have been returned"); |
299 | 308 | |
300 | user.get_setting::<DisplayName>() | |
309 | user.get_setting::<DisplayName>(&operation_state) | |
301 | 310 | .await |
302 | 311 | .expect("object value should have been returned"); |
303 | 312 | } |
@@ -305,13 +314,14 @@ mod test { | ||
305 | 314 | #[tokio::test] |
306 | 315 | async fn test_user_set_setting() { |
307 | 316 | let backend = test_backend(); |
317 | let operation_state = operation_state(); | |
308 | 318 | |
309 | 319 | let mut user = backend |
310 | .get_object::<User>("test_user:test.giterated.dev") | |
320 | .get_object::<User>("test_user:test.giterated.dev", &operation_state) | |
311 | 321 | .await |
312 | 322 | .expect("object should have been returned"); |
313 | 323 | |
314 | user.set_setting::<DisplayName>(DisplayName(String::from("test"))) | |
324 | user.set_setting::<DisplayName>(DisplayName(String::from("test")), &operation_state) | |
315 | 325 | .await |
316 | 326 | .expect("object value should have been returned"); |
317 | 327 | } |
@@ -319,14 +329,18 @@ mod test { | ||
319 | 329 | #[tokio::test] |
320 | 330 | async fn test_respository_get() { |
321 | 331 | let backend = test_backend(); |
332 | let operation_state = operation_state(); | |
322 | 333 | |
323 | 334 | let mut repository = backend |
324 | .get_object::<Repository>("test_user:test.giterated.dev/[email protected]") | |
335 | .get_object::<Repository>( | |
336 | "test_user:test.giterated.dev/[email protected]", | |
337 | &operation_state, | |
338 | ) | |
325 | 339 | .await |
326 | 340 | .expect("object should have been returned"); |
327 | 341 | |
328 | 342 | repository |
329 | .get::<Description>() | |
343 | .get::<Description>(&operation_state) | |
330 | 344 | .await |
331 | 345 | .expect("object value should have been returned"); |
332 | 346 | } |
@@ -334,14 +348,18 @@ mod test { | ||
334 | 348 | #[tokio::test] |
335 | 349 | async fn test_repository_get_setting() { |
336 | 350 | let backend = test_backend(); |
351 | let operation_state = operation_state(); | |
337 | 352 | |
338 | 353 | let mut repository = backend |
339 | .get_object::<Repository>("test_user:test.giterated.dev/[email protected]") | |
354 | .get_object::<Repository>( | |
355 | "test_user:test.giterated.dev/[email protected]", | |
356 | &operation_state, | |
357 | ) | |
340 | 358 | .await |
341 | 359 | .expect("object should have been returned"); |
342 | 360 | |
343 | 361 | repository |
344 | .get_setting::<Description>() | |
362 | .get_setting::<Description>(&operation_state) | |
345 | 363 | .await |
346 | 364 | .expect("object value should have been returned"); |
347 | 365 | } |
@@ -349,14 +367,18 @@ mod test { | ||
349 | 367 | #[tokio::test] |
350 | 368 | async fn test_repository_set_setting() { |
351 | 369 | let backend = test_backend(); |
370 | let operation_state = operation_state(); | |
352 | 371 | |
353 | 372 | let mut repository = backend |
354 | .get_object::<Repository>("test_user:test.giterated.dev/[email protected]") | |
373 | .get_object::<Repository>( | |
374 | "test_user:test.giterated.dev/[email protected]", | |
375 | &operation_state, | |
376 | ) | |
355 | 377 | .await |
356 | 378 | .expect("object should have been returned"); |
357 | 379 | |
358 | 380 | repository |
359 | .set_setting::<Description>(Description(String::from("test"))) | |
381 | .set_setting::<Description>(Description(String::from("test")), &operation_state) | |
360 | 382 | .await |
361 | 383 | .expect("object value should have been returned"); |
362 | 384 | } |
giterated-daemon/src/main.rs
@@ -12,6 +12,7 @@ use giterated_daemon::{ | ||
12 | 12 | |
13 | 13 | use giterated_models::instance::Instance; |
14 | 14 | |
15 | use giterated_stack::{BackendWrapper, StackOperationState}; | |
15 | 16 | use sqlx::{postgres::PgConnectOptions, ConnectOptions, PgPool}; |
16 | 17 | use std::{net::SocketAddr, str::FromStr, sync::Arc}; |
17 | 18 | use tokio::{ |
@@ -88,6 +89,16 @@ async fn main() -> Result<(), Error> { | ||
88 | 89 | repository_backend.clone(), |
89 | 90 | ); |
90 | 91 | |
92 | let backend = database_backend.into_backend(); | |
93 | ||
94 | let backend_wrapper = BackendWrapper::new(backend.clone()); | |
95 | ||
96 | let operation_state = { | |
97 | StackOperationState { | |
98 | giterated_backend: backend_wrapper, | |
99 | } | |
100 | }; | |
101 | ||
91 | 102 | loop { |
92 | 103 | let stream = accept_stream(&mut listener).await; |
93 | 104 | info!("Connected"); |
@@ -129,7 +140,8 @@ async fn main() -> Result<(), Error> { | ||
129 | 140 | Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(), |
130 | 141 | instance_connections.clone(), |
131 | 142 | config.clone(), |
132 | database_backend.clone(), | |
143 | backend.clone(), | |
144 | operation_state.clone(), | |
133 | 145 | )), |
134 | 146 | }; |
135 | 147 |
giterated-models/src/instance/operations.rs
@@ -106,18 +106,22 @@ impl GiteratedOperation<Instance> for RepositoryCreateRequest { | ||
106 | 106 | type Failure = InstanceError; |
107 | 107 | } |
108 | 108 | |
109 | impl<B: ObjectBackend + std::fmt::Debug> Object<'_, Instance, B> { | |
109 | impl<S: Clone + Send + Sync, B: ObjectBackend<S> + std::fmt::Debug> Object<'_, S, Instance, B> { | |
110 | 110 | pub async fn register_account( |
111 | 111 | &mut self, |
112 | 112 | email: Option<&str>, |
113 | 113 | username: &str, |
114 | 114 | password: &Secret<Password>, |
115 | operation_state: &S, | |
115 | 116 | ) -> Result<UserAuthenticationToken, OperationError<InstanceError>> { |
116 | self.request::<RegisterAccountRequest>(RegisterAccountRequest { | |
117 | username: username.to_string(), | |
118 | email: email.map(|s| s.to_string()), | |
119 | password: password.clone(), | |
120 | }) | |
117 | self.request::<RegisterAccountRequest>( | |
118 | RegisterAccountRequest { | |
119 | username: username.to_string(), | |
120 | email: email.map(|s| s.to_string()), | |
121 | password: password.clone(), | |
122 | }, | |
123 | operation_state, | |
124 | ) | |
121 | 125 | .await |
122 | 126 | } |
123 | 127 | |
@@ -125,12 +129,16 @@ impl<B: ObjectBackend + std::fmt::Debug> Object<'_, Instance, B> { | ||
125 | 129 | &mut self, |
126 | 130 | username: &str, |
127 | 131 | password: &Secret<Password>, |
132 | operation_state: &S, | |
128 | 133 | ) -> Result<UserAuthenticationToken, OperationError<InstanceError>> { |
129 | self.request::<AuthenticationTokenRequest>(AuthenticationTokenRequest { | |
130 | instance: self.inner.clone(), | |
131 | username: username.to_string(), | |
132 | password: password.clone(), | |
133 | }) | |
134 | self.request::<AuthenticationTokenRequest>( | |
135 | AuthenticationTokenRequest { | |
136 | instance: self.inner.clone(), | |
137 | username: username.to_string(), | |
138 | password: password.clone(), | |
139 | }, | |
140 | operation_state, | |
141 | ) | |
134 | 142 | .await |
135 | 143 | } |
136 | 144 | |
@@ -139,22 +147,30 @@ impl<B: ObjectBackend + std::fmt::Debug> Object<'_, Instance, B> { | ||
139 | 147 | instance: &Instance, |
140 | 148 | username: &str, |
141 | 149 | password: &Secret<Password>, |
150 | operation_state: &S, | |
142 | 151 | ) -> Result<UserAuthenticationToken, OperationError<InstanceError>> { |
143 | self.request::<AuthenticationTokenRequest>(AuthenticationTokenRequest { | |
144 | instance: instance.clone(), | |
145 | username: username.to_string(), | |
146 | password: password.clone(), | |
147 | }) | |
152 | self.request::<AuthenticationTokenRequest>( | |
153 | AuthenticationTokenRequest { | |
154 | instance: instance.clone(), | |
155 | username: username.to_string(), | |
156 | password: password.clone(), | |
157 | }, | |
158 | operation_state, | |
159 | ) | |
148 | 160 | .await |
149 | 161 | } |
150 | 162 | |
151 | 163 | pub async fn token_extension( |
152 | 164 | &mut self, |
153 | 165 | token: &UserAuthenticationToken, |
166 | operation_state: &S, | |
154 | 167 | ) -> Result<Option<UserAuthenticationToken>, OperationError<InstanceError>> { |
155 | self.request::<TokenExtensionRequest>(TokenExtensionRequest { | |
156 | token: token.clone(), | |
157 | }) | |
168 | self.request::<TokenExtensionRequest>( | |
169 | TokenExtensionRequest { | |
170 | token: token.clone(), | |
171 | }, | |
172 | operation_state, | |
173 | ) | |
158 | 174 | .await |
159 | 175 | } |
160 | 176 | |
@@ -165,15 +181,19 @@ impl<B: ObjectBackend + std::fmt::Debug> Object<'_, Instance, B> { | ||
165 | 181 | visibility: &RepositoryVisibility, |
166 | 182 | default_branch: &str, |
167 | 183 | owner: &User, |
184 | operation_state: &S, | |
168 | 185 | ) -> Result<Repository, OperationError<InstanceError>> { |
169 | self.request::<RepositoryCreateRequest>(RepositoryCreateRequest { | |
170 | instance: Some(instance.clone()), | |
171 | name: name.to_string(), | |
172 | description: None, | |
173 | visibility: visibility.clone(), | |
174 | default_branch: default_branch.to_string(), | |
175 | owner: owner.clone(), | |
176 | }) | |
186 | self.request::<RepositoryCreateRequest>( | |
187 | RepositoryCreateRequest { | |
188 | instance: Some(instance.clone()), | |
189 | name: name.to_string(), | |
190 | description: None, | |
191 | visibility: visibility.clone(), | |
192 | default_branch: default_branch.to_string(), | |
193 | owner: owner.clone(), | |
194 | }, | |
195 | operation_state, | |
196 | ) | |
177 | 197 | .await |
178 | 198 | } |
179 | 199 | } |
giterated-models/src/object.rs
@@ -18,18 +18,25 @@ mod operations; | ||
18 | 18 | pub use operations::*; |
19 | 19 | |
20 | 20 | #[derive(Debug, Clone)] |
21 | pub struct Object<'b, O: GiteratedObject, B: ObjectBackend + 'b + Send + Sync + Clone> { | |
21 | pub struct Object< | |
22 | 'b, | |
23 | S: Clone + Send + Sync, | |
24 | O: GiteratedObject, | |
25 | B: ObjectBackend<S> + 'b + Send + Sync + Clone, | |
26 | > { | |
22 | 27 | pub(crate) inner: O, |
23 | 28 | pub(crate) backend: B, |
24 | pub(crate) _marker: PhantomData<&'b ()>, | |
29 | pub(crate) _marker: PhantomData<&'b S>, | |
25 | 30 | } |
26 | 31 | |
27 | impl<'b, B: ObjectBackend + Send + Sync + Clone, O: GiteratedObject> Object<'b, O, B> { | |
32 | impl<'b, S: Clone + Send + Sync, B: ObjectBackend<S> + Send + Sync + Clone, O: GiteratedObject> | |
33 | Object<'b, S, O, B> | |
34 | { | |
28 | 35 | pub fn object(&self) -> &O { |
29 | 36 | &self.inner |
30 | 37 | } |
31 | 38 | |
32 | pub unsafe fn new_unchecked(object: O, backend: B) -> Object<'b, O, B> { | |
39 | pub unsafe fn new_unchecked(object: O, backend: B) -> Object<'b, S, O, B> { | |
33 | 40 | Object { |
34 | 41 | inner: object, |
35 | 42 | backend, |
@@ -38,8 +45,11 @@ impl<'b, B: ObjectBackend + Send + Sync + Clone, O: GiteratedObject> Object<'b, | ||
38 | 45 | } |
39 | 46 | } |
40 | 47 | |
41 | impl<O: GiteratedObject + Display, B: ObjectBackend + Send + Sync + Clone> Display | |
42 | for Object<'_, O, B> | |
48 | impl< | |
49 | S: Clone + Send + Sync, | |
50 | O: GiteratedObject + Display, | |
51 | B: ObjectBackend<S> + Send + Sync + Clone, | |
52 | > Display for Object<'_, S, O, B> | |
43 | 53 | { |
44 | 54 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
45 | 55 | self.inner.fmt(f) |
@@ -52,15 +62,21 @@ pub trait GiteratedObject: Send + Display + FromStr { | ||
52 | 62 | fn from_object_str(object_str: &str) -> Result<Self, Error>; |
53 | 63 | } |
54 | 64 | |
55 | impl<'b, O: GiteratedObject + Clone + Debug, B: ObjectBackend> Object<'b, O, B> { | |
65 | impl<'b, I: Clone + Send + Sync, O: GiteratedObject + Clone + Debug, B: ObjectBackend<I>> | |
66 | Object<'b, I, O, B> | |
67 | { | |
56 | 68 | pub async fn get<V: GiteratedObjectValue<Object = O> + Send + Debug>( |
57 | 69 | &mut self, |
70 | operation_state: &I, | |
58 | 71 | ) -> Result<V, OperationError<GetValueError>> { |
59 | 72 | let result = self |
60 | .request(GetValue { | |
61 | value_name: V::value_name().to_string(), | |
62 | _marker: PhantomData, | |
63 | }) | |
73 | .request( | |
74 | GetValue { | |
75 | value_name: V::value_name().to_string(), | |
76 | _marker: PhantomData, | |
77 | }, | |
78 | operation_state, | |
79 | ) | |
64 | 80 | .await; |
65 | 81 | |
66 | 82 | result |
@@ -68,31 +84,45 @@ impl<'b, O: GiteratedObject + Clone + Debug, B: ObjectBackend> Object<'b, O, B> | ||
68 | 84 | |
69 | 85 | pub async fn get_setting<S: Setting + Send + Clone + Debug>( |
70 | 86 | &mut self, |
87 | operation_state: &I, | |
71 | 88 | ) -> Result<S, OperationError<GetSettingError>> { |
72 | self.request(GetSetting { | |
73 | setting_name: S::name().to_string(), | |
74 | _marker: PhantomData, | |
75 | }) | |
89 | self.request( | |
90 | GetSetting { | |
91 | setting_name: S::name().to_string(), | |
92 | _marker: PhantomData, | |
93 | }, | |
94 | operation_state, | |
95 | ) | |
76 | 96 | .await |
77 | 97 | } |
78 | 98 | |
79 | 99 | pub async fn set_setting<S: Setting + Send + Clone + Debug>( |
80 | 100 | &mut self, |
81 | 101 | setting: S, |
102 | operation_state: &I, | |
82 | 103 | ) -> Result<(), OperationError<SetSettingError>> { |
83 | self.request(SetSetting { | |
84 | setting_name: S::name().to_string(), | |
85 | value: setting, | |
86 | }) | |
104 | self.request( | |
105 | SetSetting { | |
106 | setting_name: S::name().to_string(), | |
107 | value: setting, | |
108 | }, | |
109 | operation_state, | |
110 | ) | |
87 | 111 | .await |
88 | 112 | } |
89 | 113 | |
90 | 114 | pub async fn request<R: GiteratedOperation<O> + Debug>( |
91 | 115 | &mut self, |
92 | 116 | request: R, |
117 | operation_state: &I, | |
93 | 118 | ) -> Result<R::Success, OperationError<R::Failure>> { |
94 | 119 | self.backend |
95 | .object_operation(self.inner.clone(), R::operation_name(), request) | |
120 | .object_operation( | |
121 | self.inner.clone(), | |
122 | R::operation_name(), | |
123 | request, | |
124 | operation_state, | |
125 | ) | |
96 | 126 | .await |
97 | 127 | } |
98 | 128 | } |
giterated-models/src/object_backend.rs
@@ -7,12 +7,13 @@ use crate::{ | ||
7 | 7 | use std::fmt::Debug; |
8 | 8 | |
9 | 9 | #[async_trait::async_trait] |
10 | pub trait ObjectBackend: Send + Sync + Sized + Clone { | |
10 | pub trait ObjectBackend<S: Clone + Send + Sync>: Send + Sync + Sized + Clone { | |
11 | 11 | async fn object_operation<O, D>( |
12 | 12 | &self, |
13 | 13 | object: O, |
14 | 14 | operation: &str, |
15 | 15 | payload: D, |
16 | operation_state: &S, | |
16 | 17 | ) -> Result<D::Success, OperationError<D::Failure>> |
17 | 18 | where |
18 | 19 | O: GiteratedObject + Debug, |
@@ -21,5 +22,6 @@ pub trait ObjectBackend: Send + Sync + Sized + Clone { | ||
21 | 22 | async fn get_object<O: GiteratedObject + Debug>( |
22 | 23 | &self, |
23 | 24 | object_str: &str, |
24 | ) -> Result<Object<O, Self>, OperationError<ObjectRequestError>>; | |
25 | operation_state: &S, | |
26 | ) -> Result<Object<S, O, Self>, OperationError<ObjectRequestError>>; | |
25 | 27 | } |
giterated-models/src/operation.rs
@@ -24,3 +24,8 @@ impl<O: GiteratedObject> GiteratedOperation<O> for AnyOperation { | ||
24 | 24 | |
25 | 25 | type Failure = Value; |
26 | 26 | } |
27 | ||
28 | /// The internal state of an operation, used to provide authentication information | |
29 | /// and the ability to make giterated calls within handlers. | |
30 | #[derive(Clone)] | |
31 | pub struct GiteratedOperationState<S: Clone + Send + Sync>(pub S); |
giterated-models/src/repository/operations.rs
@@ -153,44 +153,60 @@ impl GiteratedOperation<Repository> for RepositoryFileInspectRequest { | ||
153 | 153 | type Failure = RepositoryError; |
154 | 154 | } |
155 | 155 | |
156 | impl<B: ObjectBackend + std::fmt::Debug> Object<'_, Repository, B> { | |
156 | impl<S: Clone + Send + Sync, B: ObjectBackend<S> + std::fmt::Debug> Object<'_, S, Repository, B> { | |
157 | 157 | pub async fn info( |
158 | 158 | &mut self, |
159 | 159 | extra_metadata: bool, |
160 | 160 | rev: Option<String>, |
161 | 161 | path: Option<String>, |
162 | operation_state: &S, | |
162 | 163 | ) -> Result<RepositoryView, OperationError<RepositoryError>> { |
163 | self.request::<RepositoryInfoRequest>(RepositoryInfoRequest { | |
164 | extra_metadata, | |
165 | rev, | |
166 | path, | |
167 | }) | |
164 | self.request::<RepositoryInfoRequest>( | |
165 | RepositoryInfoRequest { | |
166 | extra_metadata, | |
167 | rev, | |
168 | path, | |
169 | }, | |
170 | operation_state, | |
171 | ) | |
168 | 172 | .await |
169 | 173 | } |
170 | 174 | |
171 | 175 | pub async fn file_from_id( |
172 | 176 | &mut self, |
173 | 177 | id: String, |
178 | operation_state: &S, | |
174 | 179 | ) -> Result<RepositoryFile, OperationError<RepositoryError>> { |
175 | self.request::<RepositoryFileFromIdRequest>(RepositoryFileFromIdRequest(id)) | |
176 | .await | |
180 | self.request::<RepositoryFileFromIdRequest>( | |
181 | RepositoryFileFromIdRequest(id), | |
182 | operation_state, | |
183 | ) | |
184 | .await | |
177 | 185 | } |
178 | 186 | |
179 | 187 | pub async fn diff( |
180 | 188 | &mut self, |
181 | 189 | old_id: String, |
182 | 190 | new_id: String, |
191 | operation_state: &S, | |
183 | 192 | ) -> Result<RepositoryDiff, OperationError<RepositoryError>> { |
184 | self.request::<RepositoryDiffRequest>(RepositoryDiffRequest { old_id, new_id }) | |
185 | .await | |
193 | self.request::<RepositoryDiffRequest>( | |
194 | RepositoryDiffRequest { old_id, new_id }, | |
195 | operation_state, | |
196 | ) | |
197 | .await | |
186 | 198 | } |
187 | 199 | |
188 | 200 | pub async fn commit_before( |
189 | 201 | &mut self, |
190 | 202 | id: String, |
203 | operation_state: &S, | |
191 | 204 | ) -> Result<Commit, OperationError<RepositoryError>> { |
192 | self.request::<RepositoryCommitBeforeRequest>(RepositoryCommitBeforeRequest(id)) | |
193 | .await | |
205 | self.request::<RepositoryCommitBeforeRequest>( | |
206 | RepositoryCommitBeforeRequest(id), | |
207 | operation_state, | |
208 | ) | |
209 | .await | |
194 | 210 | } |
195 | 211 | |
196 | 212 | // pub async fn issues_count(&mut self) -> Result<u64, OperationError<RepositoryError>> { |
@@ -200,15 +216,17 @@ impl<B: ObjectBackend + std::fmt::Debug> Object<'_, Repository, B> { | ||
200 | 216 | |
201 | 217 | pub async fn issue_labels( |
202 | 218 | &mut self, |
219 | operation_state: &S, | |
203 | 220 | ) -> Result<Vec<IssueLabel>, OperationError<RepositoryError>> { |
204 | self.request::<RepositoryIssueLabelsRequest>(RepositoryIssueLabelsRequest) | |
221 | self.request::<RepositoryIssueLabelsRequest>(RepositoryIssueLabelsRequest, operation_state) | |
205 | 222 | .await |
206 | 223 | } |
207 | 224 | |
208 | 225 | pub async fn issues( |
209 | 226 | &mut self, |
227 | operation_state: &S, | |
210 | 228 | ) -> Result<Vec<RepositoryIssue>, OperationError<RepositoryError>> { |
211 | self.request::<RepositoryIssuesRequest>(RepositoryIssuesRequest) | |
229 | self.request::<RepositoryIssuesRequest>(RepositoryIssuesRequest, operation_state) | |
212 | 230 | .await |
213 | 231 | } |
214 | 232 | |
@@ -217,12 +235,16 @@ impl<B: ObjectBackend + std::fmt::Debug> Object<'_, Repository, B> { | ||
217 | 235 | extra_metadata: bool, |
218 | 236 | rev: Option<&str>, |
219 | 237 | path: Option<&str>, |
238 | operation_state: &S, | |
220 | 239 | ) -> Result<Vec<RepositoryTreeEntry>, OperationError<RepositoryError>> { |
221 | self.request::<RepositoryFileInspectRequest>(RepositoryFileInspectRequest { | |
222 | extra_metadata, | |
223 | rev: rev.map(|r| r.to_string()), | |
224 | path: path.map(|p| p.to_string()), | |
225 | }) | |
240 | self.request::<RepositoryFileInspectRequest>( | |
241 | RepositoryFileInspectRequest { | |
242 | extra_metadata, | |
243 | rev: rev.map(|r| r.to_string()), | |
244 | path: path.map(|p| p.to_string()), | |
245 | }, | |
246 | operation_state, | |
247 | ) | |
226 | 248 | .await |
227 | 249 | } |
228 | 250 | } |
giterated-models/src/user/operations.rs
@@ -22,15 +22,19 @@ impl GiteratedOperation<User> for UserRepositoriesRequest { | ||
22 | 22 | type Failure = UserError; |
23 | 23 | } |
24 | 24 | |
25 | impl<B: ObjectBackend + std::fmt::Debug> Object<'_, User, B> { | |
25 | impl<S: Clone + Send + Sync, B: ObjectBackend<S> + std::fmt::Debug> Object<'_, S, User, B> { | |
26 | 26 | pub async fn repositories( |
27 | 27 | &mut self, |
28 | 28 | instance: &Instance, |
29 | operation_state: &S, | |
29 | 30 | ) -> Result<Vec<RepositorySummary>, OperationError<UserError>> { |
30 | self.request::<UserRepositoriesRequest>(UserRepositoriesRequest { | |
31 | instance: instance.clone(), | |
32 | user: self.inner.clone(), | |
33 | }) | |
31 | self.request::<UserRepositoriesRequest>( | |
32 | UserRepositoriesRequest { | |
33 | instance: instance.clone(), | |
34 | user: self.inner.clone(), | |
35 | }, | |
36 | operation_state, | |
37 | ) | |
34 | 38 | .await |
35 | 39 | } |
36 | 40 | } |
giterated-stack/Cargo.toml
@@ -12,4 +12,5 @@ serde = { version = "1.0.188", features = [ "derive" ]} | ||
12 | 12 | serde_json = "1.0" |
13 | 13 | bincode = "*" |
14 | 14 | futures-util = "*" |
15 | tracing = "*" | |
15 | \ No newline at end of file | |
15 | tracing = "*" | |
16 | tokio = { version = "1.32.0", features = [ "full" ] } | |
16 | \ No newline at end of file |
giterated-stack/src/handler.rs
@@ -11,6 +11,8 @@ use tracing::warn; | ||
11 | 11 | |
12 | 12 | use crate::{state::HandlerState, OperationHandlers}; |
13 | 13 | |
14 | use crate::StackOperationState; | |
15 | ||
14 | 16 | #[derive(Clone)] |
15 | 17 | pub struct GiteratedBackend<S: HandlerState> { |
16 | 18 | state: S, |
@@ -24,15 +26,20 @@ impl<S: HandlerState> GiteratedBackend<S> { | ||
24 | 26 | handlers: Arc::new(handlers), |
25 | 27 | } |
26 | 28 | } |
29 | ||
30 | pub fn state(&self) -> &S { | |
31 | &self.state | |
32 | } | |
27 | 33 | } |
28 | 34 | |
29 | 35 | #[async_trait::async_trait] |
30 | impl<S: HandlerState> ObjectBackend for GiteratedBackend<S> { | |
36 | impl<S: HandlerState> ObjectBackend<StackOperationState> for GiteratedBackend<S> { | |
31 | 37 | async fn object_operation<O, D>( |
32 | 38 | &self, |
33 | 39 | object: O, |
34 | 40 | operation: &str, |
35 | 41 | payload: D, |
42 | operation_state: &StackOperationState, | |
36 | 43 | ) -> Result<D::Success, OperationError<D::Failure>> |
37 | 44 | where |
38 | 45 | O: GiteratedObject + Debug, |
@@ -50,6 +57,7 @@ impl<S: HandlerState> ObjectBackend for GiteratedBackend<S> { | ||
50 | 57 | AnyObject(object.clone()), |
51 | 58 | serde_json::from_value(serialized).unwrap(), |
52 | 59 | self.state.clone(), |
60 | &operation_state, | |
53 | 61 | ) |
54 | 62 | .await; |
55 | 63 | |
@@ -81,6 +89,7 @@ impl<S: HandlerState> ObjectBackend for GiteratedBackend<S> { | ||
81 | 89 | operation, |
82 | 90 | AnyOperation(serialized), |
83 | 91 | self.state.clone(), |
92 | &operation_state, | |
84 | 93 | ) |
85 | 94 | .await; |
86 | 95 | |
@@ -108,13 +117,15 @@ impl<S: HandlerState> ObjectBackend for GiteratedBackend<S> { | ||
108 | 117 | async fn get_object<O: GiteratedObject + Debug>( |
109 | 118 | &self, |
110 | 119 | object_str: &str, |
111 | ) -> Result<Object<O, Self>, OperationError<ObjectRequestError>> { | |
120 | operation_state: &StackOperationState, | |
121 | ) -> Result<Object<StackOperationState, O, Self>, OperationError<ObjectRequestError>> { | |
112 | 122 | let raw_result = self |
113 | 123 | .handlers |
114 | 124 | .resolve_object( |
115 | 125 | AnyObject("giterated.dev".to_string()), |
116 | 126 | ObjectRequest(object_str.to_string()), |
117 | 127 | self.state.clone(), |
128 | operation_state, | |
118 | 129 | ) |
119 | 130 | .await; |
120 | 131 |
giterated-stack/src/lib.rs
@@ -7,12 +7,20 @@ use futures_util::FutureExt; | ||
7 | 7 | use giterated_models::{ |
8 | 8 | error::OperationError, |
9 | 9 | instance::Instance, |
10 | object::{AnyObject, GiteratedObject, ObjectRequest, ObjectResponse}, | |
10 | object::{ | |
11 | AnyObject, GiteratedObject, Object, ObjectRequest, ObjectRequestError, ObjectResponse, | |
12 | }, | |
13 | object_backend::ObjectBackend, | |
11 | 14 | operation::{AnyOperation, GiteratedOperation}, |
12 | 15 | repository::Repository, |
13 | 16 | user::User, |
14 | 17 | }; |
15 | use tracing::info; | |
18 | use handler::GiteratedBackend; | |
19 | use serde::{de::DeserializeOwned, Serialize}; | |
20 | use serde_json::Value; | |
21 | use state::HandlerState; | |
22 | use tokio::{sync::mpsc::channel, task::JoinHandle}; | |
23 | use tracing::{error, warn}; | |
16 | 24 | |
17 | 25 | #[derive(Clone, Debug, Hash, Eq, PartialEq)] |
18 | 26 | struct ObjectOperationPair { |
@@ -36,9 +44,10 @@ impl<S: Send + Sync + Clone> Default for OperationHandlers<S> { | ||
36 | 44 | |
37 | 45 | impl<S: Send + Sync + Clone + 'static> OperationHandlers<S> { |
38 | 46 | pub fn insert< |
47 | A, | |
39 | 48 | O: GiteratedObject + Send + Sync, |
40 | 49 | D: GiteratedOperation<O> + 'static, |
41 | H: GiteratedOperationHandler<O, D, S> + Send + Sync + 'static + Clone, | |
50 | H: GiteratedOperationHandler<A, O, D, S> + Send + Sync + 'static + Clone, | |
42 | 51 | >( |
43 | 52 | &mut self, |
44 | 53 | handler: H, |
@@ -83,6 +92,7 @@ impl<S: Send + Sync + Clone + 'static> OperationHandlers<S> { | ||
83 | 92 | operation_name: &str, |
84 | 93 | operation: AnyOperation, |
85 | 94 | state: S, |
95 | operation_state: &StackOperationState, | |
86 | 96 | ) -> Result<Vec<u8>, OperationError<Vec<u8>>> { |
87 | 97 | // TODO |
88 | 98 | let object = object.to_string(); |
@@ -111,6 +121,7 @@ impl<S: Send + Sync + Clone + 'static> OperationHandlers<S> { | ||
111 | 121 | AnyObject(object.to_string()), |
112 | 122 | operation.clone(), |
113 | 123 | state.clone(), |
124 | operation_state, | |
114 | 125 | ) |
115 | 126 | .await |
116 | 127 | } else { |
@@ -123,6 +134,7 @@ impl<S: Send + Sync + Clone + 'static> OperationHandlers<S> { | ||
123 | 134 | instance: AnyObject, |
124 | 135 | request: ObjectRequest, |
125 | 136 | state: S, |
137 | operation_state: &StackOperationState, | |
126 | 138 | ) -> Result<Vec<u8>, OperationError<Vec<u8>>> { |
127 | 139 | for handler in self.get_object.iter() { |
128 | 140 | if let Ok(response) = handler |
@@ -130,6 +142,7 @@ impl<S: Send + Sync + Clone + 'static> OperationHandlers<S> { | ||
130 | 142 | instance.clone(), |
131 | 143 | AnyOperation(serde_json::to_value(request.clone()).unwrap()), |
132 | 144 | state.clone(), |
145 | operation_state, | |
133 | 146 | ) |
134 | 147 | .await |
135 | 148 | { |
@@ -143,6 +156,7 @@ impl<S: Send + Sync + Clone + 'static> OperationHandlers<S> { | ||
143 | 156 | |
144 | 157 | #[async_trait::async_trait] |
145 | 158 | pub trait GiteratedOperationHandler< |
159 | L, | |
146 | 160 | O: GiteratedObject, |
147 | 161 | D: GiteratedOperation<O>, |
148 | 162 | S: Send + Sync + Clone, |
@@ -156,11 +170,12 @@ pub trait GiteratedOperationHandler< | ||
156 | 170 | object: &O, |
157 | 171 | operation: D, |
158 | 172 | state: S, |
173 | operation_state: &StackOperationState, | |
159 | 174 | ) -> Result<D::Success, OperationError<D::Failure>>; |
160 | 175 | } |
161 | 176 | |
162 | 177 | #[async_trait::async_trait] |
163 | impl<O, D, F, S> GiteratedOperationHandler<O, D, S> for F | |
178 | impl<O, D, F, S> GiteratedOperationHandler<(), O, D, S> for F | |
164 | 179 | where |
165 | 180 | F: FnMut( |
166 | 181 | &O, |
@@ -189,17 +204,106 @@ where | ||
189 | 204 | object: &O, |
190 | 205 | operation: D, |
191 | 206 | state: S, |
207 | _operation_state: &StackOperationState, | |
192 | 208 | ) -> Result<D::Success, OperationError<D::Failure>> { |
193 | 209 | self.clone()(object, operation, state).await |
194 | 210 | } |
195 | 211 | } |
196 | 212 | |
213 | #[async_trait::async_trait] | |
214 | impl<O, O1, D, F, S> GiteratedOperationHandler<(O1,), O, D, S> for F | |
215 | where | |
216 | F: FnMut( | |
217 | &O, | |
218 | D, | |
219 | S, | |
220 | O1, | |
221 | ) -> Pin< | |
222 | Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>, | |
223 | > + Send | |
224 | + Sync | |
225 | + Clone, | |
226 | O: GiteratedObject + Send + Sync, | |
227 | D: GiteratedOperation<O> + 'static, | |
228 | <D as GiteratedOperation<O>>::Failure: Send, | |
229 | S: Send + Sync + Clone + 'static, | |
230 | O1: FromOperationState, | |
231 | { | |
232 | fn operation_name(&self) -> &str { | |
233 | D::operation_name() | |
234 | } | |
235 | ||
236 | fn object_name(&self) -> &str { | |
237 | O::object_name() | |
238 | } | |
239 | ||
240 | async fn handle( | |
241 | &self, | |
242 | object: &O, | |
243 | operation: D, | |
244 | state: S, | |
245 | operation_state: &StackOperationState, | |
246 | ) -> Result<D::Success, OperationError<D::Failure>> { | |
247 | let o1 = O1::from_state(operation_state) | |
248 | .await | |
249 | .map_err(|e| OperationError::Internal(e.to_string()))?; | |
250 | self.clone()(object, operation, state, o1).await | |
251 | } | |
252 | } | |
253 | ||
254 | #[async_trait::async_trait] | |
255 | impl<O, O1, O2, D, F, S> GiteratedOperationHandler<(O1, O2), O, D, S> for F | |
256 | where | |
257 | F: FnMut( | |
258 | &O, | |
259 | D, | |
260 | S, | |
261 | O1, | |
262 | O2, | |
263 | ) -> Pin< | |
264 | Box<dyn Future<Output = Result<D::Success, OperationError<D::Failure>>> + Send>, | |
265 | > + Send | |
266 | + Sync | |
267 | + Clone, | |
268 | O: GiteratedObject + Send + Sync, | |
269 | D: GiteratedOperation<O> + 'static, | |
270 | <D as GiteratedOperation<O>>::Failure: Send, | |
271 | S: Send + Sync + Clone + 'static, | |
272 | O1: FromOperationState, | |
273 | O2: FromOperationState, | |
274 | { | |
275 | fn operation_name(&self) -> &str { | |
276 | D::operation_name() | |
277 | } | |
278 | ||
279 | fn object_name(&self) -> &str { | |
280 | O::object_name() | |
281 | } | |
282 | ||
283 | async fn handle( | |
284 | &self, | |
285 | object: &O, | |
286 | operation: D, | |
287 | state: S, | |
288 | operation_state: &StackOperationState, | |
289 | ) -> Result<D::Success, OperationError<D::Failure>> { | |
290 | let o1 = O1::from_state(operation_state) | |
291 | .await | |
292 | .map_err(|e| OperationError::Internal(e.to_string()))?; | |
293 | let o2 = O2::from_state(operation_state) | |
294 | .await | |
295 | .map_err(|e| OperationError::Internal(e.to_string()))?; | |
296 | self.clone()(object, operation, state, o1, o2).await | |
297 | } | |
298 | } | |
299 | ||
197 | 300 | pub struct OperationWrapper<S: Send + Sync + Clone> { |
198 | 301 | func: Box< |
199 | 302 | dyn Fn( |
200 | 303 | AnyObject, |
201 | 304 | AnyOperation, |
202 | 305 | S, |
306 | StackOperationState, | |
203 | 307 | ) |
204 | 308 | -> Pin<Box<dyn Future<Output = Result<Vec<u8>, OperationError<Vec<u8>>>> + Send>> |
205 | 309 | + Send |
@@ -210,15 +314,16 @@ pub struct OperationWrapper<S: Send + Sync + Clone> { | ||
210 | 314 | |
211 | 315 | impl<S: Send + Sync + Clone + 'static> OperationWrapper<S> { |
212 | 316 | pub fn new< |
317 | A, | |
213 | 318 | O: GiteratedObject + Send + Sync, |
214 | 319 | D: GiteratedOperation<O> + 'static, |
215 | F: GiteratedOperationHandler<O, D, S> + Send + Sync + 'static + Clone, | |
320 | F: GiteratedOperationHandler<A, O, D, S> + Send + Sync + 'static + Clone, | |
216 | 321 | >( |
217 | 322 | handler: F, |
218 | 323 | ) -> Self { |
219 | 324 | let handler = Arc::new(Box::pin(handler)); |
220 | 325 | Self { |
221 | func: Box::new(move |any_object, any_operation, state| { | |
326 | func: Box::new(move |any_object, any_operation, state, operation_state| { | |
222 | 327 | let handler = handler.clone(); |
223 | 328 | async move { |
224 | 329 | let handler = handler.clone(); |
@@ -227,7 +332,9 @@ impl<S: Send + Sync + Clone + 'static> OperationWrapper<S> { | ||
227 | 332 | let operation: D = serde_json::from_value(any_operation.0.clone()) |
228 | 333 | .map_err(|_| OperationError::Unhandled)?; |
229 | 334 | |
230 | let result = handler.handle(&object, operation, state).await; | |
335 | let result = handler | |
336 | .handle(&object, operation, state, &operation_state) | |
337 | .await; | |
231 | 338 | result |
232 | 339 | .map(|success| serde_json::to_vec(&success).unwrap()) |
233 | 340 | .map_err(|err| match err { |
@@ -251,7 +358,193 @@ impl<S: Send + Sync + Clone + 'static> OperationWrapper<S> { | ||
251 | 358 | object: AnyObject, |
252 | 359 | operation: AnyOperation, |
253 | 360 | state: S, |
361 | operation_state: &StackOperationState, | |
254 | 362 | ) -> Result<Vec<u8>, OperationError<Vec<u8>>> { |
255 | (self.func)(object, operation, state).await | |
363 | (self.func)(object, operation, state, operation_state.clone()).await | |
364 | } | |
365 | } | |
366 | ||
367 | #[async_trait::async_trait] | |
368 | pub trait FromOperationState: Sized + Clone + Send { | |
369 | type Error: Serialize + DeserializeOwned; | |
370 | ||
371 | async fn from_state(state: &StackOperationState) -> Result<Self, OperationError<Self::Error>>; | |
372 | } | |
373 | ||
374 | #[async_trait::async_trait] | |
375 | impl FromOperationState for BackendWrapper { | |
376 | type Error = (); | |
377 | ||
378 | async fn from_state(state: &StackOperationState) -> Result<Self, OperationError<()>> { | |
379 | Ok(state.giterated_backend.clone()) | |
380 | } | |
381 | } | |
382 | ||
383 | #[async_trait::async_trait] | |
384 | impl FromOperationState for StackOperationState { | |
385 | type Error = (); | |
386 | ||
387 | async fn from_state( | |
388 | state: &StackOperationState, | |
389 | ) -> Result<StackOperationState, OperationError<()>> { | |
390 | Ok(state.clone()) | |
391 | } | |
392 | } | |
393 | ||
394 | #[derive(Clone)] | |
395 | pub struct StackOperationState { | |
396 | pub giterated_backend: BackendWrapper, | |
397 | } | |
398 | ||
399 | #[derive(Clone)] | |
400 | pub struct BackendWrapper { | |
401 | sender: tokio::sync::mpsc::Sender<( | |
402 | tokio::sync::oneshot::Sender<Result<Value, OperationError<Value>>>, | |
403 | WrappedOperation, | |
404 | )>, | |
405 | task: Arc<JoinHandle<()>>, | |
406 | } | |
407 | ||
408 | pub struct WrappedOperation { | |
409 | object: AnyObject, | |
410 | operation_payload: AnyOperation, | |
411 | operation_name: String, | |
412 | state: StackOperationState, | |
413 | } | |
414 | ||
415 | impl BackendWrapper { | |
416 | pub fn new<S: HandlerState>(backend: GiteratedBackend<S>) -> Self { | |
417 | // Spawn listener task | |
418 | ||
419 | let (send, mut recv) = channel::<( | |
420 | tokio::sync::oneshot::Sender<Result<Value, OperationError<Value>>>, | |
421 | WrappedOperation, | |
422 | )>(1024); | |
423 | ||
424 | let task = tokio::spawn(async move { | |
425 | while let Some((responder, message)) = recv.recv().await { | |
426 | let raw_result = backend | |
427 | .object_operation( | |
428 | message.object, | |
429 | &message.operation_name, | |
430 | message.operation_payload, | |
431 | &message.state, | |
432 | ) | |
433 | .await; | |
434 | ||
435 | responder.send(raw_result).unwrap(); | |
436 | } | |
437 | error!("Error, thing's dead"); | |
438 | }); | |
439 | ||
440 | Self { | |
441 | sender: send, | |
442 | task: Arc::new(task), | |
443 | } | |
444 | } | |
445 | ||
446 | pub async fn call(&self, operation: WrappedOperation) -> Result<Value, OperationError<Value>> { | |
447 | let (sender, response) = tokio::sync::oneshot::channel(); | |
448 | ||
449 | self.sender | |
450 | .send((sender, operation)) | |
451 | .await | |
452 | .map_err(|e| OperationError::Internal(e.to_string()))?; | |
453 | ||
454 | match response.await { | |
455 | Ok(result) => Ok(result?), | |
456 | Err(err) => Err(OperationError::Internal(err.to_string())), | |
457 | } | |
458 | } | |
459 | } | |
460 | ||
461 | use std::fmt::Debug; | |
462 | ||
463 | #[async_trait::async_trait] | |
464 | impl ObjectBackend<StackOperationState> for BackendWrapper { | |
465 | async fn object_operation<O, D>( | |
466 | &self, | |
467 | object: O, | |
468 | operation: &str, | |
469 | payload: D, | |
470 | operation_state: &StackOperationState, | |
471 | ) -> Result<D::Success, OperationError<D::Failure>> | |
472 | where | |
473 | O: GiteratedObject + Debug, | |
474 | D: GiteratedOperation<O> + Debug, | |
475 | { | |
476 | let operation = WrappedOperation { | |
477 | object: AnyObject(object.to_string()), | |
478 | operation_name: operation.to_string(), | |
479 | operation_payload: AnyOperation(serde_json::to_value(payload).unwrap()), | |
480 | state: operation_state.clone(), | |
481 | }; | |
482 | ||
483 | let raw_result = self.call(operation).await; | |
484 | ||
485 | match raw_result { | |
486 | Ok(result) => Ok(serde_json::from_value(result) | |
487 | .map_err(|e| OperationError::Internal(e.to_string()))?), | |
488 | Err(err) => match err { | |
489 | OperationError::Internal(internal) => { | |
490 | warn!( | |
491 | "Internal Error: {:?}", | |
492 | OperationError::<()>::Internal(internal.clone()) | |
493 | ); | |
494 | ||
495 | Err(OperationError::Internal(internal)) | |
496 | } | |
497 | OperationError::Unhandled => Err(OperationError::Unhandled), | |
498 | OperationError::Operation(err) => Err(OperationError::Operation( | |
499 | serde_json::from_value(err) | |
500 | .map_err(|e| OperationError::Internal(e.to_string()))?, | |
501 | )), | |
502 | }, | |
503 | } | |
504 | } | |
505 | ||
506 | async fn get_object<O: GiteratedObject + Debug>( | |
507 | &self, | |
508 | object_str: &str, | |
509 | operation_state: &StackOperationState, | |
510 | ) -> Result<Object<StackOperationState, O, Self>, OperationError<ObjectRequestError>> { | |
511 | let operation = WrappedOperation { | |
512 | object: AnyObject(object_str.to_string()), | |
513 | operation_name: ObjectRequest::operation_name().to_string(), | |
514 | operation_payload: AnyOperation( | |
515 | serde_json::to_value(ObjectRequest(object_str.to_string())).unwrap(), | |
516 | ), | |
517 | state: operation_state.clone(), | |
518 | }; | |
519 | ||
520 | let raw_result = self.call(operation).await; | |
521 | ||
522 | let object: ObjectResponse = match raw_result { | |
523 | Ok(result) => Ok(serde_json::from_value(result) | |
524 | .map_err(|e| OperationError::Internal(e.to_string()))?), | |
525 | Err(err) => match err { | |
526 | OperationError::Internal(internal) => { | |
527 | warn!( | |
528 | "Internal Error: {:?}", | |
529 | OperationError::<()>::Internal(internal.clone()) | |
530 | ); | |
531 | ||
532 | Err(OperationError::Internal(internal)) | |
533 | } | |
534 | OperationError::Unhandled => Err(OperationError::Unhandled), | |
535 | OperationError::Operation(err) => Err(OperationError::Operation( | |
536 | serde_json::from_value(err) | |
537 | .map_err(|e| OperationError::Internal(e.to_string()))?, | |
538 | )), | |
539 | }, | |
540 | }?; | |
541 | ||
542 | unsafe { | |
543 | Ok(Object::new_unchecked( | |
544 | O::from_str(&object.0) | |
545 | .map_err(|_| OperationError::Internal("deserialize failure".to_string()))?, | |
546 | self.clone(), | |
547 | )) | |
548 | } | |
256 | 549 | } |
257 | 550 | } |
giterated-stack/src/state.rs
@@ -1,3 +1,5 @@ | ||
1 | use std::any::Any; | |
2 | ||
1 | 3 | /// A type which can be passed into a stateful handler. |
2 | 4 | /// |
3 | 5 | /// # Trait Bounds |
@@ -7,6 +9,6 @@ | ||
7 | 9 | /// # Blanket Impl |
8 | 10 | /// This trait is blanket-impl'd on any type that meets the requirements. You do not need |
9 | 11 | /// to manually mark your state types with it. |
10 | pub trait HandlerState: Send + Sync + Clone + 'static {} | |
12 | pub trait HandlerState: Any + Send + Sync + Clone + 'static {} | |
11 | 13 | |
12 | 14 | impl<T> HandlerState for T where T: Send + Sync + Clone + 'static {} |