JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Changes

Amber - 2 years ago

parent: tbd commit: 5ede041

⁨src/connection.rs⁩ - ⁨10877⁩ bytes
Raw
1 use std::{collections::HashMap, net::SocketAddr, sync::Arc};
2
3 use futures_util::{stream::StreamExt, SinkExt, TryStreamExt};
4 use tokio::{
5 io::{AsyncRead, AsyncWrite},
6 net::TcpStream,
7 sync::{
8 broadcast::{Receiver, Sender},
9 Mutex,
10 },
11 task::JoinHandle,
12 };
13 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
14
15 use crate::{
16 command::{
17 issues::IssuesCountResponse,
18 repository::{
19 RepositoryFileInspectionResponse, RepositoryIssueLabelsResponse,
20 RepositoryIssuesResponse, RepositoryMessage, RepositoryMessageKind, RepositoryRequest,
21 RepositoryResponse,
22 },
23 MessageKind,
24 },
25 handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse, InitiateHandshake},
26 listener::Listeners,
27 model::{
28 instance::{Instance, InstanceMeta},
29 repository::{CommitMetadata, RepositoryView},
30 },
31 };
32
33 pub struct RawConnection {
34 pub task: JoinHandle<()>,
35 }
36
37 pub struct InstanceConnection {
38 pub instance: InstanceMeta,
39 pub sender: Sender<MessageKind>,
40 pub task: JoinHandle<()>,
41 }
42
43 /// Represents a connection which hasn't finished the handshake.
44 pub struct UnestablishedConnection {
45 pub socket: WebSocketStream<TcpStream>,
46 }
47
48 #[derive(Default)]
49 pub struct Connections {
50 pub connections: Vec<RawConnection>,
51 pub instance_connections: HashMap<Instance, InstanceConnection>,
52 }
53
54 pub async fn connection_worker(
55 mut socket: WebSocketStream<TcpStream>,
56 listeners: Arc<Mutex<Listeners>>,
57 mut connections: Arc<Mutex<Connections>>,
58 addr: SocketAddr,
59 ) {
60 let mut handshaked = false;
61 let this_instance = Instance {
62 url: String::from("FOO"),
63 };
64
65 while let Some(message) = socket.next().await {
66 let message = match message {
67 Ok(message) => message,
68 Err(err) => {
69 error!("Error reading message: {:?}", err);
70 continue;
71 }
72 };
73
74 let payload = match message {
75 Message::Text(text) => text.into_bytes(),
76 Message::Binary(bytes) => bytes,
77 Message::Ping(_) => continue,
78 Message::Pong(_) => continue,
79 Message::Close(_) => {
80 info!("Closing connection with {}.", addr);
81
82 return;
83 }
84 _ => unreachable!(),
85 };
86
87 let message = match serde_json::from_slice::<MessageKind>(&payload) {
88 Ok(message) => message,
89 Err(err) => {
90 error!("Error deserializing message from {}: {:?}", addr, err);
91 continue;
92 }
93 };
94
95 if let MessageKind::Handshake(handshake) = message {
96 match handshake {
97 HandshakeMessage::Initiate(_) => {
98 // Send HandshakeMessage::Response
99 let message = HandshakeResponse {
100 identity: Instance {
101 url: String::from("foo.com"),
102 },
103 version: String::from("0.1.0"),
104 };
105
106 socket
107 .send(Message::Binary(
108 serde_json::to_vec(&HandshakeMessage::Response(message)).unwrap(),
109 ))
110 .await
111 .unwrap();
112
113 continue;
114 }
115 HandshakeMessage::Response(_) => {
116 // Send HandshakeMessage::Finalize
117 let message = HandshakeFinalize { success: true };
118
119 socket
120 .send(Message::Binary(
121 serde_json::to_vec(&HandshakeMessage::Finalize(message)).unwrap(),
122 ))
123 .await
124 .unwrap();
125
126 continue;
127 }
128 HandshakeMessage::Finalize(_) => {
129 handshaked = true;
130
131 continue;
132 }
133 }
134 }
135
136 if !handshaked {
137 continue;
138 }
139
140 if let MessageKind::Repository(repository) = &message {
141 if repository.target.instance != this_instance {
142 // We need to send this command to a different instance
143
144 let mut listener = send_and_get_listener(message, &listeners, &connections).await;
145
146 // Wait for response
147 while let Ok(message) = listener.recv().await {
148 if let MessageKind::Repository(RepositoryMessage {
149 command: RepositoryMessageKind::Response(_),
150 ..
151 }) = message
152 {
153 socket
154 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
155 .await
156 .unwrap();
157 }
158 }
159 } else {
160 // This message is targeting this instance
161 match &repository.command {
162 RepositoryMessageKind::Request(request) => match request {
163 RepositoryRequest::CreateRepository(_) => todo!(),
164 RepositoryRequest::RepositoryFileInspection(_) => {
165 let response = RepositoryFileInspectionResponse::File {
166 commit_metadata: CommitMetadata::default(),
167 };
168 }
169 RepositoryRequest::RepositoryInfo(_) => {
170 let response = RepositoryView {
171 name: String::from("Nederland"),
172 description: String::from("ik hou van het nederland"),
173 default_branch: String::from("nederland"),
174 latest_commit: CommitMetadata::default(),
175 files: vec![],
176 };
177
178 socket
179 .send(Message::Binary(
180 serde_json::to_vec(&MessageKind::Repository(
181 RepositoryMessage {
182 target: repository.target.clone(),
183 command: RepositoryMessageKind::Response(
184 RepositoryResponse::RepositoryInfo(response),
185 ),
186 },
187 ))
188 .unwrap(),
189 ))
190 .await
191 .unwrap();
192 }
193 RepositoryRequest::IssuesCount(_) => {
194 let response: IssuesCountResponse =
195 IssuesCountResponse { count: 727420 };
196
197 socket
198 .send(Message::Binary(
199 serde_json::to_vec(&MessageKind::Repository(
200 RepositoryMessage {
201 target: repository.target.clone(),
202 command: RepositoryMessageKind::Response(
203 RepositoryResponse::IssuesCount(response),
204 ),
205 },
206 ))
207 .unwrap(),
208 ))
209 .await
210 .unwrap();
211 }
212 RepositoryRequest::IssueLabels(_) => {
213 let response = RepositoryIssueLabelsResponse { labels: vec![] };
214
215 socket
216 .send(Message::Binary(
217 serde_json::to_vec(&MessageKind::Repository(
218 RepositoryMessage {
219 target: repository.target.clone(),
220 command: RepositoryMessageKind::Response(
221 RepositoryResponse::IssueLabels(response),
222 ),
223 },
224 ))
225 .unwrap(),
226 ))
227 .await
228 .unwrap();
229 }
230 RepositoryRequest::Issues(_) => {
231 let response = RepositoryIssuesResponse { issues: vec![] };
232
233 socket
234 .send(Message::Binary(
235 serde_json::to_vec(&MessageKind::Repository(
236 RepositoryMessage {
237 target: repository.target.clone(),
238 command: RepositoryMessageKind::Response(
239 RepositoryResponse::Issues(response),
240 ),
241 },
242 ))
243 .unwrap(),
244 ))
245 .await
246 .unwrap();
247 }
248 },
249 RepositoryMessageKind::Response(response) => {
250 unreachable!()
251 }
252 }
253 }
254 }
255 }
256 }
257
258 async fn send_and_get_listener(
259 message: MessageKind,
260 listeners: &Arc<Mutex<Listeners>>,
261 mut connections: &Arc<Mutex<Connections>>,
262 ) -> Receiver<MessageKind> {
263 let (instance, user, repository) = match &message {
264 MessageKind::Handshake(_) => {
265 todo!()
266 }
267 MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())),
268 };
269
270 let target = todo!();
271
272 let mut listeners = listeners.lock().await;
273 let mut listener = listeners.add(instance, user, repository);
274 drop(listeners);
275
276 let connections = connections.lock().await;
277
278 if let Some(connection) = connections.instance_connections.get(&target) {
279 connection.sender.send(message);
280 } else {
281 error!("Unable to message {}, this is a bug.", target.url);
282
283 panic!();
284 }
285
286 drop(connections);
287
288 listener
289 }
290