JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Error handling refactor

This refactor aims to improve error handling throughout the project by refining the overarching error types and increasing usage of proper error handling. Replaced existing networked operation error with `NetworkOperationError`. `NetworkOperationError` does not forward any internal error details, which allows `OperationError` to grow into a better error type. `OperationError` now has support for storing real typed errors inside of it for better debugging. `IntoInternalError` is a trait which allows for easy conversion of error types into `OperationError::internal`.

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨e02c03d

⁨giterated-daemon/src/connection/wrapper.rs⁩ - ⁨8196⁩ bytes
Raw
1 use std::{
2 net::SocketAddr,
3 ops::Deref,
4 sync::{atomic::AtomicBool, Arc},
5 };
6
7 use anyhow::Error;
8 use futures_util::{SinkExt, StreamExt};
9
10 use giterated_models::{
11 authenticated::{AuthenticationSource, UserTokenMetadata},
12 error::OperationError,
13 instance::Instance,
14 };
15
16 use giterated_models::authenticated::AuthenticatedPayload;
17 use giterated_stack::{
18 AuthenticatedInstance, AuthenticatedUser, GiteratedStack, StackOperationState,
19 };
20 use jsonwebtoken::{DecodingKey, TokenData, Validation};
21 use rsa::{
22 pkcs1::DecodeRsaPublicKey,
23 pss::{Signature, VerifyingKey},
24 sha2::Sha256,
25 signature::Verifier,
26 RsaPublicKey,
27 };
28 use serde::Serialize;
29
30 use tokio::{net::TcpStream, sync::Mutex};
31 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
32 use toml::Table;
33
34 use crate::{
35 authentication::AuthenticationTokenGranter,
36 backend::{MetadataBackend, RepositoryBackend, UserBackend},
37 federation::connections::InstanceConnections,
38 keys::PublicKeyCache,
39 };
40
41 use super::Connections;
42
43 pub async fn connection_wrapper(
44 socket: WebSocketStream<TcpStream>,
45 connections: Arc<Mutex<Connections>>,
46 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
47 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
48 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
49 settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
50 addr: SocketAddr,
51 instance: impl ToOwned<Owned = Instance>,
52 instance_connections: Arc<Mutex<InstanceConnections>>,
53 config: Table,
54 runtime: Arc<GiteratedStack>,
55 mut operation_state: StackOperationState,
56 ) {
57 let connection_state = ConnectionState {
58 socket: Arc::new(Mutex::new(socket)),
59 connections,
60 repository_backend,
61 user_backend,
62 auth_granter,
63 settings_backend,
64 addr,
65 instance: instance.to_owned(),
66 handshaked: Arc::new(AtomicBool::new(false)),
67 key_cache: Arc::default(),
68 instance_connections: instance_connections.clone(),
69 config,
70 };
71
72 let _handshaked = false;
73 let mut key_cache = PublicKeyCache::default();
74
75 loop {
76 let mut socket = connection_state.socket.lock().await;
77 let message = socket.next().await;
78 drop(socket);
79
80 match message {
81 Some(Ok(message)) => {
82 let payload = match message {
83 Message::Binary(payload) => payload,
84 Message::Ping(_) => {
85 let mut socket = connection_state.socket.lock().await;
86 let _ = socket.send(Message::Pong(vec![])).await;
87 drop(socket);
88 continue;
89 }
90 Message::Close(_) => return,
91 _ => continue,
92 };
93
94 let message: AuthenticatedPayload = bincode::deserialize(&payload).unwrap();
95
96 // Get authentication
97 let instance = {
98 let mut verified_instance: Option<AuthenticatedInstance> = None;
99 for source in &message.source {
100 if let AuthenticationSource::Instance {
101 instance,
102 signature,
103 } = source
104 {
105 let public_key = key_cache.get(&instance).await.unwrap();
106 let public_key = RsaPublicKey::from_pkcs1_pem(&public_key).unwrap();
107 let verifying_key = VerifyingKey::<Sha256>::new(public_key);
108
109 if verifying_key
110 .verify(
111 &message.payload,
112 &Signature::try_from(signature.as_ref()).unwrap(),
113 )
114 .is_ok()
115 {
116 verified_instance =
117 Some(AuthenticatedInstance::new(instance.clone()));
118
119 break;
120 }
121 }
122 }
123
124 verified_instance
125 };
126
127 let user = {
128 let mut verified_user = None;
129 if let Some(verified_instance) = &instance {
130 for source in &message.source {
131 if let AuthenticationSource::User { user, token } = source {
132 // Get token
133 let public_key = key_cache.get(&verified_instance).await.unwrap();
134
135 let token: TokenData<UserTokenMetadata> = jsonwebtoken::decode(
136 token.as_ref(),
137 &DecodingKey::from_rsa_pem(public_key.as_bytes()).unwrap(),
138 &Validation::new(jsonwebtoken::Algorithm::RS256),
139 )
140 .unwrap();
141
142 if token.claims.generated_for != *verified_instance.deref() {
143 // Nope!
144 break;
145 }
146
147 if token.claims.user != *user {
148 // Nope!
149 break;
150 }
151
152 verified_user = Some(AuthenticatedUser::new(user.clone()));
153 break;
154 }
155 }
156 }
157
158 verified_user
159 };
160
161 operation_state.user = user;
162 operation_state.instance = instance;
163
164 let result = runtime
165 .handle_network_message(message, &operation_state)
166 .await;
167
168 // Asking for exploits here
169 operation_state.user = None;
170 operation_state.instance = None;
171
172 if let Err(OperationError::Internal(internal_error)) = &result {
173 error!("An internal error has occurred:\n{:?}", internal_error);
174 }
175
176 // Map error to the network variant
177 let result = result.map_err(|e| e.into_network());
178
179 let mut socket = connection_state.socket.lock().await;
180 let _ = socket
181 .send(Message::Binary(bincode::serialize(&result).unwrap()))
182 .await;
183
184 drop(socket);
185 }
186 _ => {
187 return;
188 }
189 }
190 }
191 }
192
193 #[derive(Clone)]
194 pub struct ConnectionState {
195 socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
196 pub connections: Arc<Mutex<Connections>>,
197 pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
198 pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
199 pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
200 pub settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
201 pub addr: SocketAddr,
202 pub instance: Instance,
203 pub handshaked: Arc<AtomicBool>,
204 pub key_cache: Arc<Mutex<PublicKeyCache>>,
205 pub instance_connections: Arc<Mutex<InstanceConnections>>,
206 pub config: Table,
207 }
208
209 impl ConnectionState {
210 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
211 let payload = serde_json::to_string(&message)?;
212 self.socket
213 .lock()
214 .await
215 .send(Message::Binary(payload.into_bytes()))
216 .await?;
217
218 Ok(())
219 }
220
221 pub async fn send_raw<T: Serialize>(&self, message: T) -> Result<(), Error> {
222 let payload = serde_json::to_string(&message)?;
223 self.socket
224 .lock()
225 .await
226 .send(Message::Binary(payload.into_bytes()))
227 .await?;
228
229 Ok(())
230 }
231
232 pub async fn public_key(&self, instance: &Instance) -> Result<String, Error> {
233 let mut keys = self.key_cache.lock().await;
234 keys.get(instance).await
235 }
236 }
237