JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

woo

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨2d48bc0

⁨src/connection.rs⁩ - ⁨18219⁩ bytes
Raw
1 use std::{collections::HashMap, net::SocketAddr, sync::Arc};
2
3 use anyhow::Error;
4 use futures_util::{stream::StreamExt, SinkExt};
5 use serde::Serialize;
6 use tokio::{
7 net::TcpStream,
8 sync::{
9 broadcast::{Receiver, Sender},
10 Mutex,
11 },
12 task::JoinHandle,
13 };
14 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
15
16 use crate::{
17 authentication::AuthenticationTokenGranter,
18 backend::{DiscoveryBackend, IssuesBackend, RepositoryBackend, UserBackend},
19 handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse},
20 listener::Listeners,
21 messages::{
22 authentication::{
23 AuthenticationMessage, AuthenticationRequest, AuthenticationResponse,
24 TokenExtensionResponse,
25 },
26 repository::{
27 RepositoryMessage, RepositoryMessageKind, RepositoryRequest, RepositoryResponse,
28 },
29 MessageKind,
30 },
31 model::{
32 instance::{Instance, InstanceMeta},
33 repository::Repository,
34 user::User,
35 },
36 };
37
38 pub struct RawConnection {
39 pub task: JoinHandle<()>,
40 }
41
42 pub struct InstanceConnection {
43 pub instance: InstanceMeta,
44 pub sender: Sender<MessageKind>,
45 pub task: JoinHandle<()>,
46 }
47
48 /// Represents a connection which hasn't finished the handshake.
49 pub struct UnestablishedConnection {
50 pub socket: WebSocketStream<TcpStream>,
51 }
52
53 #[derive(Default)]
54 pub struct Connections {
55 pub connections: Vec<RawConnection>,
56 pub instance_connections: HashMap<Instance, InstanceConnection>,
57 }
58
59 pub async fn connection_worker(
60 mut socket: WebSocketStream<TcpStream>,
61 listeners: Arc<Mutex<Listeners>>,
62 connections: Arc<Mutex<Connections>>,
63 backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
64 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
65 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
66 discovery_backend: Arc<Mutex<dyn DiscoveryBackend + Send>>,
67 addr: SocketAddr,
68 ) {
69 let mut handshaked = false;
70 let this_instance = Instance {
71 url: String::from("giterated.dev"),
72 };
73
74 while let Some(message) = socket.next().await {
75 let message = match message {
76 Ok(message) => message,
77 Err(err) => {
78 error!("Error reading message: {:?}", err);
79 continue;
80 }
81 };
82
83 let payload = match message {
84 Message::Text(text) => text.into_bytes(),
85 Message::Binary(bytes) => bytes,
86 Message::Ping(_) => continue,
87 Message::Pong(_) => continue,
88 Message::Close(_) => {
89 info!("Closing connection with {}.", addr);
90
91 return;
92 }
93 _ => unreachable!(),
94 };
95
96 let message = match serde_json::from_slice::<MessageKind>(&payload) {
97 Ok(message) => message,
98 Err(err) => {
99 error!("Error deserializing message from {}: {:?}", addr, err);
100 continue;
101 }
102 };
103
104 // info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
105
106 if let MessageKind::Handshake(handshake) = message {
107 match handshake {
108 HandshakeMessage::Initiate(_) => {
109 // Send HandshakeMessage::Response
110 let message = HandshakeResponse {
111 identity: Instance {
112 url: String::from("foo.com"),
113 },
114 version: String::from("0.1.0"),
115 };
116
117 let _result = send(
118 &mut socket,
119 MessageKind::Handshake(HandshakeMessage::Response(message)),
120 )
121 .await;
122
123 continue;
124 }
125 HandshakeMessage::Response(_) => {
126 // Send HandshakeMessage::Finalize
127 let message = HandshakeFinalize { success: true };
128
129 let _result = send(
130 &mut socket,
131 MessageKind::Handshake(HandshakeMessage::Finalize(message)),
132 )
133 .await;
134
135 continue;
136 }
137 HandshakeMessage::Finalize(_) => {
138 handshaked = true;
139
140 // Send HandshakeMessage::Finalize
141 let message = HandshakeFinalize { success: true };
142
143 let _result = send(
144 &mut socket,
145 MessageKind::Handshake(HandshakeMessage::Finalize(message)),
146 )
147 .await;
148
149 continue;
150 }
151 }
152 }
153
154 if !handshaked {
155 continue;
156 }
157
158 if let MessageKind::Repository(repository) = &message {
159 if repository.target.instance != this_instance {
160 info!("Forwarding command to {}", repository.target.instance.url);
161 // We need to send this command to a different instance
162
163 let mut listener = send_and_get_listener(message, &listeners, &connections).await;
164
165 // Wait for response
166 while let Ok(message) = listener.recv().await {
167 if let MessageKind::Repository(RepositoryMessage {
168 command: RepositoryMessageKind::Response(_),
169 ..
170 }) = message
171 {
172 let _result = send(&mut socket, message).await;
173 }
174 }
175 continue;
176 } else {
177 // This message is targeting this instance
178 match &repository.command {
179 RepositoryMessageKind::Request(request) => match request.clone() {
180 RepositoryRequest::CreateRepository(request) => {
181 let mut backend = backend.lock().await;
182 let request = request.validate().await.unwrap();
183 let response = backend.create_repository(&request).await;
184
185 let response = match response {
186 Ok(response) => response,
187 Err(err) => {
188 error!("Error handling request: {:?}", err);
189 continue;
190 }
191 };
192 drop(backend);
193
194 let _result = send(
195 &mut socket,
196 MessageKind::Repository(RepositoryMessage {
197 target: repository.target.clone(),
198 command: RepositoryMessageKind::Response(
199 RepositoryResponse::CreateRepository(response),
200 ),
201 }),
202 )
203 .await;
204
205 continue;
206 }
207 RepositoryRequest::RepositoryFileInspect(request) => {
208 let mut backend = backend.lock().await;
209 let request = request.validate().await.unwrap();
210 let response = backend.repository_file_inspect(&request);
211
212 let response = match response {
213 Ok(response) => response,
214 Err(err) => {
215 error!("Error handling request: {:?}", err);
216 continue;
217 }
218 };
219 drop(backend);
220
221 let _result = send(
222 &mut socket,
223 MessageKind::Repository(RepositoryMessage {
224 target: repository.target.clone(),
225 command: RepositoryMessageKind::Response(
226 RepositoryResponse::RepositoryFileInspection(response),
227 ),
228 }),
229 )
230 .await;
231
232 continue;
233 }
234 RepositoryRequest::RepositoryInfo(request) => {
235 let mut backend = backend.lock().await;
236 let request = request.validate().await.unwrap();
237 let response = backend.repository_info(&request).await;
238
239 let response = match response {
240 Ok(response) => response,
241 Err(err) => {
242 error!("Error handling request: {:?}", err);
243 continue;
244 }
245 };
246 drop(backend);
247
248 let _result = send(
249 &mut socket,
250 MessageKind::Repository(RepositoryMessage {
251 target: repository.target.clone(),
252 command: RepositoryMessageKind::Response(
253 RepositoryResponse::RepositoryInfo(response),
254 ),
255 }),
256 )
257 .await;
258
259 continue;
260 }
261 RepositoryRequest::IssuesCount(request) => {
262 let request = &request.validate().await.unwrap();
263
264 let mut backend = backend.lock().await;
265 let response = backend.issues_count(request);
266
267 let response = match response {
268 Ok(response) => response,
269 Err(err) => {
270 error!("Error handling request: {:?}", err);
271 continue;
272 }
273 };
274 drop(backend);
275
276 let _result = send(
277 &mut socket,
278 MessageKind::Repository(RepositoryMessage {
279 target: repository.target.clone(),
280 command: RepositoryMessageKind::Response(
281 RepositoryResponse::IssuesCount(response),
282 ),
283 }),
284 )
285 .await;
286
287 continue;
288 }
289 RepositoryRequest::IssueLabels(request) => {
290 let request = &request.validate().await.unwrap();
291
292 let mut backend = backend.lock().await;
293 let response = backend.issue_labels(request);
294
295 let response = match response {
296 Ok(response) => response,
297 Err(err) => {
298 error!("Error handling request: {:?}", err);
299 continue;
300 }
301 };
302 drop(backend);
303
304 let _result = send(
305 &mut socket,
306 MessageKind::Repository(RepositoryMessage {
307 target: repository.target.clone(),
308 command: RepositoryMessageKind::Response(
309 RepositoryResponse::IssueLabels(response),
310 ),
311 }),
312 )
313 .await;
314
315 continue;
316 }
317 RepositoryRequest::Issues(request) => {
318 let request = request.validate().await.unwrap();
319
320 let mut backend = backend.lock().await;
321 let response = backend.issues(&request);
322
323 let response = match response {
324 Ok(response) => response,
325 Err(err) => {
326 error!("Error handling request: {:?}", err);
327 continue;
328 }
329 };
330 drop(backend);
331
332 let _result = send(
333 &mut socket,
334 MessageKind::Repository(RepositoryMessage {
335 target: repository.target.clone(),
336 command: RepositoryMessageKind::Response(
337 RepositoryResponse::Issues(response),
338 ),
339 }),
340 )
341 .await;
342
343 continue;
344 }
345 },
346 RepositoryMessageKind::Response(_response) => {
347 unreachable!()
348 }
349 }
350 }
351 }
352
353 if let MessageKind::Authentication(authentication) = &message {
354 match authentication {
355 AuthenticationMessage::Request(request) => match request {
356 AuthenticationRequest::AuthenticationToken(token) => {
357 let mut granter = auth_granter.lock().await;
358
359 let response = granter.token_request(token.clone()).await.unwrap();
360 drop(granter);
361
362 let _result = send(
363 &mut socket,
364 MessageKind::Authentication(AuthenticationMessage::Response(
365 AuthenticationResponse::AuthenticationToken(response),
366 )),
367 )
368 .await;
369
370 continue;
371 }
372 AuthenticationRequest::TokenExtension(request) => {
373 let mut granter = auth_granter.lock().await;
374
375 let response = granter
376 .extension_request(request.clone())
377 .await
378 .unwrap_or(TokenExtensionResponse { new_token: None });
379 drop(granter);
380
381 let _result = send(
382 &mut socket,
383 MessageKind::Authentication(AuthenticationMessage::Response(
384 AuthenticationResponse::TokenExtension(response),
385 )),
386 )
387 .await;
388
389 continue;
390 }
391 AuthenticationRequest::RegisterAccount(request) => {
392 let request = request.inner().await.clone();
393
394 let mut user_backend = user_backend.lock().await;
395
396 let response = user_backend.register(request.clone()).await.unwrap();
397 drop(user_backend);
398
399 let _result = send(
400 &mut socket,
401 MessageKind::Authentication(AuthenticationMessage::Response(
402 AuthenticationResponse::RegisterAccount(response),
403 )),
404 )
405 .await;
406
407 continue;
408 }
409 },
410 AuthenticationMessage::Response(_) => unreachable!(),
411 }
412 }
413
414 if let MessageKind::Discovery(message) = &message {
415 let mut backend = discovery_backend.lock().await;
416 backend.try_handle(message).await.unwrap();
417
418 continue;
419 }
420 }
421
422 info!("Connection closed");
423 }
424
425 async fn send_and_get_listener(
426 message: MessageKind,
427 listeners: &Arc<Mutex<Listeners>>,
428 connections: &Arc<Mutex<Connections>>,
429 ) -> Receiver<MessageKind> {
430 let (instance, user, repository): (Option<Instance>, Option<User>, Option<Repository>) =
431 match &message {
432 MessageKind::Handshake(_) => {
433 todo!()
434 }
435 MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())),
436 MessageKind::Authentication(_) => todo!(),
437 MessageKind::Discovery(_) => todo!(),
438 };
439
440 let target = match (&instance, &user, &repository) {
441 (Some(instance), _, _) => instance.clone(),
442 (_, Some(user), _) => user.instance.clone(),
443 (_, _, Some(repository)) => repository.instance.clone(),
444 _ => unreachable!(),
445 };
446
447 let mut listeners = listeners.lock().await;
448 let listener = listeners.add(instance, user, repository);
449 drop(listeners);
450
451 let connections = connections.lock().await;
452
453 if let Some(connection) = connections.instance_connections.get(&target) {
454 connection.sender.send(message);
455 } else {
456 error!("Unable to message {}, this is a bug.", target.url);
457
458 panic!();
459 }
460
461 drop(connections);
462
463 listener
464 }
465
466 async fn send<T: Serialize>(
467 socket: &mut WebSocketStream<TcpStream>,
468 message: T,
469 ) -> Result<(), Error> {
470 socket
471 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
472 .await?;
473
474 Ok(())
475 }
476