JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Connection

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨415ff8d

Showing ⁨⁨12⁩ changed files⁩ with ⁨⁨696⁩ insertions⁩ and ⁨⁨14⁩ deletions⁩

Cargo.lock

View file
@@ -39,6 +39,12 @@ dependencies = [
39 39 ]
40 40
41 41 [[package]]
42 name = "bitflags"
43 version = "1.3.2"
44 source = "registry+https://github.com/rust-lang/crates.io-index"
45 checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
46
47 [[package]]
42 48 name = "block-buffer"
43 49 version = "0.10.4"
44 50 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -131,6 +137,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
131 137 checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
132 138
133 139 [[package]]
140 name = "futures-macro"
141 version = "0.3.28"
142 source = "registry+https://github.com/rust-lang/crates.io-index"
143 checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
144 dependencies = [
145 "proc-macro2",
146 "quote",
147 "syn",
148 ]
149
150 [[package]]
134 151 name = "futures-sink"
135 152 version = "0.3.28"
136 153 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -149,6 +166,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
149 166 checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
150 167 dependencies = [
151 168 "futures-core",
169 "futures-macro",
152 170 "futures-sink",
153 171 "futures-task",
154 172 "pin-project-lite",
@@ -187,10 +205,21 @@ checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
187 205 name = "giterated-daemon"
188 206 version = "0.1.0"
189 207 dependencies = [
208 "futures-util",
209 "serde",
210 "serde_json",
211 "tokio",
190 212 "tokio-tungstenite",
213 "tracing",
191 214 ]
192 215
193 216 [[package]]
217 name = "hermit-abi"
218 version = "0.3.2"
219 source = "registry+https://github.com/rust-lang/crates.io-index"
220 checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b"
221
222 [[package]]
194 223 name = "http"
195 224 version = "0.2.9"
196 225 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -230,6 +259,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
230 259 checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
231 260
232 261 [[package]]
262 name = "lock_api"
263 version = "0.4.9"
264 source = "registry+https://github.com/rust-lang/crates.io-index"
265 checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
266 dependencies = [
267 "autocfg",
268 "scopeguard",
269 ]
270
271 [[package]]
233 272 name = "log"
234 273 version = "0.4.20"
235 274 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -262,6 +301,16 @@ dependencies = [
262 301 ]
263 302
264 303 [[package]]
304 name = "num_cpus"
305 version = "1.16.0"
306 source = "registry+https://github.com/rust-lang/crates.io-index"
307 checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
308 dependencies = [
309 "hermit-abi",
310 "libc",
311 ]
312
313 [[package]]
265 314 name = "object"
266 315 version = "0.32.0"
267 316 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -271,6 +320,35 @@ dependencies = [
271 320 ]
272 321
273 322 [[package]]
323 name = "once_cell"
324 version = "1.18.0"
325 source = "registry+https://github.com/rust-lang/crates.io-index"
326 checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
327
328 [[package]]
329 name = "parking_lot"
330 version = "0.12.1"
331 source = "registry+https://github.com/rust-lang/crates.io-index"
332 checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
333 dependencies = [
334 "lock_api",
335 "parking_lot_core",
336 ]
337
338 [[package]]
339 name = "parking_lot_core"
340 version = "0.9.8"
341 source = "registry+https://github.com/rust-lang/crates.io-index"
342 checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447"
343 dependencies = [
344 "cfg-if",
345 "libc",
346 "redox_syscall",
347 "smallvec",
348 "windows-targets",
349 ]
350
351 [[package]]
274 352 name = "percent-encoding"
275 353 version = "2.3.0"
276 354 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -343,12 +421,64 @@ dependencies = [
343 421 ]
344 422
345 423 [[package]]
424 name = "redox_syscall"
425 version = "0.3.5"
426 source = "registry+https://github.com/rust-lang/crates.io-index"
427 checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
428 dependencies = [
429 "bitflags",
430 ]
431
432 [[package]]
346 433 name = "rustc-demangle"
347 434 version = "0.1.23"
348 435 source = "registry+https://github.com/rust-lang/crates.io-index"
349 436 checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
350 437
351 438 [[package]]
439 name = "ryu"
440 version = "1.0.15"
441 source = "registry+https://github.com/rust-lang/crates.io-index"
442 checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741"
443
444 [[package]]
445 name = "scopeguard"
446 version = "1.1.0"
447 source = "registry+https://github.com/rust-lang/crates.io-index"
448 checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
449
450 [[package]]
451 name = "serde"
452 version = "1.0.186"
453 source = "registry+https://github.com/rust-lang/crates.io-index"
454 checksum = "9f5db24220c009de9bd45e69fb2938f4b6d2df856aa9304ce377b3180f83b7c1"
455 dependencies = [
456 "serde_derive",
457 ]
458
459 [[package]]
460 name = "serde_derive"
461 version = "1.0.186"
462 source = "registry+https://github.com/rust-lang/crates.io-index"
463 checksum = "5ad697f7e0b65af4983a4ce8f56ed5b357e8d3c36651bf6a7e13639c17b8e670"
464 dependencies = [
465 "proc-macro2",
466 "quote",
467 "syn",
468 ]
469
470 [[package]]
471 name = "serde_json"
472 version = "1.0.105"
473 source = "registry+https://github.com/rust-lang/crates.io-index"
474 checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360"
475 dependencies = [
476 "itoa",
477 "ryu",
478 "serde",
479 ]
480
481 [[package]]
352 482 name = "sha1"
353 483 version = "0.10.5"
354 484 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -360,6 +490,15 @@ dependencies = [
360 490 ]
361 491
362 492 [[package]]
493 name = "signal-hook-registry"
494 version = "1.4.1"
495 source = "registry+https://github.com/rust-lang/crates.io-index"
496 checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
497 dependencies = [
498 "libc",
499 ]
500
501 [[package]]
363 502 name = "slab"
364 503 version = "0.4.9"
365 504 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -369,6 +508,12 @@ dependencies = [
369 508 ]
370 509
371 510 [[package]]
511 name = "smallvec"
512 version = "1.11.0"
513 source = "registry+https://github.com/rust-lang/crates.io-index"
514 checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
515
516 [[package]]
372 517 name = "socket2"
373 518 version = "0.5.3"
374 519 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -434,12 +579,27 @@ dependencies = [
434 579 "bytes",
435 580 "libc",
436 581 "mio",
582 "num_cpus",
583 "parking_lot",
437 584 "pin-project-lite",
585 "signal-hook-registry",
438 586 "socket2",
587 "tokio-macros",
439 588 "windows-sys",
440 589 ]
441 590
442 591 [[package]]
592 name = "tokio-macros"
593 version = "2.1.0"
594 source = "registry+https://github.com/rust-lang/crates.io-index"
595 checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
596 dependencies = [
597 "proc-macro2",
598 "quote",
599 "syn",
600 ]
601
602 [[package]]
443 603 name = "tokio-tungstenite"
444 604 version = "0.20.0"
445 605 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -452,6 +612,38 @@ dependencies = [
452 612 ]
453 613
454 614 [[package]]
615 name = "tracing"
616 version = "0.1.37"
617 source = "registry+https://github.com/rust-lang/crates.io-index"
618 checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
619 dependencies = [
620 "cfg-if",
621 "pin-project-lite",
622 "tracing-attributes",
623 "tracing-core",
624 ]
625
626 [[package]]
627 name = "tracing-attributes"
628 version = "0.1.26"
629 source = "registry+https://github.com/rust-lang/crates.io-index"
630 checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
631 dependencies = [
632 "proc-macro2",
633 "quote",
634 "syn",
635 ]
636
637 [[package]]
638 name = "tracing-core"
639 version = "0.1.31"
640 source = "registry+https://github.com/rust-lang/crates.io-index"
641 checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
642 dependencies = [
643 "once_cell",
644 ]
645
646 [[package]]
455 647 name = "tungstenite"
456 648 version = "0.20.0"
457 649 source = "registry+https://github.com/rust-lang/crates.io-index"

Cargo.toml

View file
@@ -6,4 +6,9 @@ edition = "2021"
6 6 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7 7
8 8 [dependencies]
9 tokio-tungstenite = "*"
9 \ No newline at end of file
9 tokio-tungstenite = "*"
10 tokio = { version = "1.32.0", features = ["full"] }
11 tracing = "*"
12 futures-util = "*"
13 serde = { version = "1", features = ["derive"]}
14 serde_json = "1.0"
14 \ No newline at end of file

src/command/issues.rs

View file
@@ -1,13 +1,18 @@
1 use serde::{Deserialize, Serialize};
2
1 3 use crate::model::repository::Repository;
2 4
5 #[derive(Clone)]
3 6 pub struct IssuesCountCommand {
4 7 pub respository: Repository,
5 8 }
6 9
10 #[derive(Clone, Serialize, Deserialize)]
7 11 pub struct IssuesCountResponse {
8 12 pub count: u64,
9 13 }
10 14
15 #[derive(Clone)]
11 16 pub struct IssuesLabelsCommand {
12 17 pub repository: Repository,
13 18 }

src/command/mod.rs

View file
@@ -1,2 +1,14 @@
1 use serde::{Deserialize, Serialize};
2
3 use crate::handshake::HandshakeMessage;
4
5 use self::repository::RepositoryMessage;
6
1 7 pub mod issues;
2 8 pub mod repository;
9
10 #[derive(Clone, Serialize, Deserialize)]
11 pub enum MessageKind {
12 Handshake(HandshakeMessage),
13 Repository(RepositoryMessage),
14 }

src/command/repository.rs

View file
@@ -1,14 +1,26 @@
1 use serde::{Deserialize, Serialize};
2
1 3 use crate::model::{
2 4 repository::{CommitMetadata, Repository, RepositoryFile, RepositoryView},
3 5 user::User,
4 6 };
5 7
6 pub struct RepositoryCommand {
8 use super::issues::IssuesCountResponse;
9
10 #[derive(Clone, Serialize, Deserialize)]
11 pub struct RepositoryMessage {
7 12 pub target: Repository,
8 pub command: RepositoryCommandKind,
13 pub command: RepositoryMessageKind,
14 }
15
16 #[derive(Clone, Serialize, Deserialize)]
17 pub enum RepositoryMessageKind {
18 Request(RepositoryRequest),
19 Response(RepositoryResponse),
9 20 }
10 21
11 pub enum RepositoryCommandKind {
22 #[derive(Clone, Serialize, Deserialize)]
23 pub enum RepositoryRequest {
12 24 CreateRepository(CreateRepositoryCommand),
13 25 RepositoryFileInspection(RepositoryFileInspectionCommand),
14 26 RepositoryInfo(RepositoryInfoRequest),
@@ -17,6 +29,17 @@ pub enum RepositoryCommandKind {
17 29 Issues(RepositoryIssuesRequest),
18 30 }
19 31
32 #[derive(Clone, Serialize, Deserialize)]
33 pub enum RepositoryResponse {
34 CreateRepository(CreateRepositoryResponse),
35 RepositoryFileInspection(RepositoryFileInspectionResponse),
36 RepositoryInfo(RepositoryView),
37 IssuesCount(IssuesCountResponse),
38 IssueLabels(RepositoryIssueLabelsResponse),
39 Issues(RepositoryIssuesResponse),
40 }
41
42 #[derive(Clone, Serialize, Deserialize)]
20 43 pub struct CreateRepositoryCommand {
21 44 pub name: String,
22 45 pub description: String,
@@ -24,15 +47,18 @@ pub struct CreateRepositoryCommand {
24 47 pub owner: User,
25 48 }
26 49
50 #[derive(Clone, Serialize, Deserialize)]
27 51 pub enum CreateRepositoryResponse {
28 52 Created,
29 53 Failed,
30 54 }
31 55
56 #[derive(Clone, Serialize, Deserialize)]
32 57 pub struct RepositoryFileInspectionCommand {
33 58 pub path: RepositoryFile,
34 59 }
35 60
61 #[derive(Clone, Serialize, Deserialize)]
36 62 pub enum RepositoryFileInspectionResponse {
37 63 File {
38 64 commit_metadata: CommitMetadata,
@@ -46,29 +72,37 @@ pub enum RepositoryFileInspectionResponse {
46 72 },
47 73 }
48 74
75 #[derive(Clone, Serialize, Deserialize)]
49 76 pub struct RepositoryIssuesCountRequest;
50 77
78 #[derive(Clone, Serialize, Deserialize)]
51 79 pub struct RepositoryIssuesCountResponse {
52 80 pub count: u64,
53 81 }
54 82
83 #[derive(Clone, Serialize, Deserialize)]
55 84 pub struct RepositoryIssueLabelsRequest;
56 85
86 #[derive(Clone, Serialize, Deserialize)]
57 87 pub struct RepositoryIssueLabelsResponse {
58 pub labels: IssueLabel,
88 pub labels: Vec<IssueLabel>,
59 89 }
60 90
91 #[derive(Clone, Serialize, Deserialize)]
61 92 pub struct IssueLabel {
62 93 pub name: String,
63 94 pub color: String,
64 95 }
65 96
97 #[derive(Clone, Serialize, Deserialize)]
66 98 pub struct RepositoryIssuesRequest;
67 99
100 #[derive(Clone, Serialize, Deserialize)]
68 101 pub struct RepositoryIssuesResponse {
69 102 pub issues: Vec<RepositoryIssue>,
70 103 }
71 104
105 #[derive(Clone, Serialize, Deserialize)]
72 106 pub struct RepositoryIssue {
73 107 pub author: User,
74 108 pub id: u64,
@@ -77,6 +111,7 @@ pub struct RepositoryIssue {
77 111 pub labels: Vec<IssueLabel>,
78 112 }
79 113
114 #[derive(Clone, Serialize, Deserialize)]
80 115 pub struct RepositoryInfoRequest {
81 116 pub info: RepositoryView,
82 117 }

src/connection.rs

View file
@@ -1,5 +1,288 @@
1 use crate::model::instance::InstanceMeta;
1 use std::{collections::HashMap, net::SocketAddr, sync::Arc};
2 2
3 pub struct Connection {
4 pub instance: InstanceMeta
5 }
5 \ No newline at end of file
3 use futures_util::{stream::StreamExt, SinkExt, TryStreamExt};
4 use tokio::{
5 io::{AsyncRead, AsyncWrite},
6 net::TcpStream,
7 sync::{
8 broadcast::{Receiver, Sender},
9 Mutex,
10 },
11 task::JoinHandle,
12 };
13 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
14
15 use crate::{
16 command::{
17 issues::IssuesCountResponse,
18 repository::{
19 RepositoryFileInspectionResponse, RepositoryIssueLabelsResponse,
20 RepositoryIssuesResponse, RepositoryMessage, RepositoryMessageKind, RepositoryRequest,
21 RepositoryResponse,
22 },
23 MessageKind,
24 },
25 handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse, InitiateHandshake},
26 listener::Listeners,
27 model::{
28 instance::{Instance, InstanceMeta},
29 repository::{CommitMetadata, RepositoryView},
30 },
31 };
32
33 pub struct RawConnection {
34 pub task: JoinHandle<()>,
35 }
36
37 pub struct InstanceConnection {
38 pub instance: InstanceMeta,
39 pub sender: Sender<MessageKind>,
40 pub task: JoinHandle<()>,
41 }
42
43 /// Represents a connection which hasn't finished the handshake.
44 pub struct UnestablishedConnection {
45 pub socket: WebSocketStream<TcpStream>,
46 }
47
48 #[derive(Default)]
49 pub struct Connections {
50 pub connections: Vec<RawConnection>,
51 pub instance_connections: HashMap<Instance, InstanceConnection>,
52 }
53
54 pub async fn connection_worker(
55 mut socket: WebSocketStream<TcpStream>,
56 listeners: Arc<Mutex<Listeners>>,
57 mut connections: Arc<Mutex<Connections>>,
58 addr: SocketAddr,
59 ) {
60 let mut handshaked = false;
61 let this_instance = Instance {
62 url: String::from("FOO"),
63 };
64
65 while let Some(message) = socket.next().await {
66 let message = match message {
67 Ok(message) => message,
68 Err(err) => {
69 error!("Error reading message: {:?}", err);
70 continue;
71 }
72 };
73
74 let payload = match message {
75 Message::Text(text) => text.into_bytes(),
76 Message::Binary(bytes) => bytes,
77 Message::Ping(_) => continue,
78 Message::Pong(_) => continue,
79 Message::Close(_) => {
80 info!("Closing connection with {}.", addr);
81
82 return;
83 }
84 _ => unreachable!(),
85 };
86
87 let message = match serde_json::from_slice::<MessageKind>(&payload) {
88 Ok(message) => message,
89 Err(err) => {
90 error!("Error deserializing message from {}: {:?}", addr, err);
91 continue;
92 }
93 };
94
95 if let MessageKind::Handshake(handshake) = message {
96 match handshake {
97 HandshakeMessage::Initiate(_) => {
98 // Send HandshakeMessage::Response
99 let message = HandshakeResponse {
100 identity: Instance {
101 url: String::from("foo.com"),
102 },
103 version: String::from("0.1.0"),
104 };
105
106 socket
107 .send(Message::Binary(
108 serde_json::to_vec(&HandshakeMessage::Response(message)).unwrap(),
109 ))
110 .await
111 .unwrap();
112
113 continue;
114 }
115 HandshakeMessage::Response(_) => {
116 // Send HandshakeMessage::Finalize
117 let message = HandshakeFinalize { success: true };
118
119 socket
120 .send(Message::Binary(
121 serde_json::to_vec(&HandshakeMessage::Finalize(message)).unwrap(),
122 ))
123 .await
124 .unwrap();
125
126 continue;
127 }
128 HandshakeMessage::Finalize(_) => {
129 handshaked = true;
130
131 continue;
132 }
133 }
134 }
135
136 if !handshaked {
137 continue;
138 }
139
140 if let MessageKind::Repository(repository) = &message {
141 if repository.target.instance != this_instance {
142 // We need to send this command to a different instance
143
144 let mut listener = send_and_get_listener(message, &listeners, &connections).await;
145
146 // Wait for response
147 while let Ok(message) = listener.recv().await {
148 if let MessageKind::Repository(RepositoryMessage {
149 command: RepositoryMessageKind::Response(_),
150 ..
151 }) = message
152 {
153 socket
154 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
155 .await
156 .unwrap();
157 }
158 }
159 } else {
160 // This message is targeting this instance
161 match &repository.command {
162 RepositoryMessageKind::Request(request) => match request {
163 RepositoryRequest::CreateRepository(_) => todo!(),
164 RepositoryRequest::RepositoryFileInspection(_) => {
165 let response = RepositoryFileInspectionResponse::File {
166 commit_metadata: CommitMetadata::default(),
167 };
168 }
169 RepositoryRequest::RepositoryInfo(_) => {
170 let response = RepositoryView {
171 name: String::from("Nederland"),
172 description: String::from("ik hou van het nederland"),
173 default_branch: String::from("nederland"),
174 latest_commit: CommitMetadata::default(),
175 files: vec![],
176 };
177
178 socket
179 .send(Message::Binary(
180 serde_json::to_vec(&MessageKind::Repository(
181 RepositoryMessage {
182 target: repository.target.clone(),
183 command: RepositoryMessageKind::Response(
184 RepositoryResponse::RepositoryInfo(response),
185 ),
186 },
187 ))
188 .unwrap(),
189 ))
190 .await
191 .unwrap();
192 }
193 RepositoryRequest::IssuesCount(_) => {
194 let response: IssuesCountResponse = IssuesCountResponse { count: 727420 };
195
196 socket
197 .send(Message::Binary(
198 serde_json::to_vec(&MessageKind::Repository(
199 RepositoryMessage {
200 target: repository.target.clone(),
201 command: RepositoryMessageKind::Response(
202 RepositoryResponse::IssuesCount(response),
203 ),
204 },
205 ))
206 .unwrap(),
207 ))
208 .await
209 .unwrap();
210 }
211 RepositoryRequest::IssueLabels(_) => {
212 let response = RepositoryIssueLabelsResponse { labels: vec![] };
213
214 socket
215 .send(Message::Binary(
216 serde_json::to_vec(&MessageKind::Repository(
217 RepositoryMessage {
218 target: repository.target.clone(),
219 command: RepositoryMessageKind::Response(
220 RepositoryResponse::IssueLabels(response),
221 ),
222 },
223 ))
224 .unwrap(),
225 ))
226 .await
227 .unwrap();
228 }
229 RepositoryRequest::Issues(_) => {
230 let response = RepositoryIssuesResponse { issues: vec![] };
231
232 socket
233 .send(Message::Binary(
234 serde_json::to_vec(&MessageKind::Repository(
235 RepositoryMessage {
236 target: repository.target.clone(),
237 command: RepositoryMessageKind::Response(
238 RepositoryResponse::Issues(response),
239 ),
240 },
241 ))
242 .unwrap(),
243 ))
244 .await
245 .unwrap();
246 }
247 },
248 RepositoryMessageKind::Response(response) => {
249 unreachable!()
250 }
251 }
252 }
253 }
254 }
255 }
256
257 async fn send_and_get_listener(
258 message: MessageKind,
259 listeners: &Arc<Mutex<Listeners>>,
260 mut connections: &Arc<Mutex<Connections>>,
261 ) -> Receiver<MessageKind> {
262 let (instance, user, repository) = match &message {
263 MessageKind::Handshake(_) => {
264 todo!()
265 }
266 MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())),
267 };
268
269 let target = todo!();
270
271 let mut listeners = listeners.lock().await;
272 let mut listener = listeners.add(instance, user, repository);
273 drop(listeners);
274
275 let connections = connections.lock().await;
276
277 if let Some(connection) = connections.instance_connections.get(&target) {
278 connection.sender.send(message);
279 } else {
280 error!("Unable to message {}, this is a bug.", target.url);
281
282 panic!();
283 }
284
285 drop(connections);
286
287 listener
288 }

src/handshake.rs

View file
@@ -0,0 +1,29 @@
1 use serde::{Deserialize, Serialize};
2
3 use crate::model::instance::Instance;
4
5 /// Sent by the initiator of a new inter-daemon connection.
6 #[derive(Clone, Serialize, Deserialize)]
7 pub struct InitiateHandshake {
8 pub identity: Instance,
9 pub version: String,
10 }
11
12 /// Sent in response to [`InitiateHandshake`]
13 #[derive(Clone, Serialize, Deserialize)]
14 pub struct HandshakeResponse {
15 pub identity: Instance,
16 pub version: String,
17 }
18
19 #[derive(Clone, Serialize, Deserialize)]
20 pub struct HandshakeFinalize {
21 pub success: bool,
22 }
23
24 #[derive(Clone, Serialize, Deserialize)]
25 pub enum HandshakeMessage {
26 Initiate(InitiateHandshake),
27 Response(HandshakeResponse),
28 Finalize(HandshakeFinalize),
29 }

src/listener.rs

View file
@@ -0,0 +1,31 @@
1 use std::collections::HashMap;
2
3 use tokio::sync::broadcast::{Receiver, Sender};
4
5 use crate::{
6 command::{repository::RepositoryMessage, MessageKind},
7 model::{instance::Instance, repository::Repository, user::User},
8 };
9
10 #[derive(Default)]
11 pub struct Listeners {
12 listeners: HashMap<ListenerTarget, (Sender<RepositoryMessage>, Receiver<RepositoryMessage>)>,
13 }
14
15 impl Listeners {
16 pub fn add(
17 &mut self,
18 instance: Option<Instance>,
19 user: Option<User>,
20 repository: Option<Repository>,
21 ) -> Receiver<MessageKind> {
22 todo!()
23 }
24 }
25
26 #[derive(Hash)]
27 pub struct ListenerTarget {
28 pub instance: Option<Instance>,
29 pub user: Option<User>,
30 pub repository: Option<Repository>,
31 }

src/main.rs

View file
@@ -1,7 +1,78 @@
1 use std::{error::Error, net::SocketAddr, sync::Arc};
2
3 use connection::{connection_worker, Connections, RawConnection, UnestablishedConnection};
4 use listener::Listeners;
5 use tokio::{
6 io::{AsyncRead, AsyncWrite},
7 net::{TcpListener, TcpStream},
8 sync::Mutex,
9 };
10 use tokio_tungstenite::{accept_async, WebSocketStream};
11
1 12 pub mod command;
2 pub mod model;
3 13 pub mod connection;
14 pub mod handshake;
15 pub mod listener;
16 pub mod model;
17
18 #[macro_use]
19 extern crate tracing;
20
21 #[tokio::main]
22 async fn main() -> Result<(), Box<dyn Error>> {
23 let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
24 let mut connections: Arc<Mutex<Connections>> = Arc::default();
25 let mut listeners: Arc<Mutex<Listeners>> = Arc::default();
26
27 loop {
28 let stream = accept_stream(&mut listener).await;
29
30 let (stream, address) = match stream {
31 Ok(stream) => stream,
32 Err(err) => {
33 error!("Failed to accept connection. {:?}", err);
34 continue;
35 }
36 };
37
38 let connection = accept_websocket_connection(stream).await;
39
40 let connection = match connection {
41 Ok(connection) => connection,
42 Err(err) => {
43 error!(
44 "Failed to initiate Websocket connection from {}. {:?}",
45 address, err
46 );
47 continue;
48 }
49 };
50
51 let connection = RawConnection {
52 task: tokio::spawn(connection_worker(
53 connection,
54 listeners.clone(),
55 connections.clone(),
56 address,
57 )),
58 };
59
60 connections.lock().await.connections.push(connection);
61 }
62 }
63
64 async fn accept_stream(
65 listener: &mut TcpListener,
66 ) -> Result<(TcpStream, SocketAddr), Box<dyn Error>> {
67 let stream = listener.accept().await?;
68
69 Ok(stream)
70 }
71
72 async fn accept_websocket_connection<S: AsyncRead + AsyncWrite + Unpin>(
73 stream: S,
74 ) -> Result<WebSocketStream<S>, Box<dyn Error>> {
75 let connection = accept_async(stream).await?;
4 76
5 fn main() {
6 println!("Hello, world!");
77 Ok(connection)
7 78 }

src/model/instance.rs

View file
@@ -1,4 +1,11 @@
1 use serde::{Deserialize, Serialize};
2
1 3 pub struct InstanceMeta {
2 4 pub url: String,
3 5 pub public_key: String,
4 6 }
7
8 #[derive(Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
9 pub struct Instance {
10 pub url: String,
11 }

src/model/repository.rs

View file
@@ -1,10 +1,16 @@
1 1 use std::time::Instant;
2 2
3 use serde::{Deserialize, Serialize};
4
5 use super::instance::Instance;
6
7 #[derive(Hash, Clone, Serialize, Deserialize)]
3 8 pub struct Repository {
4 9 pub name: String,
5 pub instance: String,
10 pub instance: Instance,
6 11 }
7 12
13 #[derive(Clone, Serialize, Deserialize)]
8 14 pub struct RepositoryView {
9 15 pub name: String,
10 16 pub description: String,
@@ -13,19 +19,22 @@ pub struct RepositoryView {
13 19 pub files: Vec<RepositoryFile>,
14 20 }
15 21
22 #[derive(Clone, Serialize, Deserialize)]
16 23 pub enum RepositoryFile {
17 24 Directory(String),
18 25 File(String),
19 26 }
20 27
28 #[derive(Clone, Serialize, Deserialize)]
21 29 pub struct RepositoryFileWithCommitMetadata {
22 30 pub file: RepositoryFile,
23 31 pub commit_metadata: CommitMetadata,
24 32 }
25 33
34 #[derive(Clone, Serialize, Deserialize, Default)]
26 35 pub struct CommitMetadata {
27 36 pub author: String,
28 37 pub message: String,
29 38 pub hash: String,
30 pub time: Instant,
39 pub time: (),
31 40 }

src/model/user.rs

View file
@@ -1,3 +1,6 @@
1 use serde::{Deserialize, Serialize};
2
3 #[derive(Clone, Hash, Serialize, Deserialize)]
1 4 pub struct User {
2 5 pub username: String,
3 6 pub instance: String,