JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Add all the user request handling

Amber - 2 years ago

parent: tbd commit: e3bda14

src/connection.rs - 24971 bytes
Raw
1 use std::{collections::HashMap, net::SocketAddr, str::FromStr, sync::Arc};
2
3 use anyhow::Error;
4 use futures_util::{stream::StreamExt, SinkExt};
5 use semver::Version;
6 use serde::Serialize;
7 use tokio::{
8 net::TcpStream,
9 sync::{
10 broadcast::{Receiver, Sender},
11 Mutex,
12 },
13 task::JoinHandle,
14 };
15 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
16
17 use crate::{
18 authentication::AuthenticationTokenGranter,
19 backend::{DiscoveryBackend, RepositoryBackend, UserBackend},
20 handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse},
21 listener::Listeners,
22 messages::{
23 authentication::{
24 AuthenticationMessage, AuthenticationRequest, AuthenticationResponse,
25 TokenExtensionResponse,
26 },
27 repository::{
28 RepositoryMessage, RepositoryMessageKind, RepositoryRequest, RepositoryResponse,
29 },
30 user::{
31 UserMessage, UserMessageKind, UserMessageRequest, UserMessageResponse,
32 UserRepositoriesResponse,
33 },
34 MessageKind,
35 },
36 model::{
37 instance::{Instance, InstanceMeta},
38 repository::Repository,
39 user::User,
40 },
41 validate_version, version,
42 };
43
44 pub struct RawConnection {
45 pub task: JoinHandle<()>,
46 }
47
48 pub struct InstanceConnection {
49 pub instance: InstanceMeta,
50 pub sender: Sender<MessageKind>,
51 pub task: JoinHandle<()>,
52 }
53
54 /// Represents a connection which hasn't finished the handshake.
55 pub struct UnestablishedConnection {
56 pub socket: WebSocketStream<TcpStream>,
57 }
58
59 #[derive(Default)]
60 pub struct Connections {
61 pub connections: Vec<RawConnection>,
62 pub instance_connections: HashMap<Instance, InstanceConnection>,
63 }
64
65 pub async fn connection_worker(
66 mut socket: WebSocketStream<TcpStream>,
67 listeners: Arc<Mutex<Listeners>>,
68 connections: Arc<Mutex<Connections>>,
69 backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
70 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
71 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
72 discovery_backend: Arc<Mutex<dyn DiscoveryBackend + Send>>,
73 addr: SocketAddr,
74 ) {
75 let mut handshaked = false;
76 let this_instance = Instance {
77 url: String::from("giterated.dev"),
78 };
79
80 while let Some(message) = socket.next().await {
81 let message = match message {
82 Ok(message) => message,
83 Err(err) => {
84 error!("Error reading message: {:?}", err);
85 continue;
86 }
87 };
88
89 let payload = match message {
90 Message::Text(text) => text.into_bytes(),
91 Message::Binary(bytes) => bytes,
92 Message::Ping(_) => continue,
93 Message::Pong(_) => continue,
94 Message::Close(_) => {
95 info!("Closing connection with {}.", addr);
96
97 return;
98 }
99 _ => unreachable!(),
100 };
101
102 let message = match serde_json::from_slice::<MessageKind>(&payload) {
103 Ok(message) => message,
104 Err(err) => {
105 error!("Error deserializing message from {}: {:?}", addr, err);
106 continue;
107 }
108 };
109
110 // info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
111
112 if let MessageKind::Handshake(handshake) = message {
113 match handshake {
114 HandshakeMessage::Initiate(request) => {
115 // Send HandshakeMessage::Response
116 let message = HandshakeResponse {
117 identity: this_instance.clone(),
118 version: version(),
119 };
120
121 let version_check = validate_version(&request.version);
122
123 let _result = if !version_check {
124 error!(
125 "Version compatibility failure! Our Version: {}, Their Version: {}",
126 Version::from_str(&std::env::var("CARGO_PKG_VERSION").unwrap())
127 .unwrap(),
128 request.version
129 );
130
131 send(
132 &mut socket,
133 MessageKind::Handshake(HandshakeMessage::Finalize(HandshakeFinalize {
134 success: false,
135 })),
136 )
137 .await
138 } else {
139 send(
140 &mut socket,
141 MessageKind::Handshake(HandshakeMessage::Response(message)),
142 )
143 .await
144 };
145
146 continue;
147 }
148 HandshakeMessage::Response(response) => {
149 // Check version
150 let message = if validate_version(&response.version) {
151 error!(
152 "Version compatibility failure! Our Version: {}, Their Version: {}",
153 version(),
154 response.version
155 );
156
157 HandshakeFinalize { success: false }
158 } else {
159 info!("Connected with a compatible version");
160
161 HandshakeFinalize { success: true }
162 };
163
164 let _result = send(
165 &mut socket,
166 MessageKind::Handshake(HandshakeMessage::Finalize(message)),
167 )
168 .await;
169
170 continue;
171 }
172 HandshakeMessage::Finalize(response) => {
173 if !response.success {
174 error!("Error during handshake, aborting connection");
175 return;
176 }
177
178 handshaked = true;
179
180 // Send HandshakeMessage::Finalize
181 let message = HandshakeFinalize { success: true };
182
183 let _result = send(
184 &mut socket,
185 MessageKind::Handshake(HandshakeMessage::Finalize(message)),
186 )
187 .await;
188
189 continue;
190 }
191 }
192 }
193
194 if !handshaked {
195 continue;
196 }
197
198 if let MessageKind::Repository(repository) = &message {
199 if repository.target.instance != this_instance {
200 info!("Forwarding command to {}", repository.target.instance.url);
201 // We need to send this command to a different instance
202
203 let mut listener = send_and_get_listener(message, &listeners, &connections).await;
204
205 // Wait for response
206 while let Ok(message) = listener.recv().await {
207 if let MessageKind::Repository(RepositoryMessage {
208 command: RepositoryMessageKind::Response(_),
209 ..
210 }) = message
211 {
212 let _result = send(&mut socket, message).await;
213 }
214 }
215 continue;
216 } else {
217 // This message is targeting this instance
218 match &repository.command {
219 RepositoryMessageKind::Request(request) => match request.clone() {
220 RepositoryRequest::CreateRepository(request) => {
221 let mut backend = backend.lock().await;
222 let request = request.validate().await.unwrap();
223 let response = backend.create_repository(&request).await;
224
225 let response = match response {
226 Ok(response) => response,
227 Err(err) => {
228 error!("Error handling request: {:?}", err);
229 continue;
230 }
231 };
232 drop(backend);
233
234 let _result = send(
235 &mut socket,
236 MessageKind::Repository(RepositoryMessage {
237 target: repository.target.clone(),
238 command: RepositoryMessageKind::Response(
239 RepositoryResponse::CreateRepository(response),
240 ),
241 }),
242 )
243 .await;
244
245 continue;
246 }
247 RepositoryRequest::RepositoryFileInspect(request) => {
248 let mut backend = backend.lock().await;
249 let request = request.validate().await.unwrap();
250 let response = backend.repository_file_inspect(&request);
251
252 let response = match response {
253 Ok(response) => response,
254 Err(err) => {
255 error!("Error handling request: {:?}", err);
256 continue;
257 }
258 };
259 drop(backend);
260
261 let _result = send(
262 &mut socket,
263 MessageKind::Repository(RepositoryMessage {
264 target: repository.target.clone(),
265 command: RepositoryMessageKind::Response(
266 RepositoryResponse::RepositoryFileInspection(response),
267 ),
268 }),
269 )
270 .await;
271
272 continue;
273 }
274 RepositoryRequest::RepositoryInfo(request) => {
275 let mut backend = backend.lock().await;
276 let request = request.validate().await.unwrap();
277 let response = backend.repository_info(&request).await;
278
279 let response = match response {
280 Ok(response) => response,
281 Err(err) => {
282 error!("Error handling request: {:?}", err);
283 continue;
284 }
285 };
286 drop(backend);
287
288 let _result = send(
289 &mut socket,
290 MessageKind::Repository(RepositoryMessage {
291 target: repository.target.clone(),
292 command: RepositoryMessageKind::Response(
293 RepositoryResponse::RepositoryInfo(response),
294 ),
295 }),
296 )
297 .await;
298
299 continue;
300 }
301 RepositoryRequest::IssuesCount(request) => {
302 let request = &request.validate().await.unwrap();
303
304 let mut backend = backend.lock().await;
305 let response = backend.issues_count(request);
306
307 let response = match response {
308 Ok(response) => response,
309 Err(err) => {
310 error!("Error handling request: {:?}", err);
311 continue;
312 }
313 };
314 drop(backend);
315
316 let _result = send(
317 &mut socket,
318 MessageKind::Repository(RepositoryMessage {
319 target: repository.target.clone(),
320 command: RepositoryMessageKind::Response(
321 RepositoryResponse::IssuesCount(response),
322 ),
323 }),
324 )
325 .await;
326
327 continue;
328 }
329 RepositoryRequest::IssueLabels(request) => {
330 let request = &request.validate().await.unwrap();
331
332 let mut backend = backend.lock().await;
333 let response = backend.issue_labels(request);
334
335 let response = match response {
336 Ok(response) => response,
337 Err(err) => {
338 error!("Error handling request: {:?}", err);
339 continue;
340 }
341 };
342 drop(backend);
343
344 let _result = send(
345 &mut socket,
346 MessageKind::Repository(RepositoryMessage {
347 target: repository.target.clone(),
348 command: RepositoryMessageKind::Response(
349 RepositoryResponse::IssueLabels(response),
350 ),
351 }),
352 )
353 .await;
354
355 continue;
356 }
357 RepositoryRequest::Issues(request) => {
358 let request = request.validate().await.unwrap();
359
360 let mut backend = backend.lock().await;
361 let response = backend.issues(&request);
362
363 let response = match response {
364 Ok(response) => response,
365 Err(err) => {
366 error!("Error handling request: {:?}", err);
367 continue;
368 }
369 };
370 drop(backend);
371
372 let _result = send(
373 &mut socket,
374 MessageKind::Repository(RepositoryMessage {
375 target: repository.target.clone(),
376 command: RepositoryMessageKind::Response(
377 RepositoryResponse::Issues(response),
378 ),
379 }),
380 )
381 .await;
382
383 continue;
384 }
385 },
386 RepositoryMessageKind::Response(_response) => {
387 unreachable!()
388 }
389 }
390 }
391 }
392
393 if let MessageKind::Authentication(authentication) = &message {
394 match authentication {
395 AuthenticationMessage::Request(request) => match request {
396 AuthenticationRequest::AuthenticationToken(token) => {
397 let mut granter = auth_granter.lock().await;
398
399 let response = granter.token_request(token.clone()).await.unwrap();
400 drop(granter);
401
402 let _result = send(
403 &mut socket,
404 MessageKind::Authentication(AuthenticationMessage::Response(
405 AuthenticationResponse::AuthenticationToken(response),
406 )),
407 )
408 .await;
409
410 continue;
411 }
412 AuthenticationRequest::TokenExtension(request) => {
413 let mut granter = auth_granter.lock().await;
414
415 let response = granter
416 .extension_request(request.clone())
417 .await
418 .unwrap_or(TokenExtensionResponse { new_token: None });
419 drop(granter);
420
421 let _result = send(
422 &mut socket,
423 MessageKind::Authentication(AuthenticationMessage::Response(
424 AuthenticationResponse::TokenExtension(response),
425 )),
426 )
427 .await;
428
429 continue;
430 }
431 AuthenticationRequest::RegisterAccount(request) => {
432 let request = request.inner().await.clone();
433
434 let mut user_backend = user_backend.lock().await;
435
436 let response = user_backend.register(request.clone()).await.unwrap();
437 drop(user_backend);
438
439 let _result = send(
440 &mut socket,
441 MessageKind::Authentication(AuthenticationMessage::Response(
442 AuthenticationResponse::RegisterAccount(response),
443 )),
444 )
445 .await;
446
447 continue;
448 }
449 },
450 AuthenticationMessage::Response(_) => unreachable!(),
451 }
452 }
453
454 if let MessageKind::Discovery(message) = &message {
455 let mut backend = discovery_backend.lock().await;
456 backend.try_handle(message).await.unwrap();
457
458 continue;
459 }
460
461 if let MessageKind::User(message) = &message {
462 match &message.message {
463 UserMessageKind::Request(request) => match request {
464 UserMessageRequest::DisplayName(request) => {
465 let mut user_backend = user_backend.lock().await;
466
467 let response = user_backend.display_name(request.clone()).await;
468
469 let response = match response {
470 Ok(response) => response,
471 Err(err) => {
472 error!("Error handling request: {:?}", err);
473 continue;
474 }
475 };
476 drop(user_backend);
477
478 let _result = send(
479 &mut socket,
480 MessageKind::User(UserMessage {
481 instance: message.instance.clone(),
482 message: UserMessageKind::Response(
483 UserMessageResponse::DisplayName(response),
484 ),
485 }),
486 )
487 .await;
488
489 continue;
490 }
491 UserMessageRequest::DisplayImage(request) => {
492 let mut user_backend = user_backend.lock().await;
493
494 let response = user_backend.display_image(request.clone()).await;
495
496 let response = match response {
497 Ok(response) => response,
498 Err(err) => {
499 error!("Error handling request: {:?}", err);
500 continue;
501 }
502 };
503 drop(user_backend);
504
505 let _result = send(
506 &mut socket,
507 MessageKind::User(UserMessage {
508 instance: message.instance.clone(),
509 message: UserMessageKind::Response(
510 UserMessageResponse::DisplayImage(response),
511 ),
512 }),
513 )
514 .await;
515
516 continue;
517 }
518 UserMessageRequest::Bio(request) => {
519 let mut user_backend = user_backend.lock().await;
520
521 let response = user_backend.bio(request.clone()).await;
522
523 let response = match response {
524 Ok(response) => response,
525 Err(err) => {
526 error!("Error handling request: {:?}", err);
527 continue;
528 }
529 };
530 drop(user_backend);
531
532 let _result = send(
533 &mut socket,
534 MessageKind::User(UserMessage {
535 instance: message.instance.clone(),
536 message: UserMessageKind::Response(UserMessageResponse::Bio(
537 response,
538 )),
539 }),
540 )
541 .await;
542
543 continue;
544 }
545 UserMessageRequest::Repositories(request) => {
546 let mut repository_backend = backend.lock().await;
547
548 let repositories = repository_backend
549 .repositories_for_user(&request.user)
550 .await;
551
552 let repositories = match repositories {
553 Ok(repositories) => repositories,
554 Err(err) => {
555 error!("Error handling request: {:?}", err);
556 continue;
557 }
558 };
559 drop(repository_backend);
560
561 let response = UserRepositoriesResponse { repositories };
562
563 let _result = send(
564 &mut socket,
565 MessageKind::User(UserMessage {
566 instance: message.instance.clone(),
567 message: UserMessageKind::Response(
568 UserMessageResponse::Repositories(response),
569 ),
570 }),
571 )
572 .await;
573
574 continue;
575 }
576 },
577 UserMessageKind::Response(_) => unreachable!(),
578 }
579 }
580 }
581
582 info!("Connection closed");
583 }
584
585 async fn send_and_get_listener(
586 message: MessageKind,
587 listeners: &Arc<Mutex<Listeners>>,
588 connections: &Arc<Mutex<Connections>>,
589 ) -> Receiver<MessageKind> {
590 let (instance, user, repository): (Option<Instance>, Option<User>, Option<Repository>) =
591 match &message {
592 MessageKind::Handshake(_) => {
593 todo!()
594 }
595 MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())),
596 MessageKind::Authentication(_) => todo!(),
597 MessageKind::Discovery(_) => todo!(),
598 MessageKind::User(user) => todo!(),
599 };
600
601 let target = match (&instance, &user, &repository) {
602 (Some(instance), _, _) => instance.clone(),
603 (_, Some(user), _) => user.instance.clone(),
604 (_, _, Some(repository)) => repository.instance.clone(),
605 _ => unreachable!(),
606 };
607
608 let mut listeners = listeners.lock().await;
609 let listener = listeners.add(instance, user, repository);
610 drop(listeners);
611
612 let connections = connections.lock().await;
613
614 if let Some(connection) = connections.instance_connections.get(&target) {
615 if let Err(_) = connection.sender.send(message) {
616 error!("Error sending message.");
617 }
618 } else {
619 error!("Unable to message {}, this is a bug.", target.url);
620
621 panic!();
622 }
623
624 drop(connections);
625
626 listener
627 }
628
629 async fn send<T: Serialize>(
630 socket: &mut WebSocketStream<TcpStream>,
631 message: T,
632 ) -> Result<(), Error> {
633 socket
634 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
635 .await?;
636
637 Ok(())
638 }
639