JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Add logging for internal errors

Amber - 2 years ago

parent: tbd commit: d0fc0ed

⁨giterated-daemon/src/connection/wrapper.rs⁩ - ⁨8072⁩ bytes
Raw
1 use std::{
2 net::SocketAddr,
3 ops::Deref,
4 sync::{atomic::AtomicBool, Arc},
5 };
6
7 use anyhow::Error;
8 use futures_util::{SinkExt, StreamExt};
9
10 use giterated_models::{
11 authenticated::{AuthenticationSource, UserTokenMetadata},
12 error::OperationError,
13 instance::Instance,
14 };
15
16 use giterated_models::authenticated::AuthenticatedPayload;
17 use giterated_stack::{
18 AuthenticatedInstance, AuthenticatedUser, GiteratedStack, StackOperationState,
19 };
20 use jsonwebtoken::{DecodingKey, TokenData, Validation};
21 use rsa::{
22 pkcs1::DecodeRsaPublicKey,
23 pss::{Signature, VerifyingKey},
24 sha2::Sha256,
25 signature::Verifier,
26 RsaPublicKey,
27 };
28 use serde::Serialize;
29
30 use tokio::{net::TcpStream, sync::Mutex};
31 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
32 use toml::Table;
33
34 use crate::{
35 authentication::AuthenticationTokenGranter,
36 backend::{MetadataBackend, RepositoryBackend, UserBackend},
37 federation::connections::InstanceConnections,
38 keys::PublicKeyCache,
39 };
40
41 use super::Connections;
42
43 pub async fn connection_wrapper(
44 socket: WebSocketStream<TcpStream>,
45 connections: Arc<Mutex<Connections>>,
46 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
47 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
48 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
49 settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
50 addr: SocketAddr,
51 instance: impl ToOwned<Owned = Instance>,
52 instance_connections: Arc<Mutex<InstanceConnections>>,
53 config: Table,
54 runtime: Arc<GiteratedStack>,
55 mut operation_state: StackOperationState,
56 ) {
57 let connection_state = ConnectionState {
58 socket: Arc::new(Mutex::new(socket)),
59 connections,
60 repository_backend,
61 user_backend,
62 auth_granter,
63 settings_backend,
64 addr,
65 instance: instance.to_owned(),
66 handshaked: Arc::new(AtomicBool::new(false)),
67 key_cache: Arc::default(),
68 instance_connections: instance_connections.clone(),
69 config,
70 };
71
72 let _handshaked = false;
73 let mut key_cache = PublicKeyCache::default();
74
75 loop {
76 let mut socket = connection_state.socket.lock().await;
77 let message = socket.next().await;
78 drop(socket);
79
80 match message {
81 Some(Ok(message)) => {
82 let payload = match message {
83 Message::Binary(payload) => payload,
84 Message::Ping(_) => {
85 let mut socket = connection_state.socket.lock().await;
86 let _ = socket.send(Message::Pong(vec![])).await;
87 drop(socket);
88 continue;
89 }
90 Message::Close(_) => return,
91 _ => continue,
92 };
93
94 let message: AuthenticatedPayload = bincode::deserialize(&payload).unwrap();
95
96 // Get authentication
97 let instance = {
98 let mut verified_instance: Option<AuthenticatedInstance> = None;
99 for source in &message.source {
100 if let AuthenticationSource::Instance {
101 instance,
102 signature,
103 } = source
104 {
105 let public_key = key_cache.get(&instance).await.unwrap();
106 let public_key = RsaPublicKey::from_pkcs1_pem(&public_key).unwrap();
107 let verifying_key = VerifyingKey::<Sha256>::new(public_key);
108
109 if verifying_key
110 .verify(
111 &message.payload,
112 &Signature::try_from(signature.as_ref()).unwrap(),
113 )
114 .is_ok()
115 {
116 verified_instance =
117 Some(AuthenticatedInstance::new(instance.clone()));
118
119 break;
120 }
121 }
122 }
123
124 verified_instance
125 };
126
127 let user = {
128 let mut verified_user = None;
129 if let Some(verified_instance) = &instance {
130 for source in &message.source {
131 if let AuthenticationSource::User { user, token } = source {
132 // Get token
133 let public_key = key_cache.get(&verified_instance).await.unwrap();
134
135 let token: TokenData<UserTokenMetadata> = jsonwebtoken::decode(
136 token.as_ref(),
137 &DecodingKey::from_rsa_pem(public_key.as_bytes()).unwrap(),
138 &Validation::new(jsonwebtoken::Algorithm::RS256),
139 )
140 .unwrap();
141
142 if token.claims.generated_for != *verified_instance.deref() {
143 // Nope!
144 break;
145 }
146
147 if token.claims.user != *user {
148 // Nope!
149 break;
150 }
151
152 verified_user = Some(AuthenticatedUser::new(user.clone()));
153 break;
154 }
155 }
156 }
157
158 verified_user
159 };
160
161 operation_state.user = user;
162 operation_state.instance = instance;
163
164 let result = runtime
165 .handle_network_message(message, &operation_state)
166 .await;
167
168 // Asking for exploits here
169 operation_state.user = None;
170 operation_state.instance = None;
171
172 if let Err(OperationError::Internal(internal_error)) = &result {
173 error!("An internal error has occured: {}", internal_error);
174 }
175
176 let mut socket = connection_state.socket.lock().await;
177 let _ = socket
178 .send(Message::Binary(bincode::serialize(&result).unwrap()))
179 .await;
180
181 drop(socket);
182 }
183 _ => {
184 return;
185 }
186 }
187 }
188 }
189
190 #[derive(Clone)]
191 pub struct ConnectionState {
192 socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
193 pub connections: Arc<Mutex<Connections>>,
194 pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
195 pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
196 pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
197 pub settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
198 pub addr: SocketAddr,
199 pub instance: Instance,
200 pub handshaked: Arc<AtomicBool>,
201 pub key_cache: Arc<Mutex<PublicKeyCache>>,
202 pub instance_connections: Arc<Mutex<InstanceConnections>>,
203 pub config: Table,
204 }
205
206 impl ConnectionState {
207 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
208 let payload = serde_json::to_string(&message)?;
209 self.socket
210 .lock()
211 .await
212 .send(Message::Binary(payload.into_bytes()))
213 .await?;
214
215 Ok(())
216 }
217
218 pub async fn send_raw<T: Serialize>(&self, message: T) -> Result<(), Error> {
219 let payload = serde_json::to_string(&message)?;
220 self.socket
221 .lock()
222 .await
223 .send(Message::Binary(payload.into_bytes()))
224 .await?;
225
226 Ok(())
227 }
228
229 pub async fn public_key(&self, instance: &Instance) -> Result<String, Error> {
230 let mut keys = self.key_cache.lock().await;
231 keys.get(instance).await
232 }
233 }
234