JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Connection

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨415ff8d

⁨src/connection.rs⁩ - ⁨10845⁩ bytes
Raw
1 use std::{collections::HashMap, net::SocketAddr, sync::Arc};
2
3 use futures_util::{stream::StreamExt, SinkExt, TryStreamExt};
4 use tokio::{
5 io::{AsyncRead, AsyncWrite},
6 net::TcpStream,
7 sync::{
8 broadcast::{Receiver, Sender},
9 Mutex,
10 },
11 task::JoinHandle,
12 };
13 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
14
15 use crate::{
16 command::{
17 issues::IssuesCountResponse,
18 repository::{
19 RepositoryFileInspectionResponse, RepositoryIssueLabelsResponse,
20 RepositoryIssuesResponse, RepositoryMessage, RepositoryMessageKind, RepositoryRequest,
21 RepositoryResponse,
22 },
23 MessageKind,
24 },
25 handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse, InitiateHandshake},
26 listener::Listeners,
27 model::{
28 instance::{Instance, InstanceMeta},
29 repository::{CommitMetadata, RepositoryView},
30 },
31 };
32
33 pub struct RawConnection {
34 pub task: JoinHandle<()>,
35 }
36
37 pub struct InstanceConnection {
38 pub instance: InstanceMeta,
39 pub sender: Sender<MessageKind>,
40 pub task: JoinHandle<()>,
41 }
42
43 /// Represents a connection which hasn't finished the handshake.
44 pub struct UnestablishedConnection {
45 pub socket: WebSocketStream<TcpStream>,
46 }
47
48 #[derive(Default)]
49 pub struct Connections {
50 pub connections: Vec<RawConnection>,
51 pub instance_connections: HashMap<Instance, InstanceConnection>,
52 }
53
54 pub async fn connection_worker(
55 mut socket: WebSocketStream<TcpStream>,
56 listeners: Arc<Mutex<Listeners>>,
57 mut connections: Arc<Mutex<Connections>>,
58 addr: SocketAddr,
59 ) {
60 let mut handshaked = false;
61 let this_instance = Instance {
62 url: String::from("FOO"),
63 };
64
65 while let Some(message) = socket.next().await {
66 let message = match message {
67 Ok(message) => message,
68 Err(err) => {
69 error!("Error reading message: {:?}", err);
70 continue;
71 }
72 };
73
74 let payload = match message {
75 Message::Text(text) => text.into_bytes(),
76 Message::Binary(bytes) => bytes,
77 Message::Ping(_) => continue,
78 Message::Pong(_) => continue,
79 Message::Close(_) => {
80 info!("Closing connection with {}.", addr);
81
82 return;
83 }
84 _ => unreachable!(),
85 };
86
87 let message = match serde_json::from_slice::<MessageKind>(&payload) {
88 Ok(message) => message,
89 Err(err) => {
90 error!("Error deserializing message from {}: {:?}", addr, err);
91 continue;
92 }
93 };
94
95 if let MessageKind::Handshake(handshake) = message {
96 match handshake {
97 HandshakeMessage::Initiate(_) => {
98 // Send HandshakeMessage::Response
99 let message = HandshakeResponse {
100 identity: Instance {
101 url: String::from("foo.com"),
102 },
103 version: String::from("0.1.0"),
104 };
105
106 socket
107 .send(Message::Binary(
108 serde_json::to_vec(&HandshakeMessage::Response(message)).unwrap(),
109 ))
110 .await
111 .unwrap();
112
113 continue;
114 }
115 HandshakeMessage::Response(_) => {
116 // Send HandshakeMessage::Finalize
117 let message = HandshakeFinalize { success: true };
118
119 socket
120 .send(Message::Binary(
121 serde_json::to_vec(&HandshakeMessage::Finalize(message)).unwrap(),
122 ))
123 .await
124 .unwrap();
125
126 continue;
127 }
128 HandshakeMessage::Finalize(_) => {
129 handshaked = true;
130
131 continue;
132 }
133 }
134 }
135
136 if !handshaked {
137 continue;
138 }
139
140 if let MessageKind::Repository(repository) = &message {
141 if repository.target.instance != this_instance {
142 // We need to send this command to a different instance
143
144 let mut listener = send_and_get_listener(message, &listeners, &connections).await;
145
146 // Wait for response
147 while let Ok(message) = listener.recv().await {
148 if let MessageKind::Repository(RepositoryMessage {
149 command: RepositoryMessageKind::Response(_),
150 ..
151 }) = message
152 {
153 socket
154 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
155 .await
156 .unwrap();
157 }
158 }
159 } else {
160 // This message is targeting this instance
161 match &repository.command {
162 RepositoryMessageKind::Request(request) => match request {
163 RepositoryRequest::CreateRepository(_) => todo!(),
164 RepositoryRequest::RepositoryFileInspection(_) => {
165 let response = RepositoryFileInspectionResponse::File {
166 commit_metadata: CommitMetadata::default(),
167 };
168 }
169 RepositoryRequest::RepositoryInfo(_) => {
170 let response = RepositoryView {
171 name: String::from("Nederland"),
172 description: String::from("ik hou van het nederland"),
173 default_branch: String::from("nederland"),
174 latest_commit: CommitMetadata::default(),
175 files: vec![],
176 };
177
178 socket
179 .send(Message::Binary(
180 serde_json::to_vec(&MessageKind::Repository(
181 RepositoryMessage {
182 target: repository.target.clone(),
183 command: RepositoryMessageKind::Response(
184 RepositoryResponse::RepositoryInfo(response),
185 ),
186 },
187 ))
188 .unwrap(),
189 ))
190 .await
191 .unwrap();
192 }
193 RepositoryRequest::IssuesCount(_) => {
194 let response: IssuesCountResponse = IssuesCountResponse { count: 727420 };
195
196 socket
197 .send(Message::Binary(
198 serde_json::to_vec(&MessageKind::Repository(
199 RepositoryMessage {
200 target: repository.target.clone(),
201 command: RepositoryMessageKind::Response(
202 RepositoryResponse::IssuesCount(response),
203 ),
204 },
205 ))
206 .unwrap(),
207 ))
208 .await
209 .unwrap();
210 }
211 RepositoryRequest::IssueLabels(_) => {
212 let response = RepositoryIssueLabelsResponse { labels: vec![] };
213
214 socket
215 .send(Message::Binary(
216 serde_json::to_vec(&MessageKind::Repository(
217 RepositoryMessage {
218 target: repository.target.clone(),
219 command: RepositoryMessageKind::Response(
220 RepositoryResponse::IssueLabels(response),
221 ),
222 },
223 ))
224 .unwrap(),
225 ))
226 .await
227 .unwrap();
228 }
229 RepositoryRequest::Issues(_) => {
230 let response = RepositoryIssuesResponse { issues: vec![] };
231
232 socket
233 .send(Message::Binary(
234 serde_json::to_vec(&MessageKind::Repository(
235 RepositoryMessage {
236 target: repository.target.clone(),
237 command: RepositoryMessageKind::Response(
238 RepositoryResponse::Issues(response),
239 ),
240 },
241 ))
242 .unwrap(),
243 ))
244 .await
245 .unwrap();
246 }
247 },
248 RepositoryMessageKind::Response(response) => {
249 unreachable!()
250 }
251 }
252 }
253 }
254 }
255 }
256
257 async fn send_and_get_listener(
258 message: MessageKind,
259 listeners: &Arc<Mutex<Listeners>>,
260 mut connections: &Arc<Mutex<Connections>>,
261 ) -> Receiver<MessageKind> {
262 let (instance, user, repository) = match &message {
263 MessageKind::Handshake(_) => {
264 todo!()
265 }
266 MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())),
267 };
268
269 let target = todo!();
270
271 let mut listeners = listeners.lock().await;
272 let mut listener = listeners.add(instance, user, repository);
273 drop(listeners);
274
275 let connections = connections.lock().await;
276
277 if let Some(connection) = connections.instance_connections.get(&target) {
278 connection.sender.send(message);
279 } else {
280 error!("Unable to message {}, this is a bug.", target.url);
281
282 panic!();
283 }
284
285 drop(connections);
286
287 listener
288 }
289