JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Unified stack `GetValue` implementation

Amber - 2 years ago

parent: tbd commit: 325f5af

⁨giterated-daemon/src/connection/wrapper.rs⁩ - ⁨7766⁩ bytes
Raw
1 use std::{
2 net::SocketAddr,
3 ops::Deref,
4 sync::{atomic::AtomicBool, Arc},
5 };
6
7 use anyhow::Error;
8 use futures_util::{SinkExt, StreamExt};
9
10 use giterated_models::{
11 authenticated::{AuthenticationSource, UserTokenMetadata},
12 instance::Instance,
13 };
14
15 use giterated_models::authenticated::AuthenticatedPayload;
16 use giterated_stack::{
17 AuthenticatedInstance, AuthenticatedUser, GiteratedStack, StackOperationState,
18 };
19 use jsonwebtoken::{DecodingKey, TokenData, Validation};
20 use rsa::{
21 pkcs1::DecodeRsaPublicKey,
22 pss::{Signature, VerifyingKey},
23 sha2::Sha256,
24 signature::Verifier,
25 RsaPublicKey,
26 };
27 use serde::Serialize;
28
29 use tokio::{net::TcpStream, sync::Mutex};
30 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
31 use toml::Table;
32
33 use crate::{
34 authentication::AuthenticationTokenGranter,
35 backend::{MetadataBackend, RepositoryBackend, UserBackend},
36 federation::connections::InstanceConnections,
37 keys::PublicKeyCache,
38 };
39
40 use super::Connections;
41
42 pub async fn connection_wrapper(
43 socket: WebSocketStream<TcpStream>,
44 connections: Arc<Mutex<Connections>>,
45 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
46 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
47 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
48 settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
49 addr: SocketAddr,
50 instance: impl ToOwned<Owned = Instance>,
51 instance_connections: Arc<Mutex<InstanceConnections>>,
52 config: Table,
53 runtime: Arc<GiteratedStack>,
54 mut operation_state: StackOperationState,
55 ) {
56 let connection_state = ConnectionState {
57 socket: Arc::new(Mutex::new(socket)),
58 connections,
59 repository_backend,
60 user_backend,
61 auth_granter,
62 settings_backend,
63 addr,
64 instance: instance.to_owned(),
65 handshaked: Arc::new(AtomicBool::new(false)),
66 key_cache: Arc::default(),
67 instance_connections: instance_connections.clone(),
68 config,
69 };
70
71 let _handshaked = false;
72 let mut key_cache = PublicKeyCache::default();
73
74 loop {
75 let mut socket = connection_state.socket.lock().await;
76 let message = socket.next().await;
77 drop(socket);
78
79 match message {
80 Some(Ok(message)) => {
81 let payload = match message {
82 Message::Binary(payload) => payload,
83 Message::Ping(_) => {
84 let mut socket = connection_state.socket.lock().await;
85 let _ = socket.send(Message::Pong(vec![])).await;
86 drop(socket);
87 continue;
88 }
89 Message::Close(_) => return,
90 _ => continue,
91 };
92
93 let message: AuthenticatedPayload = bincode::deserialize(&payload).unwrap();
94
95 // Get authentication
96 let instance = {
97 let mut verified_instance: Option<AuthenticatedInstance> = None;
98 for source in &message.source {
99 if let AuthenticationSource::Instance {
100 instance,
101 signature,
102 } = source
103 {
104 let public_key = key_cache.get(&instance).await.unwrap();
105 let public_key = RsaPublicKey::from_pkcs1_pem(&public_key).unwrap();
106 let verifying_key = VerifyingKey::<Sha256>::new(public_key);
107
108 if verifying_key
109 .verify(
110 &message.payload,
111 &Signature::try_from(signature.as_ref()).unwrap(),
112 )
113 .is_ok()
114 {
115 verified_instance =
116 Some(AuthenticatedInstance::new(instance.clone()));
117
118 break;
119 }
120 }
121 }
122
123 verified_instance
124 };
125
126 let _user = {
127 let mut verified_user = None;
128 if let Some(verified_instance) = &instance {
129 for source in &message.source {
130 if let AuthenticationSource::User { user, token } = source {
131 // Get token
132 let public_key = key_cache.get(&verified_instance).await.unwrap();
133
134 let token: TokenData<UserTokenMetadata> = jsonwebtoken::decode(
135 token.as_ref(),
136 &DecodingKey::from_rsa_pem(public_key.as_bytes()).unwrap(),
137 &Validation::new(jsonwebtoken::Algorithm::RS256),
138 )
139 .unwrap();
140
141 if token.claims.generated_for != *verified_instance.deref() {
142 // Nope!
143 break;
144 }
145
146 if token.claims.user != *user {
147 // Nope!
148 break;
149 }
150
151 verified_user = Some(AuthenticatedUser::new(user.clone()));
152 break;
153 }
154 }
155 }
156
157 verified_user
158 };
159
160 let result = runtime
161 .handle_network_message(message, &operation_state)
162 .await;
163
164 // Asking for exploits here
165 operation_state.user = None;
166 operation_state.instance = None;
167
168 let mut socket = connection_state.socket.lock().await;
169 let _ = socket
170 .send(Message::Binary(bincode::serialize(&result).unwrap()))
171 .await;
172
173 drop(socket);
174 }
175 _ => {
176 return;
177 }
178 }
179 }
180 }
181
/// Shared handles belonging to a single WebSocket connection.
///
/// Cloning is cheap for the `Arc` fields: clones share the same socket,
/// backends and caches. `instance`, `addr` and `config` are cloned by value.
#[derive(Clone)]
pub struct ConnectionState {
    /// The connection's WebSocket stream. Private: it is locked by the
    /// receive loop in `connection_wrapper` and by the `send*` methods.
    socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
    /// Shared `Connections` handle passed in by the caller; not used in this file.
    pub connections: Arc<Mutex<Connections>>,
    /// Repository backend handle passed in by the caller.
    pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
    /// User backend handle passed in by the caller.
    pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
    /// Authentication token granter handle passed in by the caller.
    pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
    /// Metadata (settings) backend handle passed in by the caller.
    pub settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
    /// Socket address supplied by the caller (presumably the peer's address; confirm at the call site).
    pub addr: SocketAddr,
    /// Owned `Instance` supplied by the caller (presumably the local instance; confirm at the call site).
    pub instance: Instance,
    /// Handshake flag; initialised to `false` by `connection_wrapper` and not read in this file.
    pub handshaked: Arc<AtomicBool>,
    /// Public key cache consulted by [`ConnectionState::public_key`].
    /// Note that `connection_wrapper` verifies messages with its own local cache, not this one.
    pub key_cache: Arc<Mutex<PublicKeyCache>>,
    /// Shared `InstanceConnections` handle passed in by the caller.
    pub instance_connections: Arc<Mutex<InstanceConnections>>,
    /// TOML configuration table passed in by the caller.
    pub config: Table,
}
197
198 impl ConnectionState {
199 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
200 let payload = serde_json::to_string(&message)?;
201 self.socket
202 .lock()
203 .await
204 .send(Message::Binary(payload.into_bytes()))
205 .await?;
206
207 Ok(())
208 }
209
210 pub async fn send_raw<T: Serialize>(&self, message: T) -> Result<(), Error> {
211 let payload = serde_json::to_string(&message)?;
212 self.socket
213 .lock()
214 .await
215 .send(Message::Binary(payload.into_bytes()))
216 .await?;
217
218 Ok(())
219 }
220
221 pub async fn public_key(&self, instance: &Instance) -> Result<String, Error> {
222 let mut keys = self.key_cache.lock().await;
223 keys.get(instance).await
224 }
225 }
226