JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Add users table

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨9f36e3f

⁨src/connection.rs⁩ - ⁨19297⁩ bytes
Raw
1 use std::{collections::HashMap, net::SocketAddr, sync::Arc};
2
3 use futures_util::{stream::StreamExt, SinkExt};
4 use tokio::{
5 net::TcpStream,
6 sync::{
7 broadcast::{Receiver, Sender},
8 Mutex,
9 },
10 task::JoinHandle,
11 };
12 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
13
14 use crate::{
15 authentication::AuthenticationTokenGranter,
16 backend::{IssuesBackend, RepositoryBackend, UserBackend},
17 handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse},
18 listener::Listeners,
19 messages::{
20 authentication::{AuthenticationMessage, AuthenticationRequest, TokenExtensionResponse},
21 repository::{
22 RepositoryMessage, RepositoryMessageKind, RepositoryRequest, RepositoryResponse,
23 },
24 MessageKind,
25 },
26 model::{
27 instance::{Instance, InstanceMeta},
28 repository::Repository,
29 user::User,
30 },
31 };
32
33 pub struct RawConnection {
34 pub task: JoinHandle<()>,
35 }
36
37 pub struct InstanceConnection {
38 pub instance: InstanceMeta,
39 pub sender: Sender<MessageKind>,
40 pub task: JoinHandle<()>,
41 }
42
43 /// Represents a connection which hasn't finished the handshake.
44 pub struct UnestablishedConnection {
45 pub socket: WebSocketStream<TcpStream>,
46 }
47
48 #[derive(Default)]
49 pub struct Connections {
50 pub connections: Vec<RawConnection>,
51 pub instance_connections: HashMap<Instance, InstanceConnection>,
52 }
53
54 pub async fn connection_worker(
55 mut socket: WebSocketStream<TcpStream>,
56 listeners: Arc<Mutex<Listeners>>,
57 connections: Arc<Mutex<Connections>>,
58 backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
59 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
60 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
61 addr: SocketAddr,
62 ) {
63 let mut handshaked = false;
64 let this_instance = Instance {
65 url: String::from("giterated.dev"),
66 };
67
68 while let Some(message) = socket.next().await {
69 let message = match message {
70 Ok(message) => message,
71 Err(err) => {
72 error!("Error reading message: {:?}", err);
73 continue;
74 }
75 };
76
77 let payload = match message {
78 Message::Text(text) => text.into_bytes(),
79 Message::Binary(bytes) => bytes,
80 Message::Ping(_) => continue,
81 Message::Pong(_) => continue,
82 Message::Close(_) => {
83 info!("Closing connection with {}.", addr);
84
85 return;
86 }
87 _ => unreachable!(),
88 };
89
90 let message = match serde_json::from_slice::<MessageKind>(&payload) {
91 Ok(message) => message,
92 Err(err) => {
93 error!("Error deserializing message from {}: {:?}", addr, err);
94 continue;
95 }
96 };
97
98 // info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
99
100 if let MessageKind::Handshake(handshake) = message {
101 match handshake {
102 HandshakeMessage::Initiate(_) => {
103 // Send HandshakeMessage::Response
104 let message = HandshakeResponse {
105 identity: Instance {
106 url: String::from("foo.com"),
107 },
108 version: String::from("0.1.0"),
109 };
110
111 socket
112 .send(Message::Binary(
113 serde_json::to_vec(&MessageKind::Handshake(
114 HandshakeMessage::Response(message),
115 ))
116 .unwrap(),
117 ))
118 .await
119 .unwrap();
120
121 continue;
122 }
123 HandshakeMessage::Response(_) => {
124 // Send HandshakeMessage::Finalize
125 let message = HandshakeFinalize { success: true };
126
127 socket
128 .send(Message::Binary(
129 serde_json::to_vec(&MessageKind::Handshake(
130 HandshakeMessage::Finalize(message),
131 ))
132 .unwrap(),
133 ))
134 .await
135 .unwrap();
136
137 continue;
138 }
139 HandshakeMessage::Finalize(_) => {
140 handshaked = true;
141
142 // Send HandshakeMessage::Finalize
143 let message = HandshakeFinalize { success: true };
144
145 socket
146 .send(Message::Binary(
147 serde_json::to_vec(&MessageKind::Handshake(
148 HandshakeMessage::Finalize(message),
149 ))
150 .unwrap(),
151 ))
152 .await
153 .unwrap();
154
155 continue;
156 }
157 }
158 }
159
160 if !handshaked {
161 continue;
162 }
163
164 if let MessageKind::Repository(repository) = &message {
165 if repository.target.instance != this_instance {
166 info!("Forwarding command to {}", repository.target.instance.url);
167 // We need to send this command to a different instance
168
169 let mut listener = send_and_get_listener(message, &listeners, &connections).await;
170
171 // Wait for response
172 while let Ok(message) = listener.recv().await {
173 if let MessageKind::Repository(RepositoryMessage {
174 command: RepositoryMessageKind::Response(_),
175 ..
176 }) = message
177 {
178 socket
179 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
180 .await
181 .unwrap();
182 }
183 }
184 continue;
185 } else {
186 // This message is targeting this instance
187 match &repository.command {
188 RepositoryMessageKind::Request(request) => match request.clone() {
189 RepositoryRequest::CreateRepository(request) => {
190 let mut backend = backend.lock().await;
191 let request = request.validate().await.unwrap();
192 let response = backend.create_repository(&request).await;
193
194 let response = match response {
195 Ok(response) => response,
196 Err(err) => {
197 error!("Error handling request: {:?}", err);
198 continue;
199 }
200 };
201 drop(backend);
202
203 socket
204 .send(Message::Binary(
205 serde_json::to_vec(&MessageKind::Repository(
206 RepositoryMessage {
207 target: repository.target.clone(),
208 command: RepositoryMessageKind::Response(
209 RepositoryResponse::CreateRepository(response),
210 ),
211 },
212 ))
213 .unwrap(),
214 ))
215 .await
216 .unwrap();
217
218 continue;
219 }
220 RepositoryRequest::RepositoryFileInspect(request) => {
221 let mut backend = backend.lock().await;
222 let request = request.validate().await.unwrap();
223 let response = backend.repository_file_inspect(&request);
224
225 let response = match response {
226 Ok(response) => response,
227 Err(err) => {
228 error!("Error handling request: {:?}", err);
229 continue;
230 }
231 };
232 drop(backend);
233
234 socket
235 .send(Message::Binary(
236 serde_json::to_vec(&MessageKind::Repository(
237 RepositoryMessage {
238 target: repository.target.clone(),
239 command: RepositoryMessageKind::Response(
240 RepositoryResponse::RepositoryFileInspection(
241 response,
242 ),
243 ),
244 },
245 ))
246 .unwrap(),
247 ))
248 .await
249 .unwrap();
250 continue;
251 }
252 RepositoryRequest::RepositoryInfo(request) => {
253 let mut backend = backend.lock().await;
254 let request = request.validate().await.unwrap();
255 let response = backend.repository_info(&request).await;
256
257 let response = match response {
258 Ok(response) => response,
259 Err(err) => {
260 error!("Error handling request: {:?}", err);
261 continue;
262 }
263 };
264 drop(backend);
265
266 socket
267 .send(Message::Binary(
268 serde_json::to_vec(&MessageKind::Repository(
269 RepositoryMessage {
270 target: repository.target.clone(),
271 command: RepositoryMessageKind::Response(
272 RepositoryResponse::RepositoryInfo(response),
273 ),
274 },
275 ))
276 .unwrap(),
277 ))
278 .await
279 .unwrap();
280 continue;
281 }
282 RepositoryRequest::IssuesCount(request) => {
283 let request = &request.validate().await.unwrap();
284
285 let mut backend = backend.lock().await;
286 let response = backend.issues_count(request);
287
288 let response = match response {
289 Ok(response) => response,
290 Err(err) => {
291 error!("Error handling request: {:?}", err);
292 continue;
293 }
294 };
295 drop(backend);
296
297 socket
298 .send(Message::Binary(
299 serde_json::to_vec(&MessageKind::Repository(
300 RepositoryMessage {
301 target: repository.target.clone(),
302 command: RepositoryMessageKind::Response(
303 RepositoryResponse::IssuesCount(response),
304 ),
305 },
306 ))
307 .unwrap(),
308 ))
309 .await
310 .unwrap();
311 continue;
312 }
313 RepositoryRequest::IssueLabels(request) => {
314 let request = &request.validate().await.unwrap();
315
316 let mut backend = backend.lock().await;
317 let response = backend.issue_labels(&request);
318
319 let response = match response {
320 Ok(response) => response,
321 Err(err) => {
322 error!("Error handling request: {:?}", err);
323 continue;
324 }
325 };
326 drop(backend);
327 socket
328 .send(Message::Binary(
329 serde_json::to_vec(&MessageKind::Repository(
330 RepositoryMessage {
331 target: repository.target.clone(),
332 command: RepositoryMessageKind::Response(
333 RepositoryResponse::IssueLabels(response),
334 ),
335 },
336 ))
337 .unwrap(),
338 ))
339 .await
340 .unwrap();
341 continue;
342 }
343 RepositoryRequest::Issues(request) => {
344 let request = request.validate().await.unwrap();
345
346 let mut backend = backend.lock().await;
347 let response = backend.issues(&request);
348
349 let response = match response {
350 Ok(response) => response,
351 Err(err) => {
352 error!("Error handling request: {:?}", err);
353 continue;
354 }
355 };
356 drop(backend);
357
358 socket
359 .send(Message::Binary(
360 serde_json::to_vec(&MessageKind::Repository(
361 RepositoryMessage {
362 target: repository.target.clone(),
363 command: RepositoryMessageKind::Response(
364 RepositoryResponse::Issues(response),
365 ),
366 },
367 ))
368 .unwrap(),
369 ))
370 .await
371 .unwrap();
372 continue;
373 }
374 },
375 RepositoryMessageKind::Response(_response) => {
376 unreachable!()
377 }
378 }
379 }
380 }
381
382 if let MessageKind::Authentication(authentication) = &message {
383 match authentication {
384 AuthenticationMessage::Request(request) => match request {
385 AuthenticationRequest::AuthenticationToken(token) => {
386 let mut granter = auth_granter.lock().await;
387
388 let response = granter.token_request(token.clone()).await.unwrap();
389 drop(granter);
390
391 socket
392 .send(Message::Binary(
393 serde_json::to_vec(&MessageKind::Authentication(
394 AuthenticationMessage::Response(crate::messages::authentication::AuthenticationResponse::AuthenticationToken(response))
395 ))
396 .unwrap(),
397 ))
398 .await
399 .unwrap();
400 continue;
401 }
402 AuthenticationRequest::TokenExtension(request) => {
403 let mut granter = auth_granter.lock().await;
404
405 let response = granter
406 .extension_request(request.clone())
407 .await
408 .unwrap_or(TokenExtensionResponse { new_token: None });
409 drop(granter);
410
411 socket
412 .send(Message::Binary(
413 serde_json::to_vec(&MessageKind::Authentication(
414 AuthenticationMessage::Response(crate::messages::authentication::AuthenticationResponse::TokenExtension(response))
415 ))
416 .unwrap(),
417 ))
418 .await
419 .unwrap();
420 continue;
421 }
422 },
423 AuthenticationMessage::Response(_) => unreachable!(),
424 }
425 }
426 }
427
428 info!("Connection closed");
429 }
430
431 async fn send_and_get_listener(
432 message: MessageKind,
433 listeners: &Arc<Mutex<Listeners>>,
434 connections: &Arc<Mutex<Connections>>,
435 ) -> Receiver<MessageKind> {
436 let (instance, user, repository): (Option<Instance>, Option<User>, Option<Repository>) =
437 match &message {
438 MessageKind::Handshake(_) => {
439 todo!()
440 }
441 MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())),
442 MessageKind::Authentication(_) => todo!(),
443 };
444
445 let target = match (&instance, &user, &repository) {
446 (Some(instance), _, _) => instance.clone(),
447 (_, Some(user), _) => user.instance.clone(),
448 (_, _, Some(repository)) => repository.instance.clone(),
449 _ => unreachable!(),
450 };
451
452 let mut listeners = listeners.lock().await;
453 let listener = listeners.add(instance, user, repository);
454 drop(listeners);
455
456 let connections = connections.lock().await;
457
458 if let Some(connection) = connections.instance_connections.get(&target) {
459 connection.sender.send(message);
460 } else {
461 error!("Unable to message {}, this is a bug.", target.url);
462
463 panic!();
464 }
465
466 drop(connections);
467
468 listener
469 }
470