JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Major connection refactor base

Type: Refactor

Amber - 2 years ago

parent: tbd commit: 8dcc111

⁨src/connection/wrapper.rs⁩ - ⁨2329⁩ bytes
Raw
1 use std::{
2 net::SocketAddr,
3 sync::{atomic::AtomicBool, Arc},
4 };
5
6 use anyhow::Error;
7 use futures_util::SinkExt;
8 use serde::Serialize;
9 use tokio::{net::TcpStream, sync::Mutex};
10 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
11
12 use crate::{
13 authentication::AuthenticationTokenGranter,
14 backend::{DiscoveryBackend, RepositoryBackend, UserBackend},
15 connection::ConnectionError,
16 listener::Listeners,
17 model::instance::Instance,
18 };
19
20 use super::{connection_worker, Connections};
21
22 pub async fn connection_wrapper(
23 mut socket: WebSocketStream<TcpStream>,
24 listeners: Arc<Mutex<Listeners>>,
25 connections: Arc<Mutex<Connections>>,
26 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
27 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
28 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
29 discovery_backend: Arc<Mutex<dyn DiscoveryBackend + Send>>,
30 addr: SocketAddr,
31 ) {
32 let mut handshaked = false;
33 loop {
34 if let Err(e) = connection_worker(
35 &mut socket,
36 &mut handshaked,
37 &listeners,
38 &connections,
39 &repository_backend,
40 &user_backend,
41 &auth_granter,
42 &discovery_backend,
43 &addr,
44 )
45 .await
46 {
47 error!("Error handling message: {:?}", e);
48
49 if let ConnectionError::Shutdown = &e {
50 info!("Closing connection {}", addr);
51 return;
52 }
53 }
54 }
55 }
56
57 #[derive(Clone)]
58 pub struct ConnectionState {
59 socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
60 pub listeners: Arc<Mutex<Listeners>>,
61 pub connections: Arc<Mutex<Connections>>,
62 pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
63 pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
64 pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
65 pub discovery_backend: Arc<Mutex<dyn DiscoveryBackend + Send>>,
66 pub addr: SocketAddr,
67 pub instance: Instance,
68 pub handshaked: Arc<AtomicBool>,
69 }
70
71 impl ConnectionState {
72 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
73 self.socket
74 .lock()
75 .await
76 .send(Message::Binary(serde_json::to_vec(&message)?))
77 .await?;
78
79 Ok(())
80 }
81 }
82