JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Update for auth

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨3ef0383

⁨giterated-daemon/src/connection/wrapper.rs⁩ - ⁨7832⁩ bytes
Raw
1 use std::{
2 net::SocketAddr,
3 sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6 },
7 };
8
9 use anyhow::Error;
10 use futures_util::{SinkExt, StreamExt};
11 use giterated_models::{
12 messages::error::ConnectionError,
13 model::{authenticated::AuthenticatedPayload, instance::Instance},
14 };
15
16 use serde::Serialize;
17
18 use tokio::{net::TcpStream, sync::Mutex};
19 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
20 use toml::Table;
21
22 use crate::{
23 authentication::AuthenticationTokenGranter,
24 backend::{RepositoryBackend, SettingsBackend, UserBackend},
25 connection::forwarded::wrap_forwarded,
26 federation::connections::InstanceConnections,
27 keys::PublicKeyCache,
28 message::NetworkMessage,
29 };
30
31 use super::{
32 authentication::authentication_handle, handshake::handshake_handle,
33 repository::repository_handle, user::user_handle, Connections,
34 };
35
36 pub async fn connection_wrapper(
37 socket: WebSocketStream<TcpStream>,
38 connections: Arc<Mutex<Connections>>,
39 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
40 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
41 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
42 settings_backend: Arc<Mutex<dyn SettingsBackend>>,
43 addr: SocketAddr,
44 instance: impl ToOwned<Owned = Instance>,
45 instance_connections: Arc<Mutex<InstanceConnections>>,
46 config: Table,
47 ) {
48 let connection_state = ConnectionState {
49 socket: Arc::new(Mutex::new(socket)),
50 connections,
51 repository_backend,
52 user_backend,
53 auth_granter,
54 settings_backend,
55 addr,
56 instance: instance.to_owned(),
57 handshaked: Arc::new(AtomicBool::new(false)),
58 key_cache: Arc::default(),
59 instance_connections: instance_connections.clone(),
60 config,
61 };
62
63 let mut handshaked = false;
64
65 loop {
66 let mut socket = connection_state.socket.lock().await;
67 let message = socket.next().await;
68 drop(socket);
69
70 match message {
71 Some(Ok(message)) => {
72 let payload = match message {
73 Message::Binary(payload) => payload,
74 Message::Ping(_) => {
75 let mut socket = connection_state.socket.lock().await;
76 let _ = socket.send(Message::Pong(vec![])).await;
77 drop(socket);
78 continue;
79 }
80 Message::Close(_) => return,
81 _ => continue,
82 };
83 info!("one payload");
84
85 let message = NetworkMessage(payload.clone());
86
87 if !handshaked {
88 info!("im foo baring");
89 if handshake_handle(&message, &connection_state).await.is_ok() {
90 if connection_state.handshaked.load(Ordering::SeqCst) {
91 handshaked = true;
92 }
93 }
94 } else {
95 let raw = serde_json::from_slice::<AuthenticatedPayload>(&payload).unwrap();
96
97 if let Some(target_instance) = &raw.target_instance {
98 if connection_state.instance != *target_instance {
99 // Forward request
100 info!("Forwarding message to {}", target_instance.url);
101 let mut instance_connections = instance_connections.lock().await;
102 let pool = instance_connections.get_or_open(&target_instance).unwrap();
103 let pool_clone = pool.clone();
104 drop(pool);
105
106 let result = wrap_forwarded(&pool_clone, raw).await;
107
108 let mut socket = connection_state.socket.lock().await;
109 let _ = socket.send(result).await;
110
111 continue;
112 }
113 }
114
115 let message_type = &raw.message_type;
116
117 info!("Handling message with type: {}", message_type);
118
119 match authentication_handle(message_type, &message, &connection_state).await {
120 Err(e) => {
121 let _ = connection_state
122 .send_raw(ConnectionError(e.to_string()))
123 .await;
124 }
125 Ok(true) => continue,
126 Ok(false) => {}
127 }
128
129 match repository_handle(message_type, &message, &connection_state).await {
130 Err(e) => {
131 let _ = connection_state
132 .send_raw(ConnectionError(e.to_string()))
133 .await;
134 }
135 Ok(true) => continue,
136 Ok(false) => {}
137 }
138
139 match user_handle(message_type, &message, &connection_state).await {
140 Err(e) => {
141 let _ = connection_state
142 .send_raw(ConnectionError(e.to_string()))
143 .await;
144 }
145 Ok(true) => continue,
146 Ok(false) => {}
147 }
148
149 match authentication_handle(message_type, &message, &connection_state).await {
150 Err(e) => {
151 let _ = connection_state
152 .send_raw(ConnectionError(e.to_string()))
153 .await;
154 }
155 Ok(true) => continue,
156 Ok(false) => {}
157 }
158
159 error!(
160 "Message completely unhandled: {}",
161 std::str::from_utf8(&payload).unwrap()
162 );
163 }
164 }
165 Some(Err(e)) => {
166 error!("Closing connection for {:?} for {}", e, addr);
167 return;
168 }
169 _ => {
170 info!("Unhandled");
171 continue;
172 }
173 }
174 }
175 }
176
177 #[derive(Clone)]
178 pub struct ConnectionState {
179 socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
180 pub connections: Arc<Mutex<Connections>>,
181 pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
182 pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
183 pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
184 pub settings_backend: Arc<Mutex<dyn SettingsBackend>>,
185 pub addr: SocketAddr,
186 pub instance: Instance,
187 pub handshaked: Arc<AtomicBool>,
188 pub key_cache: Arc<Mutex<PublicKeyCache>>,
189 pub instance_connections: Arc<Mutex<InstanceConnections>>,
190 pub config: Table,
191 }
192
193 impl ConnectionState {
194 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
195 let payload = serde_json::to_string(&message)?;
196 info!("Sending payload: {}", &payload);
197 self.socket
198 .lock()
199 .await
200 .send(Message::Binary(payload.into_bytes()))
201 .await?;
202
203 Ok(())
204 }
205
206 pub async fn send_raw<T: Serialize>(&self, message: T) -> Result<(), Error> {
207 let payload = serde_json::to_string(&message)?;
208 info!("Sending payload: {}", &payload);
209 self.socket
210 .lock()
211 .await
212 .send(Message::Binary(payload.into_bytes()))
213 .await?;
214
215 Ok(())
216 }
217
218 pub async fn public_key(&self, instance: &Instance) -> Result<String, Error> {
219 let mut keys = self.key_cache.lock().await;
220 keys.get(instance).await
221 }
222 }
223