JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Unified stack refactor clean up

Clean up obsolete code and some warnings

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨356f714

⁨giterated-daemon/src/connection/wrapper.rs⁩ - ⁨8901⁩ bytes
Raw
1 use std::{
2 net::SocketAddr,
3 ops::Deref,
4 sync::{atomic::AtomicBool, Arc},
5 };
6
7 use anyhow::Error;
8 use futures_util::{SinkExt, StreamExt};
9
10 use giterated_models::{
11 authenticated::{AuthenticationSource, UserTokenMetadata},
12 error::OperationError,
13 instance::Instance,
14 };
15
16 use giterated_models::object_backend::ObjectBackend;
17
18 use giterated_models::{
19 authenticated::AuthenticatedPayload, message::GiteratedMessage, object::AnyObject,
20 operation::AnyOperation,
21 };
22 use giterated_stack::{
23 runtime::GiteratedRuntime, AuthenticatedInstance, AuthenticatedUser, StackOperationState,
24 };
25 use jsonwebtoken::{DecodingKey, TokenData, Validation};
26 use rsa::{
27 pkcs1::DecodeRsaPublicKey,
28 pss::{Signature, VerifyingKey},
29 sha2::Sha256,
30 signature::Verifier,
31 RsaPublicKey,
32 };
33 use serde::Serialize;
34
35 use tokio::{net::TcpStream, sync::Mutex};
36 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
37 use toml::Table;
38
39 use crate::{
40 authentication::AuthenticationTokenGranter,
41 backend::{MetadataBackend, RepositoryBackend, UserBackend},
42 federation::connections::InstanceConnections,
43 keys::PublicKeyCache,
44 };
45
46 use super::Connections;
47
48 pub async fn connection_wrapper(
49 socket: WebSocketStream<TcpStream>,
50 connections: Arc<Mutex<Connections>>,
51 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
52 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
53 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
54 settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
55 addr: SocketAddr,
56 instance: impl ToOwned<Owned = Instance>,
57 instance_connections: Arc<Mutex<InstanceConnections>>,
58 config: Table,
59 runtime: Arc<GiteratedRuntime>,
60 mut operation_state: StackOperationState,
61 ) {
62 let connection_state = ConnectionState {
63 socket: Arc::new(Mutex::new(socket)),
64 connections,
65 repository_backend,
66 user_backend,
67 auth_granter,
68 settings_backend,
69 addr,
70 instance: instance.to_owned(),
71 handshaked: Arc::new(AtomicBool::new(false)),
72 key_cache: Arc::default(),
73 instance_connections: instance_connections.clone(),
74 config,
75 };
76
77 let _handshaked = false;
78 let mut key_cache = PublicKeyCache::default();
79
80 loop {
81 let mut socket = connection_state.socket.lock().await;
82 let message = socket.next().await;
83 drop(socket);
84
85 match message {
86 Some(Ok(message)) => {
87 let payload = match message {
88 Message::Binary(payload) => payload,
89 Message::Ping(_) => {
90 let mut socket = connection_state.socket.lock().await;
91 let _ = socket.send(Message::Pong(vec![])).await;
92 drop(socket);
93 continue;
94 }
95 Message::Close(_) => return,
96 _ => continue,
97 };
98
99 let message: AuthenticatedPayload = bincode::deserialize(&payload).unwrap();
100
101 // Get authentication
102 let instance = {
103 let mut verified_instance: Option<AuthenticatedInstance> = None;
104 for source in &message.source {
105 if let AuthenticationSource::Instance {
106 instance,
107 signature,
108 } = source
109 {
110 let public_key = key_cache.get(&instance).await.unwrap();
111 let public_key = RsaPublicKey::from_pkcs1_pem(&public_key).unwrap();
112 let verifying_key = VerifyingKey::<Sha256>::new(public_key);
113
114 if verifying_key
115 .verify(
116 &message.payload,
117 &Signature::try_from(signature.as_ref()).unwrap(),
118 )
119 .is_ok()
120 {
121 verified_instance =
122 Some(AuthenticatedInstance::new(instance.clone()));
123
124 break;
125 }
126 }
127 }
128
129 verified_instance
130 };
131
132 let user = {
133 let mut verified_user = None;
134 if let Some(verified_instance) = &instance {
135 for source in &message.source {
136 if let AuthenticationSource::User { user, token } = source {
137 // Get token
138 let public_key = key_cache.get(&verified_instance).await.unwrap();
139
140 let token: TokenData<UserTokenMetadata> = jsonwebtoken::decode(
141 token.as_ref(),
142 &DecodingKey::from_rsa_pem(public_key.as_bytes()).unwrap(),
143 &Validation::new(jsonwebtoken::Algorithm::RS256),
144 )
145 .unwrap();
146
147 if token.claims.generated_for != *verified_instance.deref() {
148 // Nope!
149 break;
150 }
151
152 if token.claims.user != *user {
153 // Nope!
154 break;
155 }
156
157 verified_user = Some(AuthenticatedUser::new(user.clone()));
158 break;
159 }
160 }
161 }
162
163 verified_user
164 };
165
166 let message: GiteratedMessage<AnyObject, AnyOperation> = message.into_message();
167
168 operation_state.user = user;
169 operation_state.instance = instance;
170
171 let result = runtime
172 .object_operation(
173 message.object,
174 &message.operation,
175 message.payload,
176 &operation_state,
177 )
178 .await;
179
180 // Asking for exploits here
181 operation_state.user = None;
182 operation_state.instance = None;
183
184 // Map result to Vec<u8> on both
185 let result = match result {
186 Ok(result) => Ok(serde_json::to_vec(&result).unwrap()),
187 Err(err) => Err(match err {
188 OperationError::Operation(err) => {
189 OperationError::Operation(serde_json::to_vec(&err).unwrap())
190 }
191 OperationError::Internal(err) => OperationError::Internal(err),
192 OperationError::Unhandled => OperationError::Unhandled,
193 }),
194 };
195
196 let mut socket = connection_state.socket.lock().await;
197 let _ = socket
198 .send(Message::Binary(bincode::serialize(&result).unwrap()))
199 .await;
200
201 drop(socket);
202 }
203 _ => {
204 return;
205 }
206 }
207 }
208 }
209
210 #[derive(Clone)]
211 pub struct ConnectionState {
212 socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
213 pub connections: Arc<Mutex<Connections>>,
214 pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
215 pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
216 pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
217 pub settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
218 pub addr: SocketAddr,
219 pub instance: Instance,
220 pub handshaked: Arc<AtomicBool>,
221 pub key_cache: Arc<Mutex<PublicKeyCache>>,
222 pub instance_connections: Arc<Mutex<InstanceConnections>>,
223 pub config: Table,
224 }
225
226 impl ConnectionState {
227 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
228 let payload = serde_json::to_string(&message)?;
229 self.socket
230 .lock()
231 .await
232 .send(Message::Binary(payload.into_bytes()))
233 .await?;
234
235 Ok(())
236 }
237
238 pub async fn send_raw<T: Serialize>(&self, message: T) -> Result<(), Error> {
239 let payload = serde_json::to_string(&message)?;
240 self.socket
241 .lock()
242 .await
243 .send(Message::Binary(payload.into_bytes()))
244 .await?;
245
246 Ok(())
247 }
248
249 pub async fn public_key(&self, instance: &Instance) -> Result<String, Error> {
250 let mut keys = self.key_cache.lock().await;
251 keys.get(instance).await
252 }
253 }
254