JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Move user settings to user settings

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨fd846f6

⁨giterated-daemon/src/connection/wrapper.rs⁩ - ⁨7250⁩ bytes
Raw
1 use std::{
2 collections::HashMap,
3 net::SocketAddr,
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc,
7 },
8 };
9
10 use anyhow::Error;
11 use futures_util::{SinkExt, StreamExt};
12 use giterated_models::{
13 messages::error::ConnectionError,
14 model::{
15 authenticated::{Authenticated, AuthenticatedPayload},
16 instance::Instance,
17 },
18 };
19 use rsa::RsaPublicKey;
20 use serde::Serialize;
21 use serde_json::Value;
22 use tokio::{
23 net::TcpStream,
24 sync::{Mutex, RwLock},
25 };
26 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
27
28 use crate::{
29 authentication::AuthenticationTokenGranter,
30 backend::{RepositoryBackend, UserBackend},
31 connection::forwarded::wrap_forwarded,
32 federation::connections::InstanceConnections,
33 message::NetworkMessage,
34 };
35
36 use super::{
37 authentication::authentication_handle, handshake::handshake_handle,
38 repository::repository_handle, user::user_handle, Connections,
39 };
40
41 pub async fn connection_wrapper(
42 socket: WebSocketStream<TcpStream>,
43 connections: Arc<Mutex<Connections>>,
44 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
45 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
46 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
47 addr: SocketAddr,
48 instance: impl ToOwned<Owned = Instance>,
49 instance_connections: Arc<Mutex<InstanceConnections>>,
50 ) {
51 let connection_state = ConnectionState {
52 socket: Arc::new(Mutex::new(socket)),
53 connections,
54 repository_backend,
55 user_backend,
56 auth_granter,
57 addr,
58 instance: instance.to_owned(),
59 handshaked: Arc::new(AtomicBool::new(false)),
60 cached_keys: Arc::default(),
61 };
62
63 let mut handshaked = false;
64
65 loop {
66 let mut socket = connection_state.socket.lock().await;
67 let message = socket.next().await;
68 drop(socket);
69
70 match message {
71 Some(Ok(message)) => {
72 let payload = match message {
73 Message::Binary(payload) => payload,
74 Message::Ping(_) => {
75 let mut socket = connection_state.socket.lock().await;
76 let _ = socket.send(Message::Pong(vec![])).await;
77 drop(socket);
78 continue;
79 }
80 Message::Close(_) => return,
81 _ => continue,
82 };
83
84 let message = NetworkMessage(payload.clone());
85
86 if !handshaked {
87 info!("im foo baring");
88 if handshake_handle(&message, &connection_state).await.is_ok() {
89 if connection_state.handshaked.load(Ordering::SeqCst) {
90 handshaked = true;
91 }
92 }
93 } else {
94 let raw = serde_json::from_slice::<AuthenticatedPayload>(&payload).unwrap();
95
96 if let Some(target_instance) = &raw.target_instance {
97 // Forward request
98 info!("Forwarding message to {}", target_instance.url);
99 let mut instance_connections = instance_connections.lock().await;
100 let pool = instance_connections.get_or_open(&target_instance).unwrap();
101 let pool_clone = pool.clone();
102 drop(pool);
103
104 let result = wrap_forwarded(&pool_clone, raw).await;
105
106 let mut socket = connection_state.socket.lock().await;
107 let _ = socket.send(result).await;
108
109 continue;
110 }
111
112 let message_type = &raw.message_type;
113
114 info!("Handling message with type: {}", message_type);
115
116 match authentication_handle(message_type, &message, &connection_state).await {
117 Err(e) => {
118 let _ = connection_state
119 .send_raw(ConnectionError(e.to_string()))
120 .await;
121 }
122 Ok(true) => continue,
123 Ok(false) => {}
124 }
125
126 match repository_handle(message_type, &message, &connection_state).await {
127 Err(e) => {
128 let _ = connection_state
129 .send_raw(ConnectionError(e.to_string()))
130 .await;
131 }
132 Ok(true) => continue,
133 Ok(false) => {}
134 }
135
136 match user_handle(message_type, &message, &connection_state).await {
137 Err(e) => {
138 let _ = connection_state
139 .send_raw(ConnectionError(e.to_string()))
140 .await;
141 }
142 Ok(true) => continue,
143 Ok(false) => {}
144 }
145
146 match authentication_handle(message_type, &message, &connection_state).await {
147 Err(e) => {
148 let _ = connection_state
149 .send_raw(ConnectionError(e.to_string()))
150 .await;
151 }
152 Ok(true) => continue,
153 Ok(false) => {}
154 }
155
156 error!(
157 "Message completely unhandled: {}",
158 std::str::from_utf8(&payload).unwrap()
159 );
160 }
161 }
162 Some(Err(e)) => {
163 error!("Closing connection for {:?} for {}", e, addr);
164 return;
165 }
166 _ => {
167 info!("Unhandled");
168 continue;
169 }
170 }
171 }
172 }
173
174 #[derive(Clone)]
175 pub struct ConnectionState {
176 socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
177 pub connections: Arc<Mutex<Connections>>,
178 pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
179 pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
180 pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
181 pub addr: SocketAddr,
182 pub instance: Instance,
183 pub handshaked: Arc<AtomicBool>,
184 pub cached_keys: Arc<RwLock<HashMap<Instance, RsaPublicKey>>>,
185 }
186
187 impl ConnectionState {
188 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
189 let payload = serde_json::to_string(&message)?;
190 info!("Sending payload: {}", &payload);
191 self.socket
192 .lock()
193 .await
194 .send(Message::Binary(payload.into_bytes()))
195 .await?;
196
197 Ok(())
198 }
199
200 pub async fn send_raw<T: Serialize>(&self, message: T) -> Result<(), Error> {
201 let payload = serde_json::to_string(&message)?;
202 info!("Sending payload: {}", &payload);
203 self.socket
204 .lock()
205 .await
206 .send(Message::Binary(payload.into_bytes()))
207 .await?;
208
209 Ok(())
210 }
211 }
212