JavaScript is disabled, refresh for a better experience. ambee/giterated-api

ambee/giterated-api

Git repository hosting, collaboration, and discovery for the Fediverse.

Error handling fixes

Amber - 2 years ago

parent: tbd commit: 5eaea71

⁨src/daemon_backend.rs⁩ - ⁨5051⁩ bytes
Raw
1 use std::{fmt::Debug, str::FromStr, sync::Arc};
2
3 use futures_util::{SinkExt, StreamExt};
4 use giterated_models::{
5 authenticated::{Authenticated, AuthenticationSourceProviders},
6 error::OperationError,
7 error::{IntoInternalError, NetworkOperationError},
8 message::GiteratedMessage,
9 object::{GiteratedObject, Object, ObjectRequest, ObjectRequestError, ObjectResponse},
10 object_backend::ObjectBackend,
11 operation::GiteratedOperation,
12 };
13 use serde::de::DeserializeOwned;
14 use tokio_tungstenite::tungstenite::Message;
15
16 use crate::{DaemonConnectionPool, Socket};
17
18 #[derive(Clone)]
19 pub struct NetworkOperationState {
20 authentication: Vec<Arc<dyn AuthenticationSourceProviders + Send + Sync>>,
21 }
22
23 impl NetworkOperationState {
24 pub fn new() -> Self {
25 Self {
26 authentication: vec![],
27 }
28 }
29
30 pub fn authenticate<S: AuthenticationSourceProviders + Send + Sync + 'static>(
31 &mut self,
32 source: S,
33 ) {
34 self.authentication.push(Arc::new(source))
35 }
36 }
37
38 #[async_trait::async_trait(?Send)]
39 impl ObjectBackend<NetworkOperationState> for DaemonConnectionPool {
40 async fn object_operation<O: GiteratedObject + Debug, D: GiteratedOperation<O> + Debug>(
41 &self,
42 object: O,
43 operation: &str,
44 payload: D,
45 operation_state: &NetworkOperationState,
46 ) -> Result<D::Success, OperationError<D::Failure>> {
47 let message = GiteratedMessage {
48 object,
49 operation: operation.to_string(),
50 payload,
51 };
52
53 let mut connection = self.0.get().await.map_err(|e| {
54 OperationError::Internal(anyhow::Error::from(ConnectionFailed(e.to_string())))
55 })?;
56
57 let mut authenticated = Authenticated::new(message);
58 for authentication in &operation_state.authentication {
59 authenticated.append_authentication(authentication.clone());
60 }
61
62 send_expect(&mut connection, authenticated).await
63 }
64
65 async fn get_object<O: GiteratedObject + Debug>(
66 &self,
67 object_str: &str,
68 operation_state: &NetworkOperationState,
69 ) -> Result<Object<NetworkOperationState, O, Self>, OperationError<ObjectRequestError>> {
70 let operation = ObjectRequest(object_str.to_string());
71 info!("Get object: {:?}", operation);
72 let message = GiteratedMessage {
73 object: self.0.manager().target_instance.clone(),
74 operation: ObjectRequest::operation_name().to_string(),
75 payload: operation,
76 };
77
78 let mut connection = self.0.get().await.map_err(|e| {
79 OperationError::Internal(anyhow::Error::from(ConnectionFailed(e.to_string())))
80 })?;
81
82 let mut authenticated = Authenticated::new(message);
83 for authentication in &operation_state.authentication {
84 authenticated.append_authentication(authentication.clone());
85 }
86
87 let object_raw: ObjectResponse = send_expect(&mut connection, authenticated).await?;
88
89 if let Ok(object) = O::from_str(&object_raw.0) {
90 Ok(unsafe { Object::new_unchecked(object, self.clone()) })
91 } else {
92 panic!()
93 }
94 }
95 }
96
97 async fn send_expect<
98 O: GiteratedObject,
99 D: GiteratedOperation<O>,
100 B: DeserializeOwned,
101 R: DeserializeOwned,
102 >(
103 socket: &mut Socket,
104 message: Authenticated<O, D>,
105 ) -> Result<R, OperationError<B>> {
106 let payload = bincode::serialize(&message.into_payload()).unwrap();
107
108 socket
109 .send(Message::Binary(payload))
110 .await
111 .as_internal_error()?;
112
113 while let Some(message) = socket.next().await {
114 let payload = match message.as_internal_error()? {
115 Message::Binary(payload) => payload,
116 _ => {
117 continue;
118 }
119 };
120
121 let raw_result =
122 bincode::deserialize::<Result<Vec<u8>, NetworkOperationError<Vec<u8>>>>(&payload)
123 .map_err(|e| OperationError::Internal(anyhow::Error::from(e)))?;
124
125 // Map ok
126 let raw_result = match raw_result {
127 Ok(raw) => Ok(serde_json::from_slice(&raw)
128 .map_err(|e| OperationError::Internal(anyhow::Error::from(e)))?),
129 Err(err) => Err(match err {
130 NetworkOperationError::Operation(err) => OperationError::Operation(
131 serde_json::from_slice(&err)
132 .map_err(|e| OperationError::Internal(anyhow::Error::from(e)))?,
133 ),
134 NetworkOperationError::Internal => {
135 OperationError::Internal(anyhow::Error::from(NetworkError))
136 }
137 NetworkOperationError::Unhandled => OperationError::Unhandled,
138 }),
139 };
140
141 return raw_result;
142 }
143
144 panic!()
145 }
146
147 #[derive(Debug, thiserror::Error)]
148 #[error("a remote internal error occurred")]
149 pub struct NetworkError;
150
151 #[derive(Debug, thiserror::Error)]
152 #[error("failed to get connection from pool: {0}")]
153 pub struct ConnectionFailed(String);
154