JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Changes

Amber - 2 years ago

parent: tbd commit: 0b2a26d

⁨src/connection.rs⁩ - ⁨16182⁩ bytes
Raw
1 use std::{collections::HashMap, net::SocketAddr, sync::Arc};
2
3 use futures_util::{stream::StreamExt, SinkExt};
4 use tokio::{
5 net::TcpStream,
6 sync::{
7 broadcast::{Receiver, Sender},
8 Mutex,
9 },
10 task::JoinHandle,
11 };
12 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
13
14 use crate::{
15 authentication::AuthenticationTokenGranter,
16 backend::{IssuesBackend, RepositoryBackend},
17 handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse},
18 listener::Listeners,
19 messages::{
20 repository::{
21 RepositoryMessage, RepositoryMessageKind, RepositoryRequest, RepositoryResponse,
22 },
23 MessageKind,
24 },
25 model::{
26 instance::{Instance, InstanceMeta},
27 repository::Repository,
28 user::User,
29 },
30 };
31
32 pub struct RawConnection {
33 pub task: JoinHandle<()>,
34 }
35
36 pub struct InstanceConnection {
37 pub instance: InstanceMeta,
38 pub sender: Sender<MessageKind>,
39 pub task: JoinHandle<()>,
40 }
41
42 /// Represents a connection which hasn't finished the handshake.
43 pub struct UnestablishedConnection {
44 pub socket: WebSocketStream<TcpStream>,
45 }
46
47 #[derive(Default)]
48 pub struct Connections {
49 pub connections: Vec<RawConnection>,
50 pub instance_connections: HashMap<Instance, InstanceConnection>,
51 }
52
53 pub async fn connection_worker(
54 mut socket: WebSocketStream<TcpStream>,
55 listeners: Arc<Mutex<Listeners>>,
56 connections: Arc<Mutex<Connections>>,
57 backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
58 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
59 addr: SocketAddr,
60 ) {
61 let mut handshaked = false;
62 let this_instance = Instance {
63 url: String::from("127.0.0.1:8080"),
64 };
65
66 while let Some(message) = socket.next().await {
67 let message = match message {
68 Ok(message) => message,
69 Err(err) => {
70 error!("Error reading message: {:?}", err);
71 continue;
72 }
73 };
74
75 let payload = match message {
76 Message::Text(text) => text.into_bytes(),
77 Message::Binary(bytes) => bytes,
78 Message::Ping(_) => continue,
79 Message::Pong(_) => continue,
80 Message::Close(_) => {
81 info!("Closing connection with {}.", addr);
82
83 return;
84 }
85 _ => unreachable!(),
86 };
87
88 let message = match serde_json::from_slice::<MessageKind>(&payload) {
89 Ok(message) => message,
90 Err(err) => {
91 error!("Error deserializing message from {}: {:?}", addr, err);
92 continue;
93 }
94 };
95
96 info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
97
98 if let MessageKind::Handshake(handshake) = message {
99 match handshake {
100 HandshakeMessage::Initiate(_) => {
101 // Send HandshakeMessage::Response
102 let message = HandshakeResponse {
103 identity: Instance {
104 url: String::from("foo.com"),
105 },
106 version: String::from("0.1.0"),
107 };
108
109 socket
110 .send(Message::Binary(
111 serde_json::to_vec(&MessageKind::Handshake(
112 HandshakeMessage::Response(message),
113 ))
114 .unwrap(),
115 ))
116 .await
117 .unwrap();
118
119 continue;
120 }
121 HandshakeMessage::Response(_) => {
122 // Send HandshakeMessage::Finalize
123 let message = HandshakeFinalize { success: true };
124
125 socket
126 .send(Message::Binary(
127 serde_json::to_vec(&MessageKind::Handshake(
128 HandshakeMessage::Finalize(message),
129 ))
130 .unwrap(),
131 ))
132 .await
133 .unwrap();
134
135 continue;
136 }
137 HandshakeMessage::Finalize(_) => {
138 handshaked = true;
139
140 // Send HandshakeMessage::Finalize
141 let message = HandshakeFinalize { success: true };
142
143 socket
144 .send(Message::Binary(
145 serde_json::to_vec(&MessageKind::Handshake(
146 HandshakeMessage::Finalize(message),
147 ))
148 .unwrap(),
149 ))
150 .await
151 .unwrap();
152
153 continue;
154 }
155 }
156 }
157
158 if !handshaked {
159 continue;
160 }
161
162 if let MessageKind::Repository(repository) = &message {
163 if repository.target.instance != this_instance {
164 info!("Forwarding command to {}", repository.target.instance.url);
165 // We need to send this command to a different instance
166
167 let mut listener = send_and_get_listener(message, &listeners, &connections).await;
168
169 // Wait for response
170 while let Ok(message) = listener.recv().await {
171 if let MessageKind::Repository(RepositoryMessage {
172 command: RepositoryMessageKind::Response(_),
173 ..
174 }) = message
175 {
176 socket
177 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
178 .await
179 .unwrap();
180 }
181 }
182 } else {
183 // This message is targeting this instance
184 match &repository.command {
185 RepositoryMessageKind::Request(request) => match request {
186 RepositoryRequest::CreateRepository(request) => {
187 let mut backend = backend.lock().await;
188 let response = backend.create_repository(request).await;
189
190 let response = match response {
191 Ok(response) => response,
192 Err(err) => {
193 error!("Error handling request: {:?}", err);
194 continue;
195 }
196 };
197 drop(backend);
198
199 socket
200 .send(Message::Binary(
201 serde_json::to_vec(&MessageKind::Repository(
202 RepositoryMessage {
203 target: repository.target.clone(),
204 command: RepositoryMessageKind::Response(
205 RepositoryResponse::CreateRepository(response),
206 ),
207 },
208 ))
209 .unwrap(),
210 ))
211 .await
212 .unwrap();
213 }
214 RepositoryRequest::RepositoryFileInspect(request) => {
215 let mut backend = backend.lock().await;
216 let response = backend.repository_file_inspect(request);
217
218 let response = match response {
219 Ok(response) => response,
220 Err(err) => {
221 error!("Error handling request: {:?}", err);
222 continue;
223 }
224 };
225 drop(backend);
226
227 socket
228 .send(Message::Binary(
229 serde_json::to_vec(&MessageKind::Repository(
230 RepositoryMessage {
231 target: repository.target.clone(),
232 command: RepositoryMessageKind::Response(
233 RepositoryResponse::RepositoryFileInspection(
234 response,
235 ),
236 ),
237 },
238 ))
239 .unwrap(),
240 ))
241 .await
242 .unwrap();
243 }
244 RepositoryRequest::RepositoryInfo(request) => {
245 let mut backend = backend.lock().await;
246 let response = backend.repository_info(request).await;
247
248 let response = match response {
249 Ok(response) => response,
250 Err(err) => {
251 error!("Error handling request: {:?}", err);
252 continue;
253 }
254 };
255 drop(backend);
256
257 socket
258 .send(Message::Binary(
259 serde_json::to_vec(&MessageKind::Repository(
260 RepositoryMessage {
261 target: repository.target.clone(),
262 command: RepositoryMessageKind::Response(
263 RepositoryResponse::RepositoryInfo(response),
264 ),
265 },
266 ))
267 .unwrap(),
268 ))
269 .await
270 .unwrap();
271 }
272 RepositoryRequest::IssuesCount(request) => {
273 let mut backend = backend.lock().await;
274 let response = backend.issues_count(request);
275
276 let response = match response {
277 Ok(response) => response,
278 Err(err) => {
279 error!("Error handling request: {:?}", err);
280 continue;
281 }
282 };
283 drop(backend);
284
285 socket
286 .send(Message::Binary(
287 serde_json::to_vec(&MessageKind::Repository(
288 RepositoryMessage {
289 target: repository.target.clone(),
290 command: RepositoryMessageKind::Response(
291 RepositoryResponse::IssuesCount(response),
292 ),
293 },
294 ))
295 .unwrap(),
296 ))
297 .await
298 .unwrap();
299 }
300 RepositoryRequest::IssueLabels(request) => {
301 let mut backend = backend.lock().await;
302 let response = backend.issue_labels(request);
303
304 let response = match response {
305 Ok(response) => response,
306 Err(err) => {
307 error!("Error handling request: {:?}", err);
308 continue;
309 }
310 };
311 drop(backend);
312 socket
313 .send(Message::Binary(
314 serde_json::to_vec(&MessageKind::Repository(
315 RepositoryMessage {
316 target: repository.target.clone(),
317 command: RepositoryMessageKind::Response(
318 RepositoryResponse::IssueLabels(response),
319 ),
320 },
321 ))
322 .unwrap(),
323 ))
324 .await
325 .unwrap();
326 }
327 RepositoryRequest::Issues(request) => {
328 let mut backend = backend.lock().await;
329 let response = backend.issues(request);
330
331 let response = match response {
332 Ok(response) => response,
333 Err(err) => {
334 error!("Error handling request: {:?}", err);
335 continue;
336 }
337 };
338 drop(backend);
339
340 socket
341 .send(Message::Binary(
342 serde_json::to_vec(&MessageKind::Repository(
343 RepositoryMessage {
344 target: repository.target.clone(),
345 command: RepositoryMessageKind::Response(
346 RepositoryResponse::Issues(response),
347 ),
348 },
349 ))
350 .unwrap(),
351 ))
352 .await
353 .unwrap();
354 }
355 },
356 RepositoryMessageKind::Response(_response) => {
357 unreachable!()
358 }
359 }
360 }
361 }
362 }
363
364 info!("Connection closed");
365 }
366
367 async fn send_and_get_listener(
368 message: MessageKind,
369 listeners: &Arc<Mutex<Listeners>>,
370 connections: &Arc<Mutex<Connections>>,
371 ) -> Receiver<MessageKind> {
372 let (instance, user, repository): (Option<Instance>, Option<User>, Option<Repository>) =
373 match &message {
374 MessageKind::Handshake(_) => {
375 todo!()
376 }
377 MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())),
378 };
379
380 let target = match (&instance, &user, &repository) {
381 (Some(instance), _, _) => instance.clone(),
382 (_, Some(user), _) => user.instance.clone(),
383 (_, _, Some(repository)) => repository.instance.clone(),
384 _ => unreachable!(),
385 };
386
387 let mut listeners = listeners.lock().await;
388 let listener = listeners.add(instance, user, repository);
389 drop(listeners);
390
391 let connections = connections.lock().await;
392
393 if let Some(connection) = connections.instance_connections.get(&target) {
394 connection.sender.send(message);
395 } else {
396 error!("Unable to message {}, this is a bug.", target.url);
397
398 panic!();
399 }
400
401 drop(connections);
402
403 listener
404 }
405