JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Update version checks

Amber - 2 years ago

parent: tbd commit: 1ed7c39

⁨src/connection.rs⁩ - ⁨19757⁩ bytes
Raw
1 use std::{collections::HashMap, net::SocketAddr, str::FromStr, sync::Arc};
2
3 use anyhow::Error;
4 use futures_util::{stream::StreamExt, SinkExt};
5 use semver::Version;
6 use serde::Serialize;
7 use tokio::{
8 net::TcpStream,
9 sync::{
10 broadcast::{Receiver, Sender},
11 Mutex,
12 },
13 task::JoinHandle,
14 };
15 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
16
17 use crate::{
18 authentication::AuthenticationTokenGranter,
19 backend::{DiscoveryBackend, RepositoryBackend, UserBackend},
20 handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse},
21 listener::Listeners,
22 messages::{
23 authentication::{
24 AuthenticationMessage, AuthenticationRequest, AuthenticationResponse,
25 TokenExtensionResponse,
26 },
27 repository::{
28 RepositoryMessage, RepositoryMessageKind, RepositoryRequest, RepositoryResponse,
29 },
30 MessageKind,
31 },
32 model::{
33 instance::{Instance, InstanceMeta},
34 repository::Repository,
35 user::User,
36 },
37 validate_version, version,
38 };
39
40 pub struct RawConnection {
41 pub task: JoinHandle<()>,
42 }
43
44 pub struct InstanceConnection {
45 pub instance: InstanceMeta,
46 pub sender: Sender<MessageKind>,
47 pub task: JoinHandle<()>,
48 }
49
50 /// Represents a connection which hasn't finished the handshake.
51 pub struct UnestablishedConnection {
52 pub socket: WebSocketStream<TcpStream>,
53 }
54
55 #[derive(Default)]
56 pub struct Connections {
57 pub connections: Vec<RawConnection>,
58 pub instance_connections: HashMap<Instance, InstanceConnection>,
59 }
60
61 pub async fn connection_worker(
62 mut socket: WebSocketStream<TcpStream>,
63 listeners: Arc<Mutex<Listeners>>,
64 connections: Arc<Mutex<Connections>>,
65 backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
66 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
67 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
68 discovery_backend: Arc<Mutex<dyn DiscoveryBackend + Send>>,
69 addr: SocketAddr,
70 ) {
71 let mut handshaked = false;
72 let this_instance = Instance {
73 url: String::from("giterated.dev"),
74 };
75
76 while let Some(message) = socket.next().await {
77 let message = match message {
78 Ok(message) => message,
79 Err(err) => {
80 error!("Error reading message: {:?}", err);
81 continue;
82 }
83 };
84
85 let payload = match message {
86 Message::Text(text) => text.into_bytes(),
87 Message::Binary(bytes) => bytes,
88 Message::Ping(_) => continue,
89 Message::Pong(_) => continue,
90 Message::Close(_) => {
91 info!("Closing connection with {}.", addr);
92
93 return;
94 }
95 _ => unreachable!(),
96 };
97
98 let message = match serde_json::from_slice::<MessageKind>(&payload) {
99 Ok(message) => message,
100 Err(err) => {
101 error!("Error deserializing message from {}: {:?}", addr, err);
102 continue;
103 }
104 };
105
106 // info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
107
108 if let MessageKind::Handshake(handshake) = message {
109 match handshake {
110 HandshakeMessage::Initiate(request) => {
111 // Send HandshakeMessage::Response
112 let message = HandshakeResponse {
113 identity: this_instance.clone(),
114 version: version(),
115 };
116
117 let version_check = validate_version(&request.version);
118
119 let _result = if !version_check {
120 error!(
121 "Version compatibility failure! Our Version: {}, Their Version: {}",
122 Version::from_str(&std::env::var("CARGO_PKG_VERSION").unwrap())
123 .unwrap(),
124 request.version
125 );
126
127 send(
128 &mut socket,
129 MessageKind::Handshake(HandshakeMessage::Finalize(HandshakeFinalize {
130 success: false,
131 })),
132 )
133 .await
134 } else {
135 send(
136 &mut socket,
137 MessageKind::Handshake(HandshakeMessage::Response(message)),
138 )
139 .await
140 };
141
142 continue;
143 }
144 HandshakeMessage::Response(response) => {
145 // Check version
146 let message = if validate_version(&response.version) {
147 error!(
148 "Version compatibility failure! Our Version: {}, Their Version: {}",
149 version(),
150 response.version
151 );
152
153 HandshakeFinalize { success: false }
154 } else {
155 info!("Connected with a compatible version");
156
157 HandshakeFinalize { success: true }
158 };
159
160 let _result = send(
161 &mut socket,
162 MessageKind::Handshake(HandshakeMessage::Finalize(message)),
163 )
164 .await;
165
166 continue;
167 }
168 HandshakeMessage::Finalize(response) => {
169 if !response.success {
170 error!("Error during handshake, aborting connection");
171 return;
172 }
173
174 handshaked = true;
175
176 // Send HandshakeMessage::Finalize
177 let message = HandshakeFinalize { success: true };
178
179 let _result = send(
180 &mut socket,
181 MessageKind::Handshake(HandshakeMessage::Finalize(message)),
182 )
183 .await;
184
185 continue;
186 }
187 }
188 }
189
190 if !handshaked {
191 continue;
192 }
193
194 if let MessageKind::Repository(repository) = &message {
195 if repository.target.instance != this_instance {
196 info!("Forwarding command to {}", repository.target.instance.url);
197 // We need to send this command to a different instance
198
199 let mut listener = send_and_get_listener(message, &listeners, &connections).await;
200
201 // Wait for response
202 while let Ok(message) = listener.recv().await {
203 if let MessageKind::Repository(RepositoryMessage {
204 command: RepositoryMessageKind::Response(_),
205 ..
206 }) = message
207 {
208 let _result = send(&mut socket, message).await;
209 }
210 }
211 continue;
212 } else {
213 // This message is targeting this instance
214 match &repository.command {
215 RepositoryMessageKind::Request(request) => match request.clone() {
216 RepositoryRequest::CreateRepository(request) => {
217 let mut backend = backend.lock().await;
218 let request = request.validate().await.unwrap();
219 let response = backend.create_repository(&request).await;
220
221 let response = match response {
222 Ok(response) => response,
223 Err(err) => {
224 error!("Error handling request: {:?}", err);
225 continue;
226 }
227 };
228 drop(backend);
229
230 let _result = send(
231 &mut socket,
232 MessageKind::Repository(RepositoryMessage {
233 target: repository.target.clone(),
234 command: RepositoryMessageKind::Response(
235 RepositoryResponse::CreateRepository(response),
236 ),
237 }),
238 )
239 .await;
240
241 continue;
242 }
243 RepositoryRequest::RepositoryFileInspect(request) => {
244 let mut backend = backend.lock().await;
245 let request = request.validate().await.unwrap();
246 let response = backend.repository_file_inspect(&request);
247
248 let response = match response {
249 Ok(response) => response,
250 Err(err) => {
251 error!("Error handling request: {:?}", err);
252 continue;
253 }
254 };
255 drop(backend);
256
257 let _result = send(
258 &mut socket,
259 MessageKind::Repository(RepositoryMessage {
260 target: repository.target.clone(),
261 command: RepositoryMessageKind::Response(
262 RepositoryResponse::RepositoryFileInspection(response),
263 ),
264 }),
265 )
266 .await;
267
268 continue;
269 }
270 RepositoryRequest::RepositoryInfo(request) => {
271 let mut backend = backend.lock().await;
272 let request = request.validate().await.unwrap();
273 let response = backend.repository_info(&request).await;
274
275 let response = match response {
276 Ok(response) => response,
277 Err(err) => {
278 error!("Error handling request: {:?}", err);
279 continue;
280 }
281 };
282 drop(backend);
283
284 let _result = send(
285 &mut socket,
286 MessageKind::Repository(RepositoryMessage {
287 target: repository.target.clone(),
288 command: RepositoryMessageKind::Response(
289 RepositoryResponse::RepositoryInfo(response),
290 ),
291 }),
292 )
293 .await;
294
295 continue;
296 }
297 RepositoryRequest::IssuesCount(request) => {
298 let request = &request.validate().await.unwrap();
299
300 let mut backend = backend.lock().await;
301 let response = backend.issues_count(request);
302
303 let response = match response {
304 Ok(response) => response,
305 Err(err) => {
306 error!("Error handling request: {:?}", err);
307 continue;
308 }
309 };
310 drop(backend);
311
312 let _result = send(
313 &mut socket,
314 MessageKind::Repository(RepositoryMessage {
315 target: repository.target.clone(),
316 command: RepositoryMessageKind::Response(
317 RepositoryResponse::IssuesCount(response),
318 ),
319 }),
320 )
321 .await;
322
323 continue;
324 }
325 RepositoryRequest::IssueLabels(request) => {
326 let request = &request.validate().await.unwrap();
327
328 let mut backend = backend.lock().await;
329 let response = backend.issue_labels(request);
330
331 let response = match response {
332 Ok(response) => response,
333 Err(err) => {
334 error!("Error handling request: {:?}", err);
335 continue;
336 }
337 };
338 drop(backend);
339
340 let _result = send(
341 &mut socket,
342 MessageKind::Repository(RepositoryMessage {
343 target: repository.target.clone(),
344 command: RepositoryMessageKind::Response(
345 RepositoryResponse::IssueLabels(response),
346 ),
347 }),
348 )
349 .await;
350
351 continue;
352 }
353 RepositoryRequest::Issues(request) => {
354 let request = request.validate().await.unwrap();
355
356 let mut backend = backend.lock().await;
357 let response = backend.issues(&request);
358
359 let response = match response {
360 Ok(response) => response,
361 Err(err) => {
362 error!("Error handling request: {:?}", err);
363 continue;
364 }
365 };
366 drop(backend);
367
368 let _result = send(
369 &mut socket,
370 MessageKind::Repository(RepositoryMessage {
371 target: repository.target.clone(),
372 command: RepositoryMessageKind::Response(
373 RepositoryResponse::Issues(response),
374 ),
375 }),
376 )
377 .await;
378
379 continue;
380 }
381 },
382 RepositoryMessageKind::Response(_response) => {
383 unreachable!()
384 }
385 }
386 }
387 }
388
389 if let MessageKind::Authentication(authentication) = &message {
390 match authentication {
391 AuthenticationMessage::Request(request) => match request {
392 AuthenticationRequest::AuthenticationToken(token) => {
393 let mut granter = auth_granter.lock().await;
394
395 let response = granter.token_request(token.clone()).await.unwrap();
396 drop(granter);
397
398 let _result = send(
399 &mut socket,
400 MessageKind::Authentication(AuthenticationMessage::Response(
401 AuthenticationResponse::AuthenticationToken(response),
402 )),
403 )
404 .await;
405
406 continue;
407 }
408 AuthenticationRequest::TokenExtension(request) => {
409 let mut granter = auth_granter.lock().await;
410
411 let response = granter
412 .extension_request(request.clone())
413 .await
414 .unwrap_or(TokenExtensionResponse { new_token: None });
415 drop(granter);
416
417 let _result = send(
418 &mut socket,
419 MessageKind::Authentication(AuthenticationMessage::Response(
420 AuthenticationResponse::TokenExtension(response),
421 )),
422 )
423 .await;
424
425 continue;
426 }
427 AuthenticationRequest::RegisterAccount(request) => {
428 let request = request.inner().await.clone();
429
430 let mut user_backend = user_backend.lock().await;
431
432 let response = user_backend.register(request.clone()).await.unwrap();
433 drop(user_backend);
434
435 let _result = send(
436 &mut socket,
437 MessageKind::Authentication(AuthenticationMessage::Response(
438 AuthenticationResponse::RegisterAccount(response),
439 )),
440 )
441 .await;
442
443 continue;
444 }
445 },
446 AuthenticationMessage::Response(_) => unreachable!(),
447 }
448 }
449
450 if let MessageKind::Discovery(message) = &message {
451 let mut backend = discovery_backend.lock().await;
452 backend.try_handle(message).await.unwrap();
453
454 continue;
455 }
456 }
457
458 info!("Connection closed");
459 }
460
461 async fn send_and_get_listener(
462 message: MessageKind,
463 listeners: &Arc<Mutex<Listeners>>,
464 connections: &Arc<Mutex<Connections>>,
465 ) -> Receiver<MessageKind> {
466 let (instance, user, repository): (Option<Instance>, Option<User>, Option<Repository>) =
467 match &message {
468 MessageKind::Handshake(_) => {
469 todo!()
470 }
471 MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())),
472 MessageKind::Authentication(_) => todo!(),
473 MessageKind::Discovery(_) => todo!(),
474 };
475
476 let target = match (&instance, &user, &repository) {
477 (Some(instance), _, _) => instance.clone(),
478 (_, Some(user), _) => user.instance.clone(),
479 (_, _, Some(repository)) => repository.instance.clone(),
480 _ => unreachable!(),
481 };
482
483 let mut listeners = listeners.lock().await;
484 let listener = listeners.add(instance, user, repository);
485 drop(listeners);
486
487 let connections = connections.lock().await;
488
489 if let Some(connection) = connections.instance_connections.get(&target) {
490 if let Err(_) = connection.sender.send(message) {
491 error!("Error sending message.");
492 }
493 } else {
494 error!("Unable to message {}, this is a bug.", target.url);
495
496 panic!();
497 }
498
499 drop(connections);
500
501 listener
502 }
503
504 async fn send<T: Serialize>(
505 socket: &mut WebSocketStream<TcpStream>,
506 message: T,
507 ) -> Result<(), Error> {
508 socket
509 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
510 .await?;
511
512 Ok(())
513 }
514