JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Handshake

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨e3e5bf5

⁨src/connection.rs⁩ - ⁨19460⁩ bytes
Raw
1 use std::{collections::HashMap, net::SocketAddr, sync::Arc, str::FromStr};
2
3 use anyhow::Error;
4 use futures_util::{stream::StreamExt, SinkExt};
5 use semver::{Version, VersionReq};
6 use serde::Serialize;
7 use tokio::{
8 net::TcpStream,
9 sync::{
10 broadcast::{Receiver, Sender},
11 Mutex,
12 },
13 task::JoinHandle,
14 };
15 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
16
17 use crate::{
18 authentication::AuthenticationTokenGranter,
19 backend::{DiscoveryBackend, IssuesBackend, RepositoryBackend, UserBackend},
20 handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse},
21 listener::Listeners,
22 messages::{
23 authentication::{
24 AuthenticationMessage, AuthenticationRequest, AuthenticationResponse,
25 TokenExtensionResponse,
26 },
27 repository::{
28 RepositoryMessage, RepositoryMessageKind, RepositoryRequest, RepositoryResponse,
29 },
30 MessageKind,
31 },
32 model::{
33 instance::{Instance, InstanceMeta},
34 repository::Repository,
35 user::User,
36 },
37 };
38
39 pub struct RawConnection {
40 pub task: JoinHandle<()>,
41 }
42
43 pub struct InstanceConnection {
44 pub instance: InstanceMeta,
45 pub sender: Sender<MessageKind>,
46 pub task: JoinHandle<()>,
47 }
48
49 /// Represents a connection which hasn't finished the handshake.
50 pub struct UnestablishedConnection {
51 pub socket: WebSocketStream<TcpStream>,
52 }
53
54 #[derive(Default)]
55 pub struct Connections {
56 pub connections: Vec<RawConnection>,
57 pub instance_connections: HashMap<Instance, InstanceConnection>,
58 }
59
60 pub async fn connection_worker(
61 mut socket: WebSocketStream<TcpStream>,
62 listeners: Arc<Mutex<Listeners>>,
63 connections: Arc<Mutex<Connections>>,
64 backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
65 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
66 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
67 discovery_backend: Arc<Mutex<dyn DiscoveryBackend + Send>>,
68 addr: SocketAddr,
69 ) {
70 let mut handshaked = false;
71 let this_instance = Instance {
72 url: String::from("giterated.dev"),
73 };
74
75 while let Some(message) = socket.next().await {
76 let message = match message {
77 Ok(message) => message,
78 Err(err) => {
79 error!("Error reading message: {:?}", err);
80 continue;
81 }
82 };
83
84 let payload = match message {
85 Message::Text(text) => text.into_bytes(),
86 Message::Binary(bytes) => bytes,
87 Message::Ping(_) => continue,
88 Message::Pong(_) => continue,
89 Message::Close(_) => {
90 info!("Closing connection with {}.", addr);
91
92 return;
93 }
94 _ => unreachable!(),
95 };
96
97 let message = match serde_json::from_slice::<MessageKind>(&payload) {
98 Ok(message) => message,
99 Err(err) => {
100 error!("Error deserializing message from {}: {:?}", addr, err);
101 continue;
102 }
103 };
104
105 // info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
106
107 if let MessageKind::Handshake(handshake) = message {
108 match handshake {
109 HandshakeMessage::Initiate(request) => {
110 // Send HandshakeMessage::Response
111 let message = HandshakeResponse {
112 identity: this_instance.clone(),
113 version: Version::from_str(&env!("CARGO_PKG_VERSION")).unwrap(),
114 };
115
116 let version_check = VersionReq::from_str("=0.0.1").unwrap();
117
118 let result = if !version_check.matches( &request.version) {
119 error!("Version compatibility failure! Our Version: {}, Their Version: {}", Version::from_str(&std::env::var("CARGO_PKG_VERSION").unwrap()).unwrap(), request.version);
120
121 send(
122 &mut socket,
123 MessageKind::Handshake(HandshakeMessage::Finalize(HandshakeFinalize { success: false })),
124 ).await
125 } else {
126 send(
127 &mut socket,
128 MessageKind::Handshake(HandshakeMessage::Response(message)),
129 )
130 .await
131 };
132
133 continue;
134 }
135 HandshakeMessage::Response(response) => {
136 // Check version
137 let version_check = VersionReq::from_str("=0.0.1").unwrap();
138
139 let message = if !version_check.matches( &response.version) {
140 error!("Version compatibility failure! Our Version: {}, Their Version: {}", Version::from_str(&std::env::var("CARGO_PKG_VERSION").unwrap()).unwrap(), response.version);
141
142 HandshakeFinalize { success: false }
143 } else {
144 info!("Connected with a compatible version");
145
146 HandshakeFinalize { success: true }
147 };
148
149 let _result = send(
150 &mut socket,
151 MessageKind::Handshake(HandshakeMessage::Finalize(message)),
152 )
153 .await;
154
155 continue;
156 }
157 HandshakeMessage::Finalize(_) => {
158 handshaked = true;
159
160 // Send HandshakeMessage::Finalize
161 let message = HandshakeFinalize { success: true };
162
163 let _result = send(
164 &mut socket,
165 MessageKind::Handshake(HandshakeMessage::Finalize(message)),
166 )
167 .await;
168
169 continue;
170 }
171 }
172 }
173
174 if !handshaked {
175 continue;
176 }
177
178 if let MessageKind::Repository(repository) = &message {
179 if repository.target.instance != this_instance {
180 info!("Forwarding command to {}", repository.target.instance.url);
181 // We need to send this command to a different instance
182
183 let mut listener = send_and_get_listener(message, &listeners, &connections).await;
184
185 // Wait for response
186 while let Ok(message) = listener.recv().await {
187 if let MessageKind::Repository(RepositoryMessage {
188 command: RepositoryMessageKind::Response(_),
189 ..
190 }) = message
191 {
192 let _result = send(&mut socket, message).await;
193 }
194 }
195 continue;
196 } else {
197 // This message is targeting this instance
198 match &repository.command {
199 RepositoryMessageKind::Request(request) => match request.clone() {
200 RepositoryRequest::CreateRepository(request) => {
201 let mut backend = backend.lock().await;
202 let request = request.validate().await.unwrap();
203 let response = backend.create_repository(&request).await;
204
205 let response = match response {
206 Ok(response) => response,
207 Err(err) => {
208 error!("Error handling request: {:?}", err);
209 continue;
210 }
211 };
212 drop(backend);
213
214 let _result = send(
215 &mut socket,
216 MessageKind::Repository(RepositoryMessage {
217 target: repository.target.clone(),
218 command: RepositoryMessageKind::Response(
219 RepositoryResponse::CreateRepository(response),
220 ),
221 }),
222 )
223 .await;
224
225 continue;
226 }
227 RepositoryRequest::RepositoryFileInspect(request) => {
228 let mut backend = backend.lock().await;
229 let request = request.validate().await.unwrap();
230 let response = backend.repository_file_inspect(&request);
231
232 let response = match response {
233 Ok(response) => response,
234 Err(err) => {
235 error!("Error handling request: {:?}", err);
236 continue;
237 }
238 };
239 drop(backend);
240
241 let _result = send(
242 &mut socket,
243 MessageKind::Repository(RepositoryMessage {
244 target: repository.target.clone(),
245 command: RepositoryMessageKind::Response(
246 RepositoryResponse::RepositoryFileInspection(response),
247 ),
248 }),
249 )
250 .await;
251
252 continue;
253 }
254 RepositoryRequest::RepositoryInfo(request) => {
255 let mut backend = backend.lock().await;
256 let request = request.validate().await.unwrap();
257 let response = backend.repository_info(&request).await;
258
259 let response = match response {
260 Ok(response) => response,
261 Err(err) => {
262 error!("Error handling request: {:?}", err);
263 continue;
264 }
265 };
266 drop(backend);
267
268 let _result = send(
269 &mut socket,
270 MessageKind::Repository(RepositoryMessage {
271 target: repository.target.clone(),
272 command: RepositoryMessageKind::Response(
273 RepositoryResponse::RepositoryInfo(response),
274 ),
275 }),
276 )
277 .await;
278
279 continue;
280 }
281 RepositoryRequest::IssuesCount(request) => {
282 let request = &request.validate().await.unwrap();
283
284 let mut backend = backend.lock().await;
285 let response = backend.issues_count(request);
286
287 let response = match response {
288 Ok(response) => response,
289 Err(err) => {
290 error!("Error handling request: {:?}", err);
291 continue;
292 }
293 };
294 drop(backend);
295
296 let _result = send(
297 &mut socket,
298 MessageKind::Repository(RepositoryMessage {
299 target: repository.target.clone(),
300 command: RepositoryMessageKind::Response(
301 RepositoryResponse::IssuesCount(response),
302 ),
303 }),
304 )
305 .await;
306
307 continue;
308 }
309 RepositoryRequest::IssueLabels(request) => {
310 let request = &request.validate().await.unwrap();
311
312 let mut backend = backend.lock().await;
313 let response = backend.issue_labels(request);
314
315 let response = match response {
316 Ok(response) => response,
317 Err(err) => {
318 error!("Error handling request: {:?}", err);
319 continue;
320 }
321 };
322 drop(backend);
323
324 let _result = send(
325 &mut socket,
326 MessageKind::Repository(RepositoryMessage {
327 target: repository.target.clone(),
328 command: RepositoryMessageKind::Response(
329 RepositoryResponse::IssueLabels(response),
330 ),
331 }),
332 )
333 .await;
334
335 continue;
336 }
337 RepositoryRequest::Issues(request) => {
338 let request = request.validate().await.unwrap();
339
340 let mut backend = backend.lock().await;
341 let response = backend.issues(&request);
342
343 let response = match response {
344 Ok(response) => response,
345 Err(err) => {
346 error!("Error handling request: {:?}", err);
347 continue;
348 }
349 };
350 drop(backend);
351
352 let _result = send(
353 &mut socket,
354 MessageKind::Repository(RepositoryMessage {
355 target: repository.target.clone(),
356 command: RepositoryMessageKind::Response(
357 RepositoryResponse::Issues(response),
358 ),
359 }),
360 )
361 .await;
362
363 continue;
364 }
365 },
366 RepositoryMessageKind::Response(_response) => {
367 unreachable!()
368 }
369 }
370 }
371 }
372
373 if let MessageKind::Authentication(authentication) = &message {
374 match authentication {
375 AuthenticationMessage::Request(request) => match request {
376 AuthenticationRequest::AuthenticationToken(token) => {
377 let mut granter = auth_granter.lock().await;
378
379 let response = granter.token_request(token.clone()).await.unwrap();
380 drop(granter);
381
382 let _result = send(
383 &mut socket,
384 MessageKind::Authentication(AuthenticationMessage::Response(
385 AuthenticationResponse::AuthenticationToken(response),
386 )),
387 )
388 .await;
389
390 continue;
391 }
392 AuthenticationRequest::TokenExtension(request) => {
393 let mut granter = auth_granter.lock().await;
394
395 let response = granter
396 .extension_request(request.clone())
397 .await
398 .unwrap_or(TokenExtensionResponse { new_token: None });
399 drop(granter);
400
401 let _result = send(
402 &mut socket,
403 MessageKind::Authentication(AuthenticationMessage::Response(
404 AuthenticationResponse::TokenExtension(response),
405 )),
406 )
407 .await;
408
409 continue;
410 }
411 AuthenticationRequest::RegisterAccount(request) => {
412 let request = request.inner().await.clone();
413
414 let mut user_backend = user_backend.lock().await;
415
416 let response = user_backend.register(request.clone()).await.unwrap();
417 drop(user_backend);
418
419 let _result = send(
420 &mut socket,
421 MessageKind::Authentication(AuthenticationMessage::Response(
422 AuthenticationResponse::RegisterAccount(response),
423 )),
424 )
425 .await;
426
427 continue;
428 }
429 },
430 AuthenticationMessage::Response(_) => unreachable!(),
431 }
432 }
433
434 if let MessageKind::Discovery(message) = &message {
435 let mut backend = discovery_backend.lock().await;
436 backend.try_handle(message).await.unwrap();
437
438 continue;
439 }
440 }
441
442 info!("Connection closed");
443 }
444
445 async fn send_and_get_listener(
446 message: MessageKind,
447 listeners: &Arc<Mutex<Listeners>>,
448 connections: &Arc<Mutex<Connections>>,
449 ) -> Receiver<MessageKind> {
450 let (instance, user, repository): (Option<Instance>, Option<User>, Option<Repository>) =
451 match &message {
452 MessageKind::Handshake(_) => {
453 todo!()
454 }
455 MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())),
456 MessageKind::Authentication(_) => todo!(),
457 MessageKind::Discovery(_) => todo!(),
458 };
459
460 let target = match (&instance, &user, &repository) {
461 (Some(instance), _, _) => instance.clone(),
462 (_, Some(user), _) => user.instance.clone(),
463 (_, _, Some(repository)) => repository.instance.clone(),
464 _ => unreachable!(),
465 };
466
467 let mut listeners = listeners.lock().await;
468 let listener = listeners.add(instance, user, repository);
469 drop(listeners);
470
471 let connections = connections.lock().await;
472
473 if let Some(connection) = connections.instance_connections.get(&target) {
474 if let Err(_) = connection.sender.send(message) {
475 error!("Error sending message.");
476 }
477 } else {
478 error!("Unable to message {}, this is a bug.", target.url);
479
480 panic!();
481 }
482
483 drop(connections);
484
485 listener
486 }
487
488 async fn send<T: Serialize>(
489 socket: &mut WebSocketStream<TcpStream>,
490 message: T,
491 ) -> Result<(), Error> {
492 socket
493 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
494 .await?;
495
496 Ok(())
497 }
498