JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Add more backend logic

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨5060ca2

⁨src/connection.rs⁩ - ⁨16327⁩ bytes
Raw
1 use std::{collections::HashMap, net::SocketAddr, sync::Arc};
2
3 use futures_util::{stream::StreamExt, SinkExt, TryStreamExt};
4 use tokio::{
5 io::{AsyncRead, AsyncWrite},
6 net::TcpStream,
7 sync::{
8 broadcast::{Receiver, Sender},
9 Mutex,
10 },
11 task::JoinHandle,
12 };
13 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
14
15 use crate::{
16 backend::{IssuesBackend, RepositoryBackend},
17 handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse, InitiateHandshake},
18 listener::Listeners,
19 messages::{
20 issues::IssuesCountResponse,
21 repository::{
22 RepositoryFileInspectionResponse, RepositoryIssueLabelsResponse,
23 RepositoryIssuesResponse, RepositoryMessage, RepositoryMessageKind, RepositoryRequest,
24 RepositoryResponse,
25 },
26 MessageKind,
27 },
28 model::{
29 instance::{Instance, InstanceMeta},
30 repository::{CommitMetadata, Repository, RepositoryView},
31 user::User,
32 },
33 };
34
35 pub struct RawConnection {
36 pub task: JoinHandle<()>,
37 }
38
39 pub struct InstanceConnection {
40 pub instance: InstanceMeta,
41 pub sender: Sender<MessageKind>,
42 pub task: JoinHandle<()>,
43 }
44
45 /// Represents a connection which hasn't finished the handshake.
46 pub struct UnestablishedConnection {
47 pub socket: WebSocketStream<TcpStream>,
48 }
49
50 #[derive(Default)]
51 pub struct Connections {
52 pub connections: Vec<RawConnection>,
53 pub instance_connections: HashMap<Instance, InstanceConnection>,
54 }
55
56 pub async fn connection_worker(
57 mut socket: WebSocketStream<TcpStream>,
58 listeners: Arc<Mutex<Listeners>>,
59 mut connections: Arc<Mutex<Connections>>,
60 backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
61 addr: SocketAddr,
62 ) {
63 let mut handshaked = false;
64 let this_instance = Instance {
65 url: String::from("127.0.0.1:8080"),
66 };
67
68 while let Some(message) = socket.next().await {
69 let message = match message {
70 Ok(message) => message,
71 Err(err) => {
72 error!("Error reading message: {:?}", err);
73 continue;
74 }
75 };
76
77 let payload = match message {
78 Message::Text(text) => text.into_bytes(),
79 Message::Binary(bytes) => bytes,
80 Message::Ping(_) => continue,
81 Message::Pong(_) => continue,
82 Message::Close(_) => {
83 info!("Closing connection with {}.", addr);
84
85 return;
86 }
87 _ => unreachable!(),
88 };
89
90 let message = match serde_json::from_slice::<MessageKind>(&payload) {
91 Ok(message) => message,
92 Err(err) => {
93 error!("Error deserializing message from {}: {:?}", addr, err);
94 continue;
95 }
96 };
97
98 info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
99
100 if let MessageKind::Handshake(handshake) = message {
101 match handshake {
102 HandshakeMessage::Initiate(_) => {
103 // Send HandshakeMessage::Response
104 let message = HandshakeResponse {
105 identity: Instance {
106 url: String::from("foo.com"),
107 },
108 version: String::from("0.1.0"),
109 };
110
111 socket
112 .send(Message::Binary(
113 serde_json::to_vec(&MessageKind::Handshake(
114 HandshakeMessage::Response(message),
115 ))
116 .unwrap(),
117 ))
118 .await
119 .unwrap();
120
121 continue;
122 }
123 HandshakeMessage::Response(_) => {
124 // Send HandshakeMessage::Finalize
125 let message = HandshakeFinalize { success: true };
126
127 socket
128 .send(Message::Binary(
129 serde_json::to_vec(&MessageKind::Handshake(
130 HandshakeMessage::Finalize(message),
131 ))
132 .unwrap(),
133 ))
134 .await
135 .unwrap();
136
137 continue;
138 }
139 HandshakeMessage::Finalize(_) => {
140 handshaked = true;
141
142 // Send HandshakeMessage::Finalize
143 let message = HandshakeFinalize { success: true };
144
145 socket
146 .send(Message::Binary(
147 serde_json::to_vec(&MessageKind::Handshake(
148 HandshakeMessage::Finalize(message),
149 ))
150 .unwrap(),
151 ))
152 .await
153 .unwrap();
154
155 continue;
156 }
157 }
158 }
159
160 if !handshaked {
161 continue;
162 }
163
164 if let MessageKind::Repository(repository) = &message {
165 if repository.target.instance != this_instance {
166 info!("Forwarding command to {}", repository.target.instance.url);
167 // We need to send this command to a different instance
168
169 let mut listener = send_and_get_listener(message, &listeners, &connections).await;
170
171 // Wait for response
172 while let Ok(message) = listener.recv().await {
173 if let MessageKind::Repository(RepositoryMessage {
174 command: RepositoryMessageKind::Response(_),
175 ..
176 }) = message
177 {
178 socket
179 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
180 .await
181 .unwrap();
182 }
183 }
184 } else {
185 // This message is targeting this instance
186 match &repository.command {
187 RepositoryMessageKind::Request(request) => match request {
188 RepositoryRequest::CreateRepository(request) => {
189 let mut backend = backend.lock().await;
190 let response = backend.create_repository(request);
191
192 let response = match response {
193 Ok(response) => response,
194 Err(err) => {
195 error!("Error handling request: {:?}", err);
196 continue;
197 }
198 };
199 drop(backend);
200
201 socket
202 .send(Message::Binary(
203 serde_json::to_vec(&MessageKind::Repository(
204 RepositoryMessage {
205 target: repository.target.clone(),
206 command: RepositoryMessageKind::Response(
207 RepositoryResponse::CreateRepository(response),
208 ),
209 },
210 ))
211 .unwrap(),
212 ))
213 .await
214 .unwrap();
215 }
216 RepositoryRequest::RepositoryFileInspect(request) => {
217 let mut backend = backend.lock().await;
218 let response = backend.repository_file_inspect(request);
219
220 let response = match response {
221 Ok(response) => response,
222 Err(err) => {
223 error!("Error handling request: {:?}", err);
224 continue;
225 }
226 };
227 drop(backend);
228
229 socket
230 .send(Message::Binary(
231 serde_json::to_vec(&MessageKind::Repository(
232 RepositoryMessage {
233 target: repository.target.clone(),
234 command: RepositoryMessageKind::Response(
235 RepositoryResponse::RepositoryFileInspection(
236 response,
237 ),
238 ),
239 },
240 ))
241 .unwrap(),
242 ))
243 .await
244 .unwrap();
245 }
246 RepositoryRequest::RepositoryInfo(request) => {
247 let mut backend = backend.lock().await;
248 let response = backend.repository_info(request);
249
250 let response = match response {
251 Ok(response) => response,
252 Err(err) => {
253 error!("Error handling request: {:?}", err);
254 continue;
255 }
256 };
257 drop(backend);
258
259 socket
260 .send(Message::Binary(
261 serde_json::to_vec(&MessageKind::Repository(
262 RepositoryMessage {
263 target: repository.target.clone(),
264 command: RepositoryMessageKind::Response(
265 RepositoryResponse::RepositoryInfo(response),
266 ),
267 },
268 ))
269 .unwrap(),
270 ))
271 .await
272 .unwrap();
273 }
274 RepositoryRequest::IssuesCount(request) => {
275 let mut backend = backend.lock().await;
276 let response = backend.issues_count(request);
277
278 let response = match response {
279 Ok(response) => response,
280 Err(err) => {
281 error!("Error handling request: {:?}", err);
282 continue;
283 }
284 };
285 drop(backend);
286
287 socket
288 .send(Message::Binary(
289 serde_json::to_vec(&MessageKind::Repository(
290 RepositoryMessage {
291 target: repository.target.clone(),
292 command: RepositoryMessageKind::Response(
293 RepositoryResponse::IssuesCount(response),
294 ),
295 },
296 ))
297 .unwrap(),
298 ))
299 .await
300 .unwrap();
301 }
302 RepositoryRequest::IssueLabels(request) => {
303 let mut backend = backend.lock().await;
304 let response = backend.issue_labels(request);
305
306 let response = match response {
307 Ok(response) => response,
308 Err(err) => {
309 error!("Error handling request: {:?}", err);
310 continue;
311 }
312 };
313 drop(backend);
314 socket
315 .send(Message::Binary(
316 serde_json::to_vec(&MessageKind::Repository(
317 RepositoryMessage {
318 target: repository.target.clone(),
319 command: RepositoryMessageKind::Response(
320 RepositoryResponse::IssueLabels(response),
321 ),
322 },
323 ))
324 .unwrap(),
325 ))
326 .await
327 .unwrap();
328 }
329 RepositoryRequest::Issues(request) => {
330 let mut backend = backend.lock().await;
331 let response = backend.issues(request);
332
333 let response = match response {
334 Ok(response) => response,
335 Err(err) => {
336 error!("Error handling request: {:?}", err);
337 continue;
338 }
339 };
340 drop(backend);
341
342 socket
343 .send(Message::Binary(
344 serde_json::to_vec(&MessageKind::Repository(
345 RepositoryMessage {
346 target: repository.target.clone(),
347 command: RepositoryMessageKind::Response(
348 RepositoryResponse::Issues(response),
349 ),
350 },
351 ))
352 .unwrap(),
353 ))
354 .await
355 .unwrap();
356 }
357 },
358 RepositoryMessageKind::Response(response) => {
359 unreachable!()
360 }
361 }
362 }
363 }
364 }
365
366 info!("Connection closed");
367 }
368
369 async fn send_and_get_listener(
370 message: MessageKind,
371 listeners: &Arc<Mutex<Listeners>>,
372 mut connections: &Arc<Mutex<Connections>>,
373 ) -> Receiver<MessageKind> {
374 let (instance, user, repository): (Option<Instance>, Option<User>, Option<Repository>) =
375 match &message {
376 MessageKind::Handshake(_) => {
377 todo!()
378 }
379 MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())),
380 };
381
382 let target = match (&instance, &user, &repository) {
383 (Some(instance), _, _) => instance.clone(),
384 (_, Some(user), _) => user.instance.clone(),
385 (_, _, Some(repository)) => repository.instance.clone(),
386 _ => unreachable!(),
387 };
388
389 let mut listeners = listeners.lock().await;
390 let mut listener = listeners.add(instance, user, repository);
391 drop(listeners);
392
393 let connections = connections.lock().await;
394
395 if let Some(connection) = connections.instance_connections.get(&target) {
396 connection.sender.send(message);
397 } else {
398 error!("Unable to message {}, this is a bug.", target.url);
399
400 panic!();
401 }
402
403 drop(connections);
404
405 listener
406 }
407