JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Fixes

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨8a111d7

⁨src/connection.rs⁩ - ⁨16064⁩ bytes
Raw
1 use std::{collections::HashMap, net::SocketAddr, sync::Arc};
2
3 use futures_util::{stream::StreamExt, SinkExt};
4 use tokio::{
5 net::TcpStream,
6 sync::{
7 broadcast::{Receiver, Sender},
8 Mutex,
9 },
10 task::JoinHandle,
11 };
12 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
13
14 use crate::{
15 backend::{IssuesBackend, RepositoryBackend},
16 handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse},
17 listener::Listeners,
18 messages::{
19 repository::{
20 RepositoryMessage, RepositoryMessageKind, RepositoryRequest, RepositoryResponse,
21 },
22 MessageKind,
23 },
24 model::{
25 instance::{Instance, InstanceMeta},
26 repository::Repository,
27 user::User,
28 },
29 };
30
31 pub struct RawConnection {
32 pub task: JoinHandle<()>,
33 }
34
35 pub struct InstanceConnection {
36 pub instance: InstanceMeta,
37 pub sender: Sender<MessageKind>,
38 pub task: JoinHandle<()>,
39 }
40
41 /// Represents a connection which hasn't finished the handshake.
42 pub struct UnestablishedConnection {
43 pub socket: WebSocketStream<TcpStream>,
44 }
45
46 #[derive(Default)]
47 pub struct Connections {
48 pub connections: Vec<RawConnection>,
49 pub instance_connections: HashMap<Instance, InstanceConnection>,
50 }
51
52 pub async fn connection_worker(
53 mut socket: WebSocketStream<TcpStream>,
54 listeners: Arc<Mutex<Listeners>>,
55 connections: Arc<Mutex<Connections>>,
56 backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
57 addr: SocketAddr,
58 ) {
59 let mut handshaked = false;
60 let this_instance = Instance {
61 url: String::from("127.0.0.1:8080"),
62 };
63
64 while let Some(message) = socket.next().await {
65 let message = match message {
66 Ok(message) => message,
67 Err(err) => {
68 error!("Error reading message: {:?}", err);
69 continue;
70 }
71 };
72
73 let payload = match message {
74 Message::Text(text) => text.into_bytes(),
75 Message::Binary(bytes) => bytes,
76 Message::Ping(_) => continue,
77 Message::Pong(_) => continue,
78 Message::Close(_) => {
79 info!("Closing connection with {}.", addr);
80
81 return;
82 }
83 _ => unreachable!(),
84 };
85
86 let message = match serde_json::from_slice::<MessageKind>(&payload) {
87 Ok(message) => message,
88 Err(err) => {
89 error!("Error deserializing message from {}: {:?}", addr, err);
90 continue;
91 }
92 };
93
94 info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
95
96 if let MessageKind::Handshake(handshake) = message {
97 match handshake {
98 HandshakeMessage::Initiate(_) => {
99 // Send HandshakeMessage::Response
100 let message = HandshakeResponse {
101 identity: Instance {
102 url: String::from("foo.com"),
103 },
104 version: String::from("0.1.0"),
105 };
106
107 socket
108 .send(Message::Binary(
109 serde_json::to_vec(&MessageKind::Handshake(
110 HandshakeMessage::Response(message),
111 ))
112 .unwrap(),
113 ))
114 .await
115 .unwrap();
116
117 continue;
118 }
119 HandshakeMessage::Response(_) => {
120 // Send HandshakeMessage::Finalize
121 let message = HandshakeFinalize { success: true };
122
123 socket
124 .send(Message::Binary(
125 serde_json::to_vec(&MessageKind::Handshake(
126 HandshakeMessage::Finalize(message),
127 ))
128 .unwrap(),
129 ))
130 .await
131 .unwrap();
132
133 continue;
134 }
135 HandshakeMessage::Finalize(_) => {
136 handshaked = true;
137
138 // Send HandshakeMessage::Finalize
139 let message = HandshakeFinalize { success: true };
140
141 socket
142 .send(Message::Binary(
143 serde_json::to_vec(&MessageKind::Handshake(
144 HandshakeMessage::Finalize(message),
145 ))
146 .unwrap(),
147 ))
148 .await
149 .unwrap();
150
151 continue;
152 }
153 }
154 }
155
156 if !handshaked {
157 continue;
158 }
159
160 if let MessageKind::Repository(repository) = &message {
161 if repository.target.instance != this_instance {
162 info!("Forwarding command to {}", repository.target.instance.url);
163 // We need to send this command to a different instance
164
165 let mut listener = send_and_get_listener(message, &listeners, &connections).await;
166
167 // Wait for response
168 while let Ok(message) = listener.recv().await {
169 if let MessageKind::Repository(RepositoryMessage {
170 command: RepositoryMessageKind::Response(_),
171 ..
172 }) = message
173 {
174 socket
175 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
176 .await
177 .unwrap();
178 }
179 }
180 } else {
181 // This message is targeting this instance
182 match &repository.command {
183 RepositoryMessageKind::Request(request) => match request {
184 RepositoryRequest::CreateRepository(request) => {
185 let mut backend = backend.lock().await;
186 let response = backend.create_repository(request);
187
188 let response = match response {
189 Ok(response) => response,
190 Err(err) => {
191 error!("Error handling request: {:?}", err);
192 continue;
193 }
194 };
195 drop(backend);
196
197 socket
198 .send(Message::Binary(
199 serde_json::to_vec(&MessageKind::Repository(
200 RepositoryMessage {
201 target: repository.target.clone(),
202 command: RepositoryMessageKind::Response(
203 RepositoryResponse::CreateRepository(response),
204 ),
205 },
206 ))
207 .unwrap(),
208 ))
209 .await
210 .unwrap();
211 }
212 RepositoryRequest::RepositoryFileInspect(request) => {
213 let mut backend = backend.lock().await;
214 let response = backend.repository_file_inspect(request);
215
216 let response = match response {
217 Ok(response) => response,
218 Err(err) => {
219 error!("Error handling request: {:?}", err);
220 continue;
221 }
222 };
223 drop(backend);
224
225 socket
226 .send(Message::Binary(
227 serde_json::to_vec(&MessageKind::Repository(
228 RepositoryMessage {
229 target: repository.target.clone(),
230 command: RepositoryMessageKind::Response(
231 RepositoryResponse::RepositoryFileInspection(
232 response,
233 ),
234 ),
235 },
236 ))
237 .unwrap(),
238 ))
239 .await
240 .unwrap();
241 }
242 RepositoryRequest::RepositoryInfo(request) => {
243 let mut backend = backend.lock().await;
244 let response = backend.repository_info(request);
245
246 let response = match response {
247 Ok(response) => response,
248 Err(err) => {
249 error!("Error handling request: {:?}", err);
250 continue;
251 }
252 };
253 drop(backend);
254
255 socket
256 .send(Message::Binary(
257 serde_json::to_vec(&MessageKind::Repository(
258 RepositoryMessage {
259 target: repository.target.clone(),
260 command: RepositoryMessageKind::Response(
261 RepositoryResponse::RepositoryInfo(response),
262 ),
263 },
264 ))
265 .unwrap(),
266 ))
267 .await
268 .unwrap();
269 }
270 RepositoryRequest::IssuesCount(request) => {
271 let mut backend = backend.lock().await;
272 let response = backend.issues_count(request);
273
274 let response = match response {
275 Ok(response) => response,
276 Err(err) => {
277 error!("Error handling request: {:?}", err);
278 continue;
279 }
280 };
281 drop(backend);
282
283 socket
284 .send(Message::Binary(
285 serde_json::to_vec(&MessageKind::Repository(
286 RepositoryMessage {
287 target: repository.target.clone(),
288 command: RepositoryMessageKind::Response(
289 RepositoryResponse::IssuesCount(response),
290 ),
291 },
292 ))
293 .unwrap(),
294 ))
295 .await
296 .unwrap();
297 }
298 RepositoryRequest::IssueLabels(request) => {
299 let mut backend = backend.lock().await;
300 let response = backend.issue_labels(request);
301
302 let response = match response {
303 Ok(response) => response,
304 Err(err) => {
305 error!("Error handling request: {:?}", err);
306 continue;
307 }
308 };
309 drop(backend);
310 socket
311 .send(Message::Binary(
312 serde_json::to_vec(&MessageKind::Repository(
313 RepositoryMessage {
314 target: repository.target.clone(),
315 command: RepositoryMessageKind::Response(
316 RepositoryResponse::IssueLabels(response),
317 ),
318 },
319 ))
320 .unwrap(),
321 ))
322 .await
323 .unwrap();
324 }
325 RepositoryRequest::Issues(request) => {
326 let mut backend = backend.lock().await;
327 let response = backend.issues(request);
328
329 let response = match response {
330 Ok(response) => response,
331 Err(err) => {
332 error!("Error handling request: {:?}", err);
333 continue;
334 }
335 };
336 drop(backend);
337
338 socket
339 .send(Message::Binary(
340 serde_json::to_vec(&MessageKind::Repository(
341 RepositoryMessage {
342 target: repository.target.clone(),
343 command: RepositoryMessageKind::Response(
344 RepositoryResponse::Issues(response),
345 ),
346 },
347 ))
348 .unwrap(),
349 ))
350 .await
351 .unwrap();
352 }
353 },
354 RepositoryMessageKind::Response(_response) => {
355 unreachable!()
356 }
357 }
358 }
359 }
360 }
361
362 info!("Connection closed");
363 }
364
365 async fn send_and_get_listener(
366 message: MessageKind,
367 listeners: &Arc<Mutex<Listeners>>,
368 connections: &Arc<Mutex<Connections>>,
369 ) -> Receiver<MessageKind> {
370 let (instance, user, repository): (Option<Instance>, Option<User>, Option<Repository>) =
371 match &message {
372 MessageKind::Handshake(_) => {
373 todo!()
374 }
375 MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())),
376 };
377
378 let target = match (&instance, &user, &repository) {
379 (Some(instance), _, _) => instance.clone(),
380 (_, Some(user), _) => user.instance.clone(),
381 (_, _, Some(repository)) => repository.instance.clone(),
382 _ => unreachable!(),
383 };
384
385 let mut listeners = listeners.lock().await;
386 let listener = listeners.add(instance, user, repository);
387 drop(listeners);
388
389 let connections = connections.lock().await;
390
391 if let Some(connection) = connections.instance_connections.get(&target) {
392 connection.sender.send(message);
393 } else {
394 error!("Unable to message {}, this is a bug.", target.url);
395
396 panic!();
397 }
398
399 drop(connections);
400
401 listener
402 }
403