JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Functionality

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨5646846

⁨src/connection.rs⁩ - ⁨12169⁩ bytes
Raw
1 use std::{collections::HashMap, net::SocketAddr, sync::Arc};
2
3 use futures_util::{stream::StreamExt, SinkExt, TryStreamExt};
4 use tokio::{
5 io::{AsyncRead, AsyncWrite},
6 net::TcpStream,
7 sync::{
8 broadcast::{Receiver, Sender},
9 Mutex,
10 },
11 task::JoinHandle,
12 };
13 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
14
15 use crate::{
16 command::{
17 issues::IssuesCountResponse,
18 repository::{
19 RepositoryFileInspectionResponse, RepositoryIssueLabelsResponse,
20 RepositoryIssuesResponse, RepositoryMessage, RepositoryMessageKind, RepositoryRequest,
21 RepositoryResponse,
22 },
23 MessageKind,
24 },
25 handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse, InitiateHandshake},
26 listener::Listeners,
27 model::{
28 instance::{Instance, InstanceMeta},
29 repository::{CommitMetadata, Repository, RepositoryView},
30 user::User,
31 },
32 };
33
34 pub struct RawConnection {
35 pub task: JoinHandle<()>,
36 }
37
38 pub struct InstanceConnection {
39 pub instance: InstanceMeta,
40 pub sender: Sender<MessageKind>,
41 pub task: JoinHandle<()>,
42 }
43
44 /// Represents a connection which hasn't finished the handshake.
45 pub struct UnestablishedConnection {
46 pub socket: WebSocketStream<TcpStream>,
47 }
48
49 #[derive(Default)]
50 pub struct Connections {
51 pub connections: Vec<RawConnection>,
52 pub instance_connections: HashMap<Instance, InstanceConnection>,
53 }
54
55 pub async fn connection_worker(
56 mut socket: WebSocketStream<TcpStream>,
57 listeners: Arc<Mutex<Listeners>>,
58 mut connections: Arc<Mutex<Connections>>,
59 addr: SocketAddr,
60 ) {
61 let mut handshaked = false;
62 let this_instance = Instance {
63 url: String::from("127.0.0.1:8080"),
64 };
65
66 while let Some(message) = socket.next().await {
67 let message = match message {
68 Ok(message) => message,
69 Err(err) => {
70 error!("Error reading message: {:?}", err);
71 continue;
72 }
73 };
74
75 let payload = match message {
76 Message::Text(text) => text.into_bytes(),
77 Message::Binary(bytes) => bytes,
78 Message::Ping(_) => continue,
79 Message::Pong(_) => continue,
80 Message::Close(_) => {
81 info!("Closing connection with {}.", addr);
82
83 return;
84 }
85 _ => unreachable!(),
86 };
87
88 let message = match serde_json::from_slice::<MessageKind>(&payload) {
89 Ok(message) => message,
90 Err(err) => {
91 error!("Error deserializing message from {}: {:?}", addr, err);
92 continue;
93 }
94 };
95
96 info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
97
98 if let MessageKind::Handshake(handshake) = message {
99 match handshake {
100 HandshakeMessage::Initiate(_) => {
101 // Send HandshakeMessage::Response
102 let message = HandshakeResponse {
103 identity: Instance {
104 url: String::from("foo.com"),
105 },
106 version: String::from("0.1.0"),
107 };
108
109 socket
110 .send(Message::Binary(
111 serde_json::to_vec(&MessageKind::Handshake(
112 HandshakeMessage::Response(message),
113 ))
114 .unwrap(),
115 ))
116 .await
117 .unwrap();
118
119 continue;
120 }
121 HandshakeMessage::Response(_) => {
122 // Send HandshakeMessage::Finalize
123 let message = HandshakeFinalize { success: true };
124
125 socket
126 .send(Message::Binary(
127 serde_json::to_vec(&MessageKind::Handshake(
128 HandshakeMessage::Finalize(message),
129 ))
130 .unwrap(),
131 ))
132 .await
133 .unwrap();
134
135 continue;
136 }
137 HandshakeMessage::Finalize(_) => {
138 handshaked = true;
139
140 // Send HandshakeMessage::Finalize
141 let message = HandshakeFinalize { success: true };
142
143 socket
144 .send(Message::Binary(
145 serde_json::to_vec(&MessageKind::Handshake(
146 HandshakeMessage::Finalize(message),
147 ))
148 .unwrap(),
149 ))
150 .await
151 .unwrap();
152
153 continue;
154 }
155 }
156 }
157
158 if !handshaked {
159 continue;
160 }
161
162 if let MessageKind::Repository(repository) = &message {
163 if repository.target.instance != this_instance {
164 info!("Forwarding command to {}", repository.target.instance.url);
165 // We need to send this command to a different instance
166
167 let mut listener = send_and_get_listener(message, &listeners, &connections).await;
168
169 // Wait for response
170 while let Ok(message) = listener.recv().await {
171 if let MessageKind::Repository(RepositoryMessage {
172 command: RepositoryMessageKind::Response(_),
173 ..
174 }) = message
175 {
176 socket
177 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
178 .await
179 .unwrap();
180 }
181 }
182 } else {
183 // This message is targeting this instance
184 match &repository.command {
185 RepositoryMessageKind::Request(request) => match request {
186 RepositoryRequest::CreateRepository(_) => todo!(),
187 RepositoryRequest::RepositoryFileInspection(_) => {
188 let response = RepositoryFileInspectionResponse::File {
189 commit_metadata: CommitMetadata::default(),
190 };
191 }
192 RepositoryRequest::RepositoryInfo(_) => {
193 let response = RepositoryView {
194 name: String::from("Nederland"),
195 description: String::from("ik hou van het nederland"),
196 default_branch: String::from("nederland"),
197 latest_commit: CommitMetadata::default(),
198 files: vec![],
199 };
200
201 socket
202 .send(Message::Binary(
203 serde_json::to_vec(&MessageKind::Repository(
204 RepositoryMessage {
205 target: repository.target.clone(),
206 command: RepositoryMessageKind::Response(
207 RepositoryResponse::RepositoryInfo(response),
208 ),
209 },
210 ))
211 .unwrap(),
212 ))
213 .await
214 .unwrap();
215 }
216 RepositoryRequest::IssuesCount(_) => {
217 let response: IssuesCountResponse =
218 IssuesCountResponse { count: 727420 };
219
220 socket
221 .send(Message::Binary(
222 serde_json::to_vec(&MessageKind::Repository(
223 RepositoryMessage {
224 target: repository.target.clone(),
225 command: RepositoryMessageKind::Response(
226 RepositoryResponse::IssuesCount(response),
227 ),
228 },
229 ))
230 .unwrap(),
231 ))
232 .await
233 .unwrap();
234 }
235 RepositoryRequest::IssueLabels(_) => {
236 let response = RepositoryIssueLabelsResponse { labels: vec![] };
237
238 socket
239 .send(Message::Binary(
240 serde_json::to_vec(&MessageKind::Repository(
241 RepositoryMessage {
242 target: repository.target.clone(),
243 command: RepositoryMessageKind::Response(
244 RepositoryResponse::IssueLabels(response),
245 ),
246 },
247 ))
248 .unwrap(),
249 ))
250 .await
251 .unwrap();
252 }
253 RepositoryRequest::Issues(_) => {
254 let response = RepositoryIssuesResponse { issues: vec![] };
255
256 socket
257 .send(Message::Binary(
258 serde_json::to_vec(&MessageKind::Repository(
259 RepositoryMessage {
260 target: repository.target.clone(),
261 command: RepositoryMessageKind::Response(
262 RepositoryResponse::Issues(response),
263 ),
264 },
265 ))
266 .unwrap(),
267 ))
268 .await
269 .unwrap();
270 }
271 },
272 RepositoryMessageKind::Response(response) => {
273 unreachable!()
274 }
275 }
276 }
277 }
278 }
279
280 info!("Connection closed");
281 }
282
283 async fn send_and_get_listener(
284 message: MessageKind,
285 listeners: &Arc<Mutex<Listeners>>,
286 mut connections: &Arc<Mutex<Connections>>,
287 ) -> Receiver<MessageKind> {
288 let (instance, user, repository): (Option<Instance>, Option<User>, Option<Repository>) =
289 match &message {
290 MessageKind::Handshake(_) => {
291 todo!()
292 }
293 MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())),
294 };
295
296 let target = match (&instance, &user, &repository) {
297 (Some(instance), _, _) => instance.clone(),
298 (_, Some(user), _) => user.instance.clone(),
299 (_, _, Some(repository)) => repository.instance.clone(),
300 _ => unreachable!(),
301 };
302
303 let mut listeners = listeners.lock().await;
304 let mut listener = listeners.add(instance, user, repository);
305 drop(listeners);
306
307 let connections = connections.lock().await;
308
309 if let Some(connection) = connections.instance_connections.get(&target) {
310 connection.sender.send(message);
311 } else {
312 error!("Unable to message {}, this is a bug.", target.url);
313
314 panic!();
315 }
316
317 drop(connections);
318
319 listener
320 }
321