JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Giterated Stack `ObjectValue` and `Setting` refactor.

This refactor adds value and setting update events, as well as value getters. Additionally, the stack now owns the ability to write settings into storage. This is accomplished with the `MetadataProvider` trait. This sets up the groundwork for push federation, cache, and basically everything else.

commit 7befc583cb3e0c6719506c550ed66ac76293413c
Author: Amber <[email protected]>
Date: Fri Sep 29 15:46:48 2023 -0500
Finish value and settings refactor in the stack.

commit 3ac09994a0caafd1a0b95d9a781c7f202f20e75b
Author: Amber <[email protected]>
Date: Fri Sep 29 09:46:32 2023 -0500
Add set_setting handling back in

commit 84fd31e3eae85d98fa68a28b333dbb32cde3bdb8
Author: Amber <[email protected]>
Date: Wed Sep 27 06:36:31 2023 -0500
Remove some allocations from meta types

commit 16c310ce3680c4a14ed35083b6a230aaecd43152
Author: Amber <[email protected]>
Date: Wed Sep 27 05:35:03 2023 -0500
Add cargo metadata

commit eb2520a20001bac7b21c6c3d34f62db32f0ada80
Author: Amber <[email protected]>
Date: Wed Sep 27 05:26:27 2023 -0500
Refactor setting and value management to use the unified stack. Allows for tight management, inspection, and eventing of setting and value management.

commit 901fe103da0fce4b40f33b0a8b64404049ae03cf
Author: Amber <[email protected]>
Date: Wed Sep 27 02:38:33 2023 -0500
Set up groundwork for value / settings refactor

Amber - 2 years ago

parent: tbd commit: c377e4d

