JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Add docs

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨51aad53

⁨src/connection.rs⁩ - ⁨17875⁩ bytes
Raw
1 use std::{collections::HashMap, net::SocketAddr, sync::Arc};
2
3 use anyhow::Error;
4 use futures_util::{stream::StreamExt, SinkExt};
5 use serde::Serialize;
6 use tokio::{
7 net::TcpStream,
8 sync::{
9 broadcast::{Receiver, Sender},
10 Mutex,
11 },
12 task::JoinHandle,
13 };
14 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
15
16 use crate::{
17 authentication::AuthenticationTokenGranter,
18 backend::{IssuesBackend, RepositoryBackend, UserBackend},
19 handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse},
20 listener::Listeners,
21 messages::{
22 authentication::{
23 AuthenticationMessage, AuthenticationRequest, AuthenticationResponse,
24 TokenExtensionResponse,
25 },
26 repository::{
27 RepositoryMessage, RepositoryMessageKind, RepositoryRequest, RepositoryResponse,
28 },
29 MessageKind,
30 },
31 model::{
32 instance::{Instance, InstanceMeta},
33 repository::Repository,
34 user::User,
35 },
36 };
37
38 pub struct RawConnection {
39 pub task: JoinHandle<()>,
40 }
41
42 pub struct InstanceConnection {
43 pub instance: InstanceMeta,
44 pub sender: Sender<MessageKind>,
45 pub task: JoinHandle<()>,
46 }
47
48 /// Represents a connection which hasn't finished the handshake.
49 pub struct UnestablishedConnection {
50 pub socket: WebSocketStream<TcpStream>,
51 }
52
53 #[derive(Default)]
54 pub struct Connections {
55 pub connections: Vec<RawConnection>,
56 pub instance_connections: HashMap<Instance, InstanceConnection>,
57 }
58
59 pub async fn connection_worker(
60 mut socket: WebSocketStream<TcpStream>,
61 listeners: Arc<Mutex<Listeners>>,
62 connections: Arc<Mutex<Connections>>,
63 backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
64 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
65 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
66 addr: SocketAddr,
67 ) {
68 let mut handshaked = false;
69 let this_instance = Instance {
70 url: String::from("giterated.dev"),
71 };
72
73 while let Some(message) = socket.next().await {
74 let message = match message {
75 Ok(message) => message,
76 Err(err) => {
77 error!("Error reading message: {:?}", err);
78 continue;
79 }
80 };
81
82 let payload = match message {
83 Message::Text(text) => text.into_bytes(),
84 Message::Binary(bytes) => bytes,
85 Message::Ping(_) => continue,
86 Message::Pong(_) => continue,
87 Message::Close(_) => {
88 info!("Closing connection with {}.", addr);
89
90 return;
91 }
92 _ => unreachable!(),
93 };
94
95 let message = match serde_json::from_slice::<MessageKind>(&payload) {
96 Ok(message) => message,
97 Err(err) => {
98 error!("Error deserializing message from {}: {:?}", addr, err);
99 continue;
100 }
101 };
102
103 // info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
104
105 if let MessageKind::Handshake(handshake) = message {
106 match handshake {
107 HandshakeMessage::Initiate(_) => {
108 // Send HandshakeMessage::Response
109 let message = HandshakeResponse {
110 identity: Instance {
111 url: String::from("foo.com"),
112 },
113 version: String::from("0.1.0"),
114 };
115
116 let _result = send(
117 &mut socket,
118 MessageKind::Handshake(HandshakeMessage::Response(message)),
119 )
120 .await;
121
122 continue;
123 }
124 HandshakeMessage::Response(_) => {
125 // Send HandshakeMessage::Finalize
126 let message = HandshakeFinalize { success: true };
127
128 let _result = send(
129 &mut socket,
130 MessageKind::Handshake(HandshakeMessage::Finalize(message)),
131 )
132 .await;
133
134 continue;
135 }
136 HandshakeMessage::Finalize(_) => {
137 handshaked = true;
138
139 // Send HandshakeMessage::Finalize
140 let message = HandshakeFinalize { success: true };
141
142 let _result = send(
143 &mut socket,
144 MessageKind::Handshake(HandshakeMessage::Finalize(message)),
145 )
146 .await;
147
148 continue;
149 }
150 }
151 }
152
153 if !handshaked {
154 continue;
155 }
156
157 if let MessageKind::Repository(repository) = &message {
158 if repository.target.instance != this_instance {
159 info!("Forwarding command to {}", repository.target.instance.url);
160 // We need to send this command to a different instance
161
162 let mut listener = send_and_get_listener(message, &listeners, &connections).await;
163
164 // Wait for response
165 while let Ok(message) = listener.recv().await {
166 if let MessageKind::Repository(RepositoryMessage {
167 command: RepositoryMessageKind::Response(_),
168 ..
169 }) = message
170 {
171 let _result = send(&mut socket, message).await;
172 }
173 }
174 continue;
175 } else {
176 // This message is targeting this instance
177 match &repository.command {
178 RepositoryMessageKind::Request(request) => match request.clone() {
179 RepositoryRequest::CreateRepository(request) => {
180 let mut backend = backend.lock().await;
181 let request = request.validate().await.unwrap();
182 let response = backend.create_repository(&request).await;
183
184 let response = match response {
185 Ok(response) => response,
186 Err(err) => {
187 error!("Error handling request: {:?}", err);
188 continue;
189 }
190 };
191 drop(backend);
192
193 let _result = send(
194 &mut socket,
195 MessageKind::Repository(RepositoryMessage {
196 target: repository.target.clone(),
197 command: RepositoryMessageKind::Response(
198 RepositoryResponse::CreateRepository(response),
199 ),
200 }),
201 )
202 .await;
203
204 continue;
205 }
206 RepositoryRequest::RepositoryFileInspect(request) => {
207 let mut backend = backend.lock().await;
208 let request = request.validate().await.unwrap();
209 let response = backend.repository_file_inspect(&request);
210
211 let response = match response {
212 Ok(response) => response,
213 Err(err) => {
214 error!("Error handling request: {:?}", err);
215 continue;
216 }
217 };
218 drop(backend);
219
220 let _result = send(
221 &mut socket,
222 MessageKind::Repository(RepositoryMessage {
223 target: repository.target.clone(),
224 command: RepositoryMessageKind::Response(
225 RepositoryResponse::RepositoryFileInspection(response),
226 ),
227 }),
228 )
229 .await;
230
231 continue;
232 }
233 RepositoryRequest::RepositoryInfo(request) => {
234 let mut backend = backend.lock().await;
235 let request = request.validate().await.unwrap();
236 let response = backend.repository_info(&request).await;
237
238 let response = match response {
239 Ok(response) => response,
240 Err(err) => {
241 error!("Error handling request: {:?}", err);
242 continue;
243 }
244 };
245 drop(backend);
246
247 let _result = send(
248 &mut socket,
249 MessageKind::Repository(RepositoryMessage {
250 target: repository.target.clone(),
251 command: RepositoryMessageKind::Response(
252 RepositoryResponse::RepositoryInfo(response),
253 ),
254 }),
255 )
256 .await;
257
258 continue;
259 }
260 RepositoryRequest::IssuesCount(request) => {
261 let request = &request.validate().await.unwrap();
262
263 let mut backend = backend.lock().await;
264 let response = backend.issues_count(request);
265
266 let response = match response {
267 Ok(response) => response,
268 Err(err) => {
269 error!("Error handling request: {:?}", err);
270 continue;
271 }
272 };
273 drop(backend);
274
275 let _result = send(
276 &mut socket,
277 MessageKind::Repository(RepositoryMessage {
278 target: repository.target.clone(),
279 command: RepositoryMessageKind::Response(
280 RepositoryResponse::IssuesCount(response),
281 ),
282 }),
283 )
284 .await;
285
286 continue;
287 }
288 RepositoryRequest::IssueLabels(request) => {
289 let request = &request.validate().await.unwrap();
290
291 let mut backend = backend.lock().await;
292 let response = backend.issue_labels(request);
293
294 let response = match response {
295 Ok(response) => response,
296 Err(err) => {
297 error!("Error handling request: {:?}", err);
298 continue;
299 }
300 };
301 drop(backend);
302
303 let _result = send(
304 &mut socket,
305 MessageKind::Repository(RepositoryMessage {
306 target: repository.target.clone(),
307 command: RepositoryMessageKind::Response(
308 RepositoryResponse::IssueLabels(response),
309 ),
310 }),
311 )
312 .await;
313
314 continue;
315 }
316 RepositoryRequest::Issues(request) => {
317 let request = request.validate().await.unwrap();
318
319 let mut backend = backend.lock().await;
320 let response = backend.issues(&request);
321
322 let response = match response {
323 Ok(response) => response,
324 Err(err) => {
325 error!("Error handling request: {:?}", err);
326 continue;
327 }
328 };
329 drop(backend);
330
331 let _result = send(
332 &mut socket,
333 MessageKind::Repository(RepositoryMessage {
334 target: repository.target.clone(),
335 command: RepositoryMessageKind::Response(
336 RepositoryResponse::Issues(response),
337 ),
338 }),
339 )
340 .await;
341
342 continue;
343 }
344 },
345 RepositoryMessageKind::Response(_response) => {
346 unreachable!()
347 }
348 }
349 }
350 }
351
352 if let MessageKind::Authentication(authentication) = &message {
353 match authentication {
354 AuthenticationMessage::Request(request) => match request {
355 AuthenticationRequest::AuthenticationToken(token) => {
356 let mut granter = auth_granter.lock().await;
357
358 let response = granter.token_request(token.clone()).await.unwrap();
359 drop(granter);
360
361 let _result = send(
362 &mut socket,
363 MessageKind::Authentication(AuthenticationMessage::Response(
364 AuthenticationResponse::AuthenticationToken(response),
365 )),
366 )
367 .await;
368
369 continue;
370 }
371 AuthenticationRequest::TokenExtension(request) => {
372 let mut granter = auth_granter.lock().await;
373
374 let response = granter
375 .extension_request(request.clone())
376 .await
377 .unwrap_or(TokenExtensionResponse { new_token: None });
378 drop(granter);
379
380 let _result = send(
381 &mut socket,
382 MessageKind::Authentication(AuthenticationMessage::Response(
383 AuthenticationResponse::TokenExtension(response),
384 )),
385 )
386 .await;
387
388 continue;
389 }
390 AuthenticationRequest::RegisterAccount(request) => {
391 let request = request.inner().await.clone();
392
393 let mut user_backend = user_backend.lock().await;
394
395 let response = user_backend.register(request.clone()).await.unwrap();
396 drop(user_backend);
397
398 let _result = send(
399 &mut socket,
400 MessageKind::Authentication(AuthenticationMessage::Response(
401 AuthenticationResponse::RegisterAccount(response),
402 )),
403 )
404 .await;
405
406 continue;
407 }
408 },
409 AuthenticationMessage::Response(_) => unreachable!(),
410 }
411 }
412 }
413
414 info!("Connection closed");
415 }
416
417 async fn send_and_get_listener(
418 message: MessageKind,
419 listeners: &Arc<Mutex<Listeners>>,
420 connections: &Arc<Mutex<Connections>>,
421 ) -> Receiver<MessageKind> {
422 let (instance, user, repository): (Option<Instance>, Option<User>, Option<Repository>) =
423 match &message {
424 MessageKind::Handshake(_) => {
425 todo!()
426 }
427 MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())),
428 MessageKind::Authentication(_) => todo!(),
429 };
430
431 let target = match (&instance, &user, &repository) {
432 (Some(instance), _, _) => instance.clone(),
433 (_, Some(user), _) => user.instance.clone(),
434 (_, _, Some(repository)) => repository.instance.clone(),
435 _ => unreachable!(),
436 };
437
438 let mut listeners = listeners.lock().await;
439 let listener = listeners.add(instance, user, repository);
440 drop(listeners);
441
442 let connections = connections.lock().await;
443
444 if let Some(connection) = connections.instance_connections.get(&target) {
445 connection.sender.send(message);
446 } else {
447 error!("Unable to message {}, this is a bug.", target.url);
448
449 panic!();
450 }
451
452 drop(connections);
453
454 listener
455 }
456
457 async fn send<T: Serialize>(
458 socket: &mut WebSocketStream<TcpStream>,
459 message: T,
460 ) -> Result<(), Error> {
461 socket
462 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
463 .await?;
464
465 Ok(())
466 }
467