JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

The long awaited, exhalted huge networking stack change.

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨21b6a72

⁨giterated-protocol/src/substack.rs⁩ - ⁨15027⁩ bytes
Raw
1 use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc};
2
3 use futures_util::{sink::SinkExt, StreamExt};
4 use giterated_stack::{
5 models::{
6 Error, GiteratedObject, GiteratedOperation, Instance, IntoInternalError,
7 NetworkOperationError, OperationError,
8 },
9 AnyFailure, AnyObject, AnyOperation, AnySuccess, GiteratedStack, ObjectOperationPair,
10 OperationState, StackOperationState, SubstackBuilder,
11 };
12 use serde::{Deserialize, Serialize};
13 use tokio::net::TcpStream;
14 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
15 use tracing::{info, trace, warn};
16
17 use crate::{
18 AuthenticatedPayload, AuthenticationSourceProviders, GiteratedMessage, NetworkOperationState,
19 };
20
21 /// A Giterated substack that attempts to resolve with a remote, networked Giterated Daemon.
22 ///
23 /// # Usage
24 ///
25 /// Convert the [`NetworkedSubstack`] into a [`SubStackBuilder<NetworkedSubstack>`] and merge it with
26 /// a runtime.
27 ///
28 /// ```
29 /// let mut runtime = GiteratedStack::default();
30 ///
31 /// let network_substack = NetworkedSubstack::default();
32 ///
33 /// runtime.merge_builder(network_substack.into_substack());
34 /// ```
35 ///
36 /// To handle messages that are sourced from the network, use [`NetworkedObject`] and [`NetworkedOperation`].
37 ///
38 /// These are wrappers around the raw payloads from the network. The return payload from handling [`NetworkedOperation`] is then
39 /// sent back to the requester.
40 ///
41 /// ```
42 /// // Start with a network payload
43 /// let network_payload: AuthenticatedPayload = { todo!() };
44 ///
45 /// let networked_object = runtime.get_object::<NetworkedObject>(network_payload.object).await?;
46 /// let operation_name = payload.operation;
47 /// let networked_operation = NetworkedOperation(payload);
48 ///
49 /// // Operation state depends on the authentication in the payload, it
50 /// // isn't relevant here.
51 /// let operation_state = StackOperationState::default();
52 ///
53 /// let result = networked_object.request(networked_operation, &operation_state);
54 ///
55 /// // `result` is Result<Vec<u8>, OperationError<Vec<u8>> which is also the type that
56 /// // giterated's networked protocol uses for responses, so you can send it directly.
57 /// ```
58 ///
59 /// TODO: The above docs are 100% false about the network protocol type
60 #[derive(Clone)]
61 pub struct NetworkedSubstack {
62 pub home_uri: Option<String>,
63 }
64
65 impl Default for NetworkedSubstack {
66 fn default() -> Self {
67 Self { home_uri: None }
68 }
69 }
70
71 impl NetworkedSubstack {
72 pub fn into_server_substack(self) -> SubstackBuilder<Self, StackOperationState> {
73 let mut stack = SubstackBuilder::new(self);
74
75 stack.object::<NetworkedObject>();
76 stack.operation(handle_network_operation);
77
78 // TODO: optional
79 stack.dynamic_operation(try_handle_with_remote);
80
81 stack
82 }
83
84 pub fn into_client_substack(self) -> SubstackBuilder<Self, NetworkOperationState> {
85 let mut stack: SubstackBuilder<NetworkedSubstack, NetworkOperationState> =
86 SubstackBuilder::new(self);
87
88 stack.object::<NetworkedObject>();
89
90 // TODO: optional
91 stack.dynamic_operation(try_handle_with_remote);
92
93 stack
94 }
95 }
96
97 pub async fn handle_network_operation<OS: Send + Sync + Clone + 'static>(
98 object: NetworkedObject,
99 operation: NetworkedOperation,
100 _state: NetworkedSubstack,
101 OperationState(operation_state): OperationState<OS>,
102 stack: GiteratedStack<OS>,
103 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
104 trace!("Handle network operation {}", operation.name);
105 let mut result = None;
106
107 for (_, object_meta) in &stack.inner.metadata.objects {
108 if object_meta.name == NetworkedObject::object_name() {
109 continue;
110 }
111
112 if let Ok(object) = (object_meta.from_str)(&object.0) {
113 result = Some((object, object_meta));
114 break;
115 }
116 }
117
118 let (object, object_meta) = result.ok_or_else(|| OperationError::Unhandled)?;
119
120 trace!(
121 "Resolved object type {} for network operation {}.",
122 object_meta.name,
123 operation.name
124 );
125
126 let operation_meta = stack
127 .inner
128 .metadata
129 .operations
130 .get(&ObjectOperationPair {
131 object_name: &object_meta.name,
132 operation_name: &operation.name,
133 })
134 .ok_or_else(|| OperationError::Unhandled)?;
135
136 trace!(
137 "Resolved operation {}::{} for network operation.",
138 object_meta.name,
139 operation_meta.name
140 );
141
142 info!("Operation: {:?}", operation.payload);
143 let operation = (operation_meta.deserialize)(&operation.payload)
144 .as_internal_error_with_context(format!(
145 "deserializing object operation {}::{}",
146 object_meta.name, operation_meta.name
147 ))?;
148
149 trace!(
150 "Deserialized operation {}::{} for network operation.",
151 object_meta.name,
152 operation_meta.name
153 );
154
155 let result = stack
156 .new_operation_func(object, operation, operation_state)
157 .await;
158
159 match result {
160 Ok(success) => {
161 trace!(
162 "Network operation {}::{} was successful",
163 object_meta.name,
164 operation_meta.name
165 );
166
167 Ok(
168 (operation_meta.serialize_success)(success).as_internal_error_with_context(
169 format!(
170 "serializing success for object operation {}::{}",
171 object_meta.name, operation_meta.name
172 ),
173 )?,
174 )
175 }
176 Err(err) => {
177 trace!(
178 "Network operation {}::{} failed",
179 object_meta.name,
180 operation_meta.name
181 );
182 Err(match err {
183 OperationError::Operation(failure) => OperationError::Operation(
184 (operation_meta.serialize_error)(failure).as_internal_error_with_context(
185 format!(
186 "serializing error for object operation {}::{}",
187 object_meta.name, operation_meta.name
188 ),
189 )?,
190 ),
191 OperationError::Internal(internal) => {
192 warn!(
193 "A networked operation encountered an internal error: {:#?}",
194 internal
195 );
196
197 OperationError::Internal(internal)
198 }
199 OperationError::Unhandled => OperationError::Unhandled,
200 })
201 }
202 }
203 }
204
205 #[derive(Clone, Debug, Serialize, Deserialize)]
206 pub struct NetworkedObject(pub String);
207
208 impl FromStr for NetworkedObject {
209 type Err = ();
210
211 fn from_str(s: &str) -> Result<Self, Self::Err> {
212 Ok(NetworkedObject(s.to_string()))
213 }
214 }
215
216 impl Display for NetworkedObject {
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 f.write_str(&self.0)
219 }
220 }
221
222 impl GiteratedObject for NetworkedObject {
223 fn object_name() -> &'static str {
224 "networked_object"
225 }
226
227 fn from_object_str(_object_str: &str) -> Result<Self, Error> {
228 todo!()
229 }
230
231 fn home_uri(&self) -> String {
232 todo!()
233 }
234 }
235
236 #[derive(Clone, Debug, Serialize, Deserialize)]
237 pub struct NetworkedOperation {
238 pub name: String,
239 pub payload: Vec<u8>,
240 }
241
242 impl NetworkedOperation {
243 pub fn new(name: String, payload: Vec<u8>) -> Self {
244 Self { name, payload }
245 }
246 }
247
248 impl GiteratedOperation<NetworkedObject> for NetworkedOperation {
249 type Success = Vec<u8>;
250
251 type Failure = Vec<u8>;
252
253 fn operation_name() -> &'static str {
254 "networked_operation"
255 }
256 }
257
258 /// Handler which will attempt to resolve any operation that doesn't resolve locally
259 /// against a remote instance.
260 pub async fn try_handle_with_remote<OS>(
261 object: AnyObject,
262 operation: AnyOperation,
263 state: NetworkedSubstack,
264 _stack: GiteratedStack<OS>,
265 ) -> Result<AnySuccess, OperationError<AnyFailure>> {
266 if object.is::<NetworkedObject>() {
267 return Err(OperationError::Unhandled);
268 }
269 trace!(
270 "Try handling object operation {}::{} with remote",
271 object.kind(),
272 operation.kind().operation_name
273 );
274 // TODO:
275 // Ideally we support pass-through on object types that aren't used locally.
276 // For now, we aren't worrying about that.
277 let object_meta = object.meta().clone();
278
279 let operation_meta = operation.meta().clone();
280
281 trace!(
282 "Serializing with {}::{}",
283 operation.kind().object_name,
284 operation.kind().operation_name
285 );
286
287 let object_home_uri = (object_meta.home_uri)(object.clone());
288
289 if let Some(home_uri) = state.home_uri {
290 if home_uri == object_home_uri {
291 // This isn't a remote request, requests aren't supposed to hit this layer
292 // if they're not remote.
293 warn!("Try handling object operation {}::{}, resolved object home uri as local home uri. This is a bug.", object.kind(),
294 operation.kind().operation_name);
295
296 return Err(OperationError::Unhandled);
297 }
298 }
299
300 trace!(
301 "Handling object operation {}::{} sending payload",
302 object.kind(),
303 operation.kind().operation_name
304 );
305
306 let object = NetworkedObject((object_meta.to_str)(object).to_string());
307 let operation = NetworkedOperation::new(
308 operation_meta.name.clone(),
309 (operation_meta.serialize)(operation.clone()).as_internal_error_with_context(format!(
310 "try serializing object operation {}::{} for remote",
311 object_meta.name, operation_meta.name
312 ))?,
313 );
314
315 // let authenticated = Authenticated::new(object, operation);
316
317 let message = GiteratedMessage {
318 object,
319 operation: NetworkedOperation::operation_name().to_string(),
320 payload: operation,
321 };
322
323 let authenticated = Authenticated::new(message);
324
325 let mut socket: WebSocketStream<MaybeTlsStream<TcpStream>> = connect_to(
326 &Instance::from_str(&object_home_uri).unwrap(),
327 &Some(("127.0.0.1:1111").parse().unwrap()),
328 )
329 .await
330 .as_internal_error()?;
331
332 // TODO AUTH
333
334 let result: Result<Vec<u8>, OperationError<Vec<u8>>> =
335 send_expect(&mut socket, authenticated).await;
336
337 match result {
338 Ok(success) => {
339 let success = (operation_meta.deserialize_success)(success)
340 .as_internal_error_with_context(format!(
341 "try deserializing object operation success {}::{} for remote",
342 object_meta.name, operation_meta.name
343 ))?;
344
345 Ok(success)
346 }
347 Err(err) => Err(match err {
348 OperationError::Operation(failure) => {
349 let failure = (operation_meta.deserialize_failure)(failure)
350 .as_internal_error_with_context(format!(
351 "try deserializing object operation failure {}::{} for remote",
352 object_meta.name, operation_meta.name
353 ))?;
354
355 OperationError::Operation(failure)
356 }
357 OperationError::Internal(internal) => OperationError::Internal(internal),
358 OperationError::Unhandled => OperationError::Unhandled,
359 }),
360 }
361 }
362
363 type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>;
364
365 async fn connect_to(
366 instance: &Instance,
367
368 socket_addr: &Option<SocketAddr>,
369 ) -> Result<Socket, Error> {
370 if let Some(addr) = socket_addr {
371 info!(
372 "Connecting to {}",
373 format!("ws://{}/.giterated/daemon/", addr)
374 );
375
376 let (websocket, _response) =
377 connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?;
378
379 info!("Connection established with {}", addr);
380
381 Ok(websocket)
382 } else {
383 info!(
384 "Connecting to {}",
385 format!("wss://{}/.giterated/daemon/", instance.0)
386 );
387
388 let (websocket, _response) =
389 connect_async(&format!("wss://{}/.giterated/daemon/", instance.0)).await?;
390
391 info!("Connection established with {}", instance.0);
392
393 Ok(websocket)
394 }
395 }
396
397 async fn send_expect<O: GiteratedObject, D: GiteratedOperation<O>>(
398 socket: &mut Socket,
399 message: Authenticated<O, D>,
400 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
401 let payload = bincode::serialize(&message.into_payload()).unwrap();
402
403 socket
404 .send(Message::Binary(payload))
405 .await
406 .as_internal_error()?;
407
408 while let Some(message) = socket.next().await {
409 let payload = match message.as_internal_error()? {
410 Message::Binary(payload) => payload,
411
412 _ => {
413 continue;
414 }
415 };
416
417 let raw_result =
418 bincode::deserialize::<Result<Vec<u8>, NetworkOperationError<Vec<u8>>>>(&payload)
419 .map_err(|e| OperationError::Internal(Error::from(e)))?;
420
421 trace!(
422 "Received response for networked operation {}::{}.",
423 O::object_name(),
424 D::operation_name()
425 );
426
427 return match raw_result {
428 Ok(success) => Ok(success),
429 Err(err) => Err(match err {
430 NetworkOperationError::Operation(operation_error) => {
431 OperationError::Operation(operation_error)
432 }
433 NetworkOperationError::Internal => OperationError::Internal(RemoteError.into()),
434 NetworkOperationError::Unhandled => OperationError::Unhandled,
435 }),
436 };
437 }
438
439 panic!()
440 }
441
442 #[derive(Debug, Clone, thiserror::Error)]
443 #[error("a remote internal error occurred")]
444 pub struct RemoteError;
445
446 #[derive(Debug, thiserror::Error)]
447 #[error("a remote internal error occurred")]
448
449 pub struct NetworkError;
450
451 #[derive(Debug)]
452 pub struct Authenticated<O: GiteratedObject, D: GiteratedOperation<O>> {
453 pub source: Vec<Arc<dyn AuthenticationSourceProviders + Send + Sync>>,
454 pub message: GiteratedMessage<O, D>,
455 }
456
457 impl<O: GiteratedObject, D: GiteratedOperation<O>> Authenticated<O, D> {
458 pub fn new(message: GiteratedMessage<O, D>) -> Self {
459 Self {
460 source: vec![],
461 message,
462 }
463 }
464
465 pub fn append_authentication(
466 &mut self,
467 authentication: Arc<dyn AuthenticationSourceProviders + Send + Sync>,
468 ) {
469 self.source.push(authentication);
470 }
471
472 pub fn into_payload(mut self) -> AuthenticatedPayload {
473 let payload = serde_json::to_vec(&self.message.payload).unwrap();
474
475 AuthenticatedPayload {
476 object: self.message.object.to_string(),
477 operation: self.message.operation,
478 source: self
479 .source
480 .drain(..)
481 .map(|provider| provider.as_ref().authenticate_all(&payload))
482 .flatten()
483 .collect::<Vec<_>>(),
484 payload,
485 }
486 }
487 }
488