⁨giterated-daemon/src/connection/wrapper.rs⁩ - ⁨8194⁩ bytes
Raw
1 use std::{
2 net::SocketAddr,
3 ops::Deref,
4 sync::{atomic::AtomicBool, Arc},
5 };
6
7 use anyhow::Error;
8 use futures_util::{SinkExt, StreamExt};
9
10 use giterated_models::{
11 authenticated::{AuthenticationSource, UserTokenMetadata},
12 error::OperationError,
13 instance::Instance,
14 };
15
16 use giterated_models::authenticated::AuthenticatedPayload;
17 use giterated_stack::{
18 AuthenticatedInstance, AuthenticatedUser, GiteratedStack, StackOperationState,
19 };
20 use jsonwebtoken::{DecodingKey, TokenData, Validation};
21 use rsa::{
22 pkcs1::DecodeRsaPublicKey,
23 pss::{Signature, VerifyingKey},
24 sha2::Sha256,
25 signature::Verifier,
26 RsaPublicKey,
27 };
28 use serde::Serialize;
29
30 use tokio::{net::TcpStream, sync::Mutex};
31 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
32 use toml::Table;
33
34 use crate::{
35 authentication::AuthenticationTokenGranter,
36 backend::{MetadataBackend, RepositoryBackend, UserBackend},
37 federation::connections::InstanceConnections,
38 keys::PublicKeyCache,
39 };
40
41 use super::Connections;
42
43 pub async fn connection_wrapper(
44 socket: WebSocketStream<TcpStream>,
45 connections: Arc<Mutex<Connections>>,
46 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
47 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
48 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
49 settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
50 addr: SocketAddr,
51 instance: impl ToOwned<Owned = Instance>,
52 instance_connections: Arc<Mutex<InstanceConnections>>,
53 config: Table,
54 runtime: Arc<GiteratedStack>,
55 mut operation_state: StackOperationState,
56 ) {
57 let connection_state = ConnectionState {
58 socket: Arc::new(Mutex::new(socket)),
59 connections,
60 repository_backend,
61 user_backend,
62 auth_granter,
63 settings_backend,
64 addr,
65 instance: instance.to_owned(),
66 handshaked: Arc::new(AtomicBool::new(false)),
67 key_cache: Arc::default(),
68 instance_connections: instance_connections.clone(),
69 config,
70 };
71
72 let _handshaked = false;
73 let mut key_cache = PublicKeyCache::default();
74
75 loop {
76 let mut socket = connection_state.socket.lock().await;
77 let message = socket.next().await;
78 drop(socket);
79
80 match message {
81 Some(Ok(message)) => {
82 let payload = match message {
83 Message::Binary(payload) => payload,
84 Message::Ping(_) => {
85 let mut socket = connection_state.socket.lock().await;
86 let _ = socket.send(Message::Pong(vec![])).await;
87 drop(socket);
88 continue;
89 }
90 Message::Close(_) => return,
91 _ => continue,
92 };
93
94 let message: AuthenticatedPayload = bincode::deserialize(&payload).unwrap();
95
96 // Get authentication
97 let instance = {
98 let mut verified_instance: Option<AuthenticatedInstance> = None;
99 for source in &message.source {
100 if let AuthenticationSource::Instance {
101 instance,
102 signature,
103 } = source
104 {
105 let public_key = key_cache.get(instance).await.unwrap();
106 let public_key = RsaPublicKey::from_pkcs1_pem(&public_key).unwrap();
107 let verifying_key = VerifyingKey::<Sha256>::new(public_key);
108
109 if verifying_key
110 .verify(
111 &message.payload,
112 &Signature::try_from(signature.as_ref()).unwrap(),
113 )
114 .is_ok()
115 {
116 verified_instance =
117 Some(AuthenticatedInstance::new(instance.clone()));
118
119 break;
120 }
121 }
122 }
123
124 verified_instance
125 };
126
127 let user = {
128 let mut verified_user = None;
129 if let Some(verified_instance) = &instance {
130 for source in &message.source {
131 if let AuthenticationSource::User { user, token } = source {
132 // Get token
133 let public_key = key_cache.get(verified_instance).await.unwrap();
134
135 let token: TokenData<UserTokenMetadata> = jsonwebtoken::decode(
136 token.as_ref(),
137 &DecodingKey::from_rsa_pem(public_key.as_bytes()).unwrap(),
138 &Validation::new(jsonwebtoken::Algorithm::RS256),
139 )
140 .unwrap();
141
142 if token.claims.generated_for != *verified_instance.deref() {
143 // Nope!
144 break;
145 }
146
147 if token.claims.user != *user {
148 // Nope!
149 break;
150 }
151
152 verified_user = Some(AuthenticatedUser::new(user.clone()));
153 break;
154 }
155 }
156 }
157
158 verified_user
159 };
160
161 operation_state.user = user;
162 operation_state.instance = instance;
163
164 let result = runtime
165 .handle_network_message(message, &operation_state)
166 .await;
167
168 // Asking for exploits here
169 operation_state.user = None;
170 operation_state.instance = None;
171
172 if let Err(OperationError::Internal(internal_error)) = &result {
173 error!("An internal error has occurred:\n{:?}", internal_error);
174 }
175
176 // Map error to the network variant
177 let result = result.map_err(|e| e.into_network());
178
179 let mut socket = connection_state.socket.lock().await;
180 let _ = socket
181 .send(Message::Binary(bincode::serialize(&result).unwrap()))
182 .await;
183
184 drop(socket);
185 }
186 _ => {
187 return;
188 }
189 }
190 }
191 }
192
193 #[derive(Clone)]
194 pub struct ConnectionState {
195 socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
196 pub connections: Arc<Mutex<Connections>>,
197 pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
198 pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
199 pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
200 pub settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
201 pub addr: SocketAddr,
202 pub instance: Instance,
203 pub handshaked: Arc<AtomicBool>,
204 pub key_cache: Arc<Mutex<PublicKeyCache>>,
205 pub instance_connections: Arc<Mutex<InstanceConnections>>,
206 pub config: Table,
207 }
208
209 impl ConnectionState {
210 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
211 let payload = serde_json::to_string(&message)?;
212 self.socket
213 .lock()
214 .await
215 .send(Message::Binary(payload.into_bytes()))
216 .await?;
217
218 Ok(())
219 }
220
221 pub async fn send_raw<T: Serialize>(&self, message: T) -> Result<(), Error> {
222 let payload = serde_json::to_string(&message)?;
223 self.socket
224 .lock()
225 .await
226 .send(Message::Binary(payload.into_bytes()))
227 .await?;
228
229 Ok(())
230 }
231
232 pub async fn public_key(&self, instance: &Instance) -> Result<String, Error> {
233 let mut keys = self.key_cache.lock().await;
234 keys.get(instance).await
235 }
236 }
237