JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Finish connection refactor!

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨bd675cd

⁨src/connection/wrapper.rs⁩ - ⁨4204⁩ bytes
Raw
1 use std::{
2 net::SocketAddr,
3 sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6 },
7 };
8
9 use anyhow::Error;
10 use futures_util::{SinkExt, StreamExt};
11 use serde::Serialize;
12 use tokio::{net::TcpStream, sync::Mutex};
13 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
14
15 use crate::{
16 authentication::AuthenticationTokenGranter,
17 backend::{DiscoveryBackend, RepositoryBackend, UserBackend},
18 connection::ConnectionError,
19 listener::Listeners,
20 model::{authenticated::NetworkMessage, instance::Instance},
21 };
22
23 use super::{
24 authentication::authentication_handle, connection_worker, handshake::handshake_handle,
25 repository::repository_handle, user::user_handle, Connections,
26 };
27
28 pub async fn connection_wrapper(
29 mut socket: WebSocketStream<TcpStream>,
30 listeners: Arc<Mutex<Listeners>>,
31 connections: Arc<Mutex<Connections>>,
32 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
33 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
34 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
35 discovery_backend: Arc<Mutex<dyn DiscoveryBackend + Send>>,
36 addr: SocketAddr,
37 instance: impl ToOwned<Owned = Instance>,
38 ) {
39 let mut connection_state = ConnectionState {
40 socket: Arc::new(Mutex::new(socket)),
41 listeners,
42 connections,
43 repository_backend,
44 user_backend,
45 auth_granter,
46 discovery_backend,
47 addr,
48 instance: instance.to_owned(),
49 handshaked: Arc::new(AtomicBool::new(false)),
50 };
51
52 let mut handshaked = false;
53
54 loop {
55 let mut socket = connection_state.socket.lock().await;
56 let message = socket.next().await;
57 drop(socket);
58
59 match message {
60 Some(Ok(message)) => {
61 let payload = match message {
62 Message::Binary(payload) => payload,
63 Message::Ping(_) => {
64 let mut socket = connection_state.socket.lock().await;
65 socket.send(Message::Pong(vec![])).await;
66 drop(socket);
67 continue;
68 }
69 Message::Close(_) => return,
70 _ => continue,
71 };
72
73 let message = NetworkMessage(payload);
74
75 if !handshaked {
76 if handshake_handle(&message, &connection_state).await.is_ok() {
77 if connection_state.handshaked.load(Ordering::SeqCst) {
78 handshaked = true;
79 }
80 }
81 } else {
82 if authentication_handle(&message, &connection_state)
83 .await
84 .is_ok()
85 {
86 continue;
87 } else if repository_handle(&message, &connection_state).await.is_ok() {
88 continue;
89 } else if user_handle(&message, &connection_state).await.is_ok() {
90 continue;
91 } else {
92 error!("Message completely unhandled");
93 continue;
94 }
95 }
96 }
97 _ => {
98 error!("Closing connection for {}", addr);
99 return;
100 }
101 }
102 }
103 }
104
105 #[derive(Clone)]
106 pub struct ConnectionState {
107 socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
108 pub listeners: Arc<Mutex<Listeners>>,
109 pub connections: Arc<Mutex<Connections>>,
110 pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
111 pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
112 pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
113 pub discovery_backend: Arc<Mutex<dyn DiscoveryBackend + Send>>,
114 pub addr: SocketAddr,
115 pub instance: Instance,
116 pub handshaked: Arc<AtomicBool>,
117 }
118
119 impl ConnectionState {
120 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
121 self.socket
122 .lock()
123 .await
124 .send(Message::Binary(serde_json::to_vec(&message)?))
125 .await?;
126
127 Ok(())
128 }
129 }
130