JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Add support for networked GetSetting to Unified Stack refactor

Amber - 2 years ago

parent: tbd commit: da6d78e

giterated-daemon/src/connection/wrapper.rs - 7864 bytes
Raw
1 use std::{
2 net::SocketAddr,
3 ops::Deref,
4 sync::{atomic::AtomicBool, Arc},
5 };
6
7 use anyhow::Error;
8 use futures_util::{SinkExt, StreamExt};
9
10 use giterated_models::{
11 authenticated::{AuthenticationSource, UserTokenMetadata},
12 instance::Instance,
13 };
14
15 use giterated_models::authenticated::AuthenticatedPayload;
16 use giterated_stack::{
17 AuthenticatedInstance, AuthenticatedUser, GiteratedStack, StackOperationState,
18 };
19 use jsonwebtoken::{DecodingKey, TokenData, Validation};
20 use rsa::{
21 pkcs1::DecodeRsaPublicKey,
22 pss::{Signature, VerifyingKey},
23 sha2::Sha256,
24 signature::Verifier,
25 RsaPublicKey,
26 };
27 use serde::Serialize;
28
29 use tokio::{net::TcpStream, sync::Mutex};
30 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
31 use toml::Table;
32
33 use crate::{
34 authentication::AuthenticationTokenGranter,
35 backend::{MetadataBackend, RepositoryBackend, UserBackend},
36 federation::connections::InstanceConnections,
37 keys::PublicKeyCache,
38 };
39
40 use super::Connections;
41
42 pub async fn connection_wrapper(
43 socket: WebSocketStream<TcpStream>,
44 connections: Arc<Mutex<Connections>>,
45 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
46 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
47 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
48 settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
49 addr: SocketAddr,
50 instance: impl ToOwned<Owned = Instance>,
51 instance_connections: Arc<Mutex<InstanceConnections>>,
52 config: Table,
53 runtime: Arc<GiteratedStack>,
54 mut operation_state: StackOperationState,
55 ) {
56 let connection_state = ConnectionState {
57 socket: Arc::new(Mutex::new(socket)),
58 connections,
59 repository_backend,
60 user_backend,
61 auth_granter,
62 settings_backend,
63 addr,
64 instance: instance.to_owned(),
65 handshaked: Arc::new(AtomicBool::new(false)),
66 key_cache: Arc::default(),
67 instance_connections: instance_connections.clone(),
68 config,
69 };
70
71 let _handshaked = false;
72 let mut key_cache = PublicKeyCache::default();
73
74 loop {
75 let mut socket = connection_state.socket.lock().await;
76 let message = socket.next().await;
77 drop(socket);
78
79 match message {
80 Some(Ok(message)) => {
81 let payload = match message {
82 Message::Binary(payload) => payload,
83 Message::Ping(_) => {
84 let mut socket = connection_state.socket.lock().await;
85 let _ = socket.send(Message::Pong(vec![])).await;
86 drop(socket);
87 continue;
88 }
89 Message::Close(_) => return,
90 _ => continue,
91 };
92
93 let message: AuthenticatedPayload = bincode::deserialize(&payload).unwrap();
94
95 // Get authentication
96 let instance = {
97 let mut verified_instance: Option<AuthenticatedInstance> = None;
98 for source in &message.source {
99 if let AuthenticationSource::Instance {
100 instance,
101 signature,
102 } = source
103 {
104 let public_key = key_cache.get(&instance).await.unwrap();
105 let public_key = RsaPublicKey::from_pkcs1_pem(&public_key).unwrap();
106 let verifying_key = VerifyingKey::<Sha256>::new(public_key);
107
108 if verifying_key
109 .verify(
110 &message.payload,
111 &Signature::try_from(signature.as_ref()).unwrap(),
112 )
113 .is_ok()
114 {
115 verified_instance =
116 Some(AuthenticatedInstance::new(instance.clone()));
117
118 break;
119 }
120 }
121 }
122
123 verified_instance
124 };
125
126 let user = {
127 let mut verified_user = None;
128 if let Some(verified_instance) = &instance {
129 for source in &message.source {
130 if let AuthenticationSource::User { user, token } = source {
131 // Get token
132 let public_key = key_cache.get(&verified_instance).await.unwrap();
133
134 let token: TokenData<UserTokenMetadata> = jsonwebtoken::decode(
135 token.as_ref(),
136 &DecodingKey::from_rsa_pem(public_key.as_bytes()).unwrap(),
137 &Validation::new(jsonwebtoken::Algorithm::RS256),
138 )
139 .unwrap();
140
141 if token.claims.generated_for != *verified_instance.deref() {
142 // Nope!
143 break;
144 }
145
146 if token.claims.user != *user {
147 // Nope!
148 break;
149 }
150
151 verified_user = Some(AuthenticatedUser::new(user.clone()));
152 break;
153 }
154 }
155 }
156
157 verified_user
158 };
159
160 operation_state.user = user;
161 operation_state.instance = instance;
162
163 let result = runtime
164 .handle_network_message(message, &operation_state)
165 .await;
166
167 // Asking for exploits here
168 operation_state.user = None;
169 operation_state.instance = None;
170
171 let mut socket = connection_state.socket.lock().await;
172 let _ = socket
173 .send(Message::Binary(bincode::serialize(&result).unwrap()))
174 .await;
175
176 drop(socket);
177 }
178 _ => {
179 return;
180 }
181 }
182 }
183 }
184
185 #[derive(Clone)]
186 pub struct ConnectionState {
187 socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
188 pub connections: Arc<Mutex<Connections>>,
189 pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
190 pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
191 pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
192 pub settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
193 pub addr: SocketAddr,
194 pub instance: Instance,
195 pub handshaked: Arc<AtomicBool>,
196 pub key_cache: Arc<Mutex<PublicKeyCache>>,
197 pub instance_connections: Arc<Mutex<InstanceConnections>>,
198 pub config: Table,
199 }
200
201 impl ConnectionState {
202 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
203 let payload = serde_json::to_string(&message)?;
204 self.socket
205 .lock()
206 .await
207 .send(Message::Binary(payload.into_bytes()))
208 .await?;
209
210 Ok(())
211 }
212
213 pub async fn send_raw<T: Serialize>(&self, message: T) -> Result<(), Error> {
214 let payload = serde_json::to_string(&message)?;
215 self.socket
216 .lock()
217 .await
218 .send(Message::Binary(payload.into_bytes()))
219 .await?;
220
221 Ok(())
222 }
223
224 pub async fn public_key(&self, instance: &Instance) -> Result<String, Error> {
225 let mut keys = self.key_cache.lock().await;
226 keys.get(instance).await
227 }
228 }
229