JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Giterated stack changeover, refactor still incomplete

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨8d40dfe

⁨giterated-daemon/src/connection/wrapper.rs⁩ - ⁨9811⁩ bytes
Raw
1 use std::{
2 net::SocketAddr,
3 sync::{atomic::AtomicBool, Arc},
4 };
5
6 use anyhow::Error;
7 use futures_util::{SinkExt, StreamExt};
8
9 use giterated_models::{error::OperationError, instance::Instance};
10
11 use giterated_models::object_backend::ObjectBackend;
12
13 use giterated_models::{
14 authenticated::AuthenticatedPayload, message::GiteratedMessage, object::AnyObject,
15 operation::AnyOperation,
16 };
17 use serde::Serialize;
18
19 use tokio::{net::TcpStream, sync::Mutex};
20 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
21 use toml::Table;
22
23 use crate::{
24 authentication::AuthenticationTokenGranter,
25 backend::{MetadataBackend, RepositoryBackend, UserBackend},
26 database_backend::DatabaseBackend,
27 federation::connections::InstanceConnections,
28 keys::PublicKeyCache,
29 };
30
31 use super::Connections;
32
33 pub async fn connection_wrapper(
34 socket: WebSocketStream<TcpStream>,
35 connections: Arc<Mutex<Connections>>,
36 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
37 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
38 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
39 settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
40 addr: SocketAddr,
41 instance: impl ToOwned<Owned = Instance>,
42 instance_connections: Arc<Mutex<InstanceConnections>>,
43 config: Table,
44 backend: DatabaseBackend,
45 ) {
46 let connection_state = ConnectionState {
47 socket: Arc::new(Mutex::new(socket)),
48 connections,
49 repository_backend,
50 user_backend,
51 auth_granter,
52 settings_backend,
53 addr,
54 instance: instance.to_owned(),
55 handshaked: Arc::new(AtomicBool::new(false)),
56 key_cache: Arc::default(),
57 instance_connections: instance_connections.clone(),
58 config,
59 };
60
61 let _handshaked = false;
62
63 let backend = backend.into_backend();
64
65 loop {
66 let mut socket = connection_state.socket.lock().await;
67 let message = socket.next().await;
68 drop(socket);
69
70 match message {
71 Some(Ok(message)) => {
72 let payload = match message {
73 Message::Binary(payload) => payload,
74 Message::Ping(_) => {
75 let mut socket = connection_state.socket.lock().await;
76 let _ = socket.send(Message::Pong(vec![])).await;
77 drop(socket);
78 continue;
79 }
80 Message::Close(_) => return,
81 _ => continue,
82 };
83
84 let message: AuthenticatedPayload = bincode::deserialize(&payload).unwrap();
85
86 let message: GiteratedMessage<AnyObject, AnyOperation> = message.into_message();
87
88 let result = backend
89 .object_operation(message.object, &message.operation, message.payload)
90 .await;
91
92 // Map result to Vec<u8> on both
93 let result = match result {
94 Ok(result) => Ok(serde_json::to_vec(&result).unwrap()),
95 Err(err) => Err(match err {
96 OperationError::Operation(err) => {
97 OperationError::Operation(serde_json::to_vec(&err).unwrap())
98 }
99 OperationError::Internal(err) => OperationError::Internal(err),
100 OperationError::Unhandled => OperationError::Unhandled,
101 }),
102 };
103
104 let mut socket = connection_state.socket.lock().await;
105 let _ = socket
106 .send(Message::Binary(bincode::serialize(&result).unwrap()))
107 .await;
108
109 drop(socket);
110 }
111 _ => {
112 return;
113 }
114 }
115 }
116
117 // loop {
118 // let mut socket = connection_state.socket.lock().await;
119 // let message = socket.next().await;
120 // drop(socket);
121
122 // match message {
123 // Some(Ok(message)) => {
124 // let payload = match message {
125 // Message::Binary(payload) => payload,
126 // Message::Ping(_) => {
127 // let mut socket = connection_state.socket.lock().await;
128 // let _ = socket.send(Message::Pong(vec![])).await;
129 // drop(socket);
130 // continue;
131 // }
132 // Message::Close(_) => return,
133 // _ => continue,
134 // };
135
136 // let message = NetworkMessage(payload.clone());
137
138 // if !handshaked {
139 // if handshake_handle(&message, &connection_state).await.is_ok() {
140 // if connection_state.handshaked.load(Ordering::SeqCst) {
141 // handshaked = true;
142 // }
143 // }
144 // } else {
145 // let raw = serde_json::from_slice::<AuthenticatedPayload>(&payload).unwrap();
146
147 // if let Some(target_instance) = &raw.target_instance {
148 // if connection_state.instance != *target_instance {
149 // // Forward request
150 // info!("Forwarding message to {}", target_instance.url);
151 // let mut instance_connections = instance_connections.lock().await;
152 // let pool = instance_connections.get_or_open(&target_instance).unwrap();
153 // let pool_clone = pool.clone();
154 // drop(pool);
155
156 // let result = wrap_forwarded(&pool_clone, raw).await;
157
158 // let mut socket = connection_state.socket.lock().await;
159 // let _ = socket.send(result).await;
160
161 // continue;
162 // }
163 // }
164
165 // let message_type = &raw.message_type;
166
167 // match authentication_handle(message_type, &message, &connection_state).await {
168 // Err(e) => {
169 // let _ = connection_state
170 // .send_raw(ConnectionError(e.to_string()))
171 // .await;
172 // }
173 // Ok(true) => continue,
174 // Ok(false) => {}
175 // }
176
177 // match repository_handle(message_type, &message, &connection_state).await {
178 // Err(e) => {
179 // let _ = connection_state
180 // .send_raw(ConnectionError(e.to_string()))
181 // .await;
182 // }
183 // Ok(true) => continue,
184 // Ok(false) => {}
185 // }
186
187 // match user_handle(message_type, &message, &connection_state).await {
188 // Err(e) => {
189 // let _ = connection_state
190 // .send_raw(ConnectionError(e.to_string()))
191 // .await;
192 // }
193 // Ok(true) => continue,
194 // Ok(false) => {}
195 // }
196
197 // match authentication_handle(message_type, &message, &connection_state).await {
198 // Err(e) => {
199 // let _ = connection_state
200 // .send_raw(ConnectionError(e.to_string()))
201 // .await;
202 // }
203 // Ok(true) => continue,
204 // Ok(false) => {}
205 // }
206
207 // error!(
208 // "Message completely unhandled: {}",
209 // std::str::from_utf8(&payload).unwrap()
210 // );
211 // }
212 // }
213 // Some(Err(e)) => {
214 // error!("Closing connection for {:?} for {}", e, addr);
215 // return;
216 // }
217 // _ => {
218 // continue;
219 // }
220 // }
221 // }
222 }
223
224 #[derive(Clone)]
225 pub struct ConnectionState {
226 socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
227 pub connections: Arc<Mutex<Connections>>,
228 pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
229 pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
230 pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
231 pub settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
232 pub addr: SocketAddr,
233 pub instance: Instance,
234 pub handshaked: Arc<AtomicBool>,
235 pub key_cache: Arc<Mutex<PublicKeyCache>>,
236 pub instance_connections: Arc<Mutex<InstanceConnections>>,
237 pub config: Table,
238 }
239
240 impl ConnectionState {
241 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
242 let payload = serde_json::to_string(&message)?;
243 self.socket
244 .lock()
245 .await
246 .send(Message::Binary(payload.into_bytes()))
247 .await?;
248
249 Ok(())
250 }
251
252 pub async fn send_raw<T: Serialize>(&self, message: T) -> Result<(), Error> {
253 let payload = serde_json::to_string(&message)?;
254 self.socket
255 .lock()
256 .await
257 .send(Message::Binary(payload.into_bytes()))
258 .await?;
259
260 Ok(())
261 }
262
263 pub async fn public_key(&self, instance: &Instance) -> Result<String, Error> {
264 let mut keys = self.key_cache.lock().await;
265 keys.get(instance).await
266 }
267 }
268