JavaScript is disabled, refresh for a better experience. ambee/giterated-api

ambee/giterated-api

Git repository hosting, collaboration, and discovery for the Fediverse.

begin new protocol refactor

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨2091700

⁨old/lib.rs_old⁩ - ⁨25974⁩ bytes
Raw
1 pub mod request;
2
3 pub mod model {
4 pub use giterated_models::model::*;
5 }
6
7 pub mod messages {
8 pub use giterated_models::messages::*;
9 }
10
11 use std::net::SocketAddr;
12 use std::str::FromStr;
13 use std::sync::Arc;
14 use std::{convert::Infallible, ops::Deref};
15
16 use deadpool::managed::{BuildError, Manager, Object, Pool, RecycleError, RecycleResult};
17
18 use futures_util::{SinkExt, StreamExt};
19
20 use giterated_models::{
21 messages::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake},
22 model::instance::Instance,
23 };
24 use semver::Version;
25 use serde::Serialize;
26 use tokio::net::TcpStream;
27 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
28
29 type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>;
30
31 #[macro_use]
32 extern crate tracing;
33
34 pub struct GiteratedApiBuilder {
35 our_instance: Instance,
36 our_private_key: Option<String>,
37 our_public_key: Option<String>,
38 target_instance: Option<Instance>,
39 }
40
41 pub trait AsInstance {
42 type Error: std::error::Error + Send + Sync + 'static;
43
44 fn into_instance(self) -> Result<Instance, Self::Error>;
45 }
46
47 impl AsInstance for &str {
48 type Error = <Instance as FromStr>::Err;
49
50 fn into_instance(self) -> Result<Instance, Self::Error> {
51 Instance::from_str(self)
52 }
53 }
54
55 impl AsInstance for Instance {
56 type Error = Infallible;
57
58 fn into_instance(self) -> Result<Instance, Self::Error> {
59 Ok(self)
60 }
61 }
62
63 impl GiteratedApiBuilder {
64 pub fn from_local(instance: impl AsInstance) -> Result<Self, anyhow::Error> {
65 Ok(Self {
66 our_instance: instance.into_instance()?,
67 our_private_key: None,
68 our_public_key: None,
69 target_instance: None,
70 })
71 }
72
73 pub fn from_local_for_other(
74 instance: impl AsInstance,
75 other: impl AsInstance,
76 ) -> Result<Self, anyhow::Error> {
77 Ok(Self {
78 our_instance: instance.into_instance()?,
79 our_private_key: None,
80 our_public_key: None,
81 target_instance: Some(other.into_instance()?),
82 })
83 }
84
85 pub fn private_key(&mut self, key: impl ToString) -> &mut Self {
86 self.our_private_key = Some(key.to_string());
87
88 self
89 }
90
91 pub fn public_key(&mut self, key: impl ToString) -> &mut Self {
92 self.our_public_key = Some(key.to_string());
93
94 self
95 }
96
97 pub async fn build(&mut self) -> Result<GiteratedApi, anyhow::Error> {
98 Ok(GiteratedApi {
99 configuration: Arc::new(GiteratedApiConfiguration {
100 our_private_key: self.our_private_key.take().unwrap(),
101 our_public_key: self.our_public_key.take().unwrap(),
102 target_instance: self.target_instance.take(),
103 // todo
104 target_public_key: None,
105 }),
106 })
107 }
108 }
109
110 pub struct GiteratedConnectionPool {
111 target_instance: Instance,
112 socket_addr: Option<SocketAddr>,
113 }
114
115 #[async_trait::async_trait]
116 impl Manager for GiteratedConnectionPool {
117 type Type = Socket;
118 type Error = anyhow::Error;
119
120 async fn create(&self) -> Result<Socket, Self::Error> {
121 info!("Creating new Daemon connection");
122 let mut connection =
123 GiteratedApi::connect_to(&self.target_instance, &self.socket_addr).await?;
124
125 // Handshake first!
126 GiteratedApi::handle_handshake(&mut connection, &self.target_instance).await?;
127
128 Ok(connection)
129 }
130
131 async fn recycle(&self, socket: &mut Socket) -> RecycleResult<Self::Error> {
132 match socket.send(Message::Ping(vec![])).await {
133 Ok(_) => Ok(()),
134 Err(err) => {
135 info!("Socket died!");
136 Err(RecycleError::Backend(err.into()))
137 }
138 }
139 }
140 }
141
142 pub struct GiteratedApiConfiguration {
143 pub our_private_key: String,
144 pub our_public_key: String,
145 pub target_instance: Option<Instance>,
146 pub target_public_key: Option<String>,
147 }
148
149 #[derive(Clone)]
150 pub struct DaemonConnectionPool(Pool<GiteratedConnectionPool>);
151
152 impl DaemonConnectionPool {
153 pub fn instance(&self) -> &Instance {
154 &self.0.manager().target_instance
155 }
156 }
157
158 impl Deref for DaemonConnectionPool {
159 type Target = Pool<GiteratedConnectionPool>;
160
161 fn deref(&self) -> &Self::Target {
162 &self.0
163 }
164 }
165
166 impl DaemonConnectionPool {
167 pub fn connect(
168 instance: impl ToOwned<Owned = Instance>,
169 ) -> Result<Self, BuildError<anyhow::Error>> {
170 let instance = instance.to_owned();
171 Ok(Self(
172 Pool::builder(GiteratedConnectionPool {
173 socket_addr: None,
174 target_instance: instance.to_owned(),
175 })
176 .build()?,
177 ))
178 }
179
180 pub fn connect_other(
181 instance_identity: impl ToOwned<Owned = Instance>,
182 connection_addr: SocketAddr,
183 ) -> Result<Self, BuildError<anyhow::Error>> {
184 Ok(Self(
185 Pool::builder(GiteratedConnectionPool {
186 target_instance: instance_identity.to_owned(),
187 socket_addr: Some(connection_addr),
188 })
189 .build()?,
190 ))
191 }
192 }
193
194 // Keep this private
195 // seriousyl.
196 #[derive(Clone)]
197 struct GiteratedApi {
198 configuration: Arc<GiteratedApiConfiguration>,
199 }
200
201 impl GiteratedApi {
202 // pub async fn public_key(&self) -> String {
203 // if let Some(public_key) = &self.configuration.target_public_key {
204 // public_key.clone()
205 // } else {
206 // assert!(self.configuration.target_instance.is_none());
207
208 // self.configuration.our_public_key.clone()
209 // }
210 // }
211
212 // /// Register on an [`Instance`].
213 // ///
214 // /// # Authorization
215 // /// - Must be made by the same instance its being sent to
216 // pub async fn register(
217 // &self,
218 // username: String,
219 // email: Option<String>,
220 // password: String,
221 // pool: &DaemonConnectionPool,
222 // ) -> Result<RegisterAccountResponse, anyhow::Error> {
223 // let mut connection = pool.0.get().await.unwrap();
224
225 // let message = InstanceAuthenticated::new(
226 // RegisterAccountRequest {
227 // username,
228 // email,
229 // password,
230 // },
231 // pool.0.manager().target_instance.clone(),
232 // self.configuration.our_private_key.clone(),
233 // )
234 // .unwrap();
235
236 // Self::send_message(
237 // &MessageKind::Authentication(AuthenticationMessage::Request(
238 // AuthenticationRequest::RegisterAccount(message),
239 // )),
240 // &mut connection,
241 // )
242 // .await?;
243
244 // while let Ok(payload) = self.next_payload(&mut connection).await {
245 // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
246 // AuthenticationResponse::RegisterAccount(response),
247 // ))) = serde_json::from_slice(&payload)
248 // {
249 // return Ok(response);
250 // }
251 // }
252
253 // connection.close(None).await?;
254
255 // Err(SocketClosedError.into())
256 // }
257
258 // /// Create repository on the target instance.
259 // pub async fn create_repository(
260 // &self,
261 // user_token: String,
262 // name: String,
263 // description: Option<String>,
264 // visibility: RepositoryVisibility,
265 // default_branch: String,
266 // owner: User,
267 // pool: &DaemonConnectionPool,
268 // ) -> Result<bool, anyhow::Error> {
269 // let mut connection = pool.0.get().await.unwrap();
270
271 // let target_respository = Repository {
272 // owner: owner.clone(),
273 // name: name.clone(),
274 // instance: self
275 // .configuration
276 // .target_instance
277 // .as_ref()
278 // .unwrap_or(&pool.0.manager().target_instance)
279 // .clone(),
280 // };
281
282 // let request = CreateRepositoryRequest {
283 // name,
284 // description,
285 // visibility,
286 // default_branch,
287 // owner,
288 // };
289
290 // let message = UnvalidatedUserAuthenticated::new(
291 // request,
292 // user_token,
293 // self.configuration.our_private_key.clone(),
294 // )
295 // .unwrap();
296
297 // Self::send_message(
298 // &MessageKind::Repository(RepositoryMessage {
299 // target: target_respository,
300 // command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository(
301 // message,
302 // )),
303 // }),
304 // &mut connection,
305 // )
306 // .await?;
307
308 // while let Ok(payload) = self.next_payload(&mut connection).await {
309 // if let Ok(MessageKind::Repository(RepositoryMessage {
310 // command:
311 // RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)),
312 // ..
313 // })) = serde_json::from_slice(&payload)
314 // {
315 // return Ok(true);
316 // }
317 // }
318
319 // connection.close(None).await?;
320
321 // Err(SocketClosedError.into())
322 // }
323
324 // pub async fn repository_info(
325 // &self,
326 // token: &str,
327 // request: &RepositoryInfoRequest,
328 // pool: &DaemonConnectionPool,
329 // ) -> Result<RepositoryView, Error> {
330 // let mut connection = pool.0.get().await.unwrap();
331
332 // let message = UnvalidatedUserAuthenticated::new(
333 // request.clone(),
334 // token.to_string(),
335 // self.configuration.our_private_key.clone(),
336 // )
337 // .unwrap();
338
339 // Self::send_message(
340 // &MessageKind::Repository(RepositoryMessage {
341 // target: request.repository.clone(),
342 // command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)),
343 // }),
344 // &mut connection,
345 // )
346 // .await?;
347
348 // loop {
349 // // while let Ok(payload) = Self::next_payload(&mut socket).await {
350 // let payload = match self.next_payload(&mut connection).await {
351 // Ok(payload) => payload,
352 // Err(err) => {
353 // error!("Error while fetching next payload: {:?}", err);
354 // continue;
355 // }
356 // };
357
358 // if let Ok(MessageKind::Repository(RepositoryMessage {
359 // command:
360 // RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)),
361 // ..
362 // })) = serde_json::from_slice(&payload)
363 // {
364 // return Ok(response);
365 // }
366 // }
367 // }
368
369 // /// Requests an authentication token for the given login.
370 // ///
371 // /// # Authorization
372 // /// This request can only be sent to the same instance from which
373 // /// it is issued.
374 // pub async fn authentication_token(
375 // &self,
376 // secret_key: String,
377 // username: String,
378 // password: String,
379 // pool: &DaemonConnectionPool,
380 // ) -> Result<String, Error> {
381 // let mut connection = pool.0.get().await.unwrap();
382
383 // let request = InstanceAuthenticated::new(
384 // AuthenticationTokenRequest {
385 // username,
386 // password,
387 // },
388 // pool.0.manager().target_instance.clone(),
389 // include_str!("example_keys/giterated.key").to_string(),
390 // )
391 // .unwrap();
392
393 // Self::send_message(
394 // &MessageKind::Authentication(AuthenticationMessage::Request(
395 // AuthenticationRequest::AuthenticationToken(request),
396 // )),
397 // &mut connection,
398 // )
399 // .await?;
400
401 // loop {
402 // // while let Ok(payload) = Self::next_payload(&mut socket).await {
403 // let payload = match self.next_payload(&mut connection).await {
404 // Ok(payload) => payload,
405 // Err(err) => {
406 // error!("Error while fetching next payload: {:?}", err);
407 // continue;
408 // }
409 // };
410
411 // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
412 // AuthenticationResponse::AuthenticationToken(response),
413 // ))) = serde_json::from_slice(&payload)
414 // {
415 // return Ok(response.token);
416 // }
417 // }
418 // }
419
420 // /// Requests a new token for the given login.
421 // ///
422 // /// # Authorization
423 // /// This request can only be sent to the same instance from which
424 // /// it is issued.
425 // pub async fn extend_token(
426 // &self,
427 // secret_key: String,
428 // token: String,
429 // pool: &DaemonConnectionPool,
430 // ) -> Result<Option<String>, Error> {
431 // let mut connection = pool.0.get().await.unwrap();
432
433 // let request = InstanceAuthenticated::new(
434 // TokenExtensionRequest { token },
435 // pool.0.manager().target_instance.clone(),
436 // self.configuration.our_private_key.clone(),
437 // )
438 // .unwrap();
439
440 // Self::send_message(
441 // &MessageKind::Authentication(AuthenticationMessage::Request(
442 // AuthenticationRequest::TokenExtension(request),
443 // )),
444 // &mut connection,
445 // )
446 // .await?;
447
448 // while let Ok(payload) = self.next_payload(&mut connection).await {
449 // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
450 // AuthenticationResponse::TokenExtension(response),
451 // ))) = serde_json::from_slice(&payload)
452 // {
453 // return Ok(response.new_token);
454 // }
455 // }
456
457 // todo!()
458 // }
459
460 // pub async fn user_display_name(
461 // &self,
462 // user: impl ToOwned<Owned = User>,
463 // pool: &DaemonConnectionPool,
464 // ) -> Result<Option<String>, Error> {
465 // let mut connection = pool.0.get().await.unwrap();
466
467 // let request = UserDisplayNameRequest {
468 // user: user.to_owned(),
469 // };
470
471 // Self::send_message(
472 // &MessageKind::User(UserMessage {
473 // instance: self
474 // .configuration
475 // .target_instance
476 // .as_ref()
477 // .unwrap_or(&pool.0.manager().target_instance)
478 // .clone(),
479 // message: UserMessageKind::Request(UserMessageRequest::DisplayName(request)),
480 // }),
481 // &mut connection,
482 // )
483 // .await?;
484
485 // while let Ok(payload) = self.next_payload(&mut connection).await {
486 // if let Ok(MessageKind::User(UserMessage {
487 // message: UserMessageKind::Response(UserMessageResponse::DisplayName(response)),
488 // ..
489 // })) = serde_json::from_slice(&payload)
490 // {
491 // return Ok(response.display_name);
492 // }
493 // }
494
495 // connection.close(None).await?;
496
497 // Err(SocketClosedError.into())
498 // }
499
500 // pub async fn user_display_image(
501 // &self,
502 // user: impl ToOwned<Owned = User>,
503 // pool: &DaemonConnectionPool,
504 // ) -> Result<Option<String>, Error> {
505 // let mut connection = pool.0.get().await.unwrap();
506
507 // let request = UserDisplayImageRequest {
508 // user: user.to_owned(),
509 // };
510
511 // Self::send_message(
512 // &MessageKind::User(UserMessage {
513 // instance: self
514 // .configuration
515 // .target_instance
516 // .as_ref()
517 // .unwrap_or(&pool.0.manager().target_instance)
518 // .clone(),
519 // message: UserMessageKind::Request(UserMessageRequest::DisplayImage(request)),
520 // }),
521 // &mut connection,
522 // )
523 // .await?;
524
525 // while let Ok(payload) = self.next_payload(&mut connection).await {
526 // if let Ok(MessageKind::User(UserMessage {
527 // message: UserMessageKind::Response(UserMessageResponse::DisplayImage(response)),
528 // ..
529 // })) = serde_json::from_slice(&payload)
530 // {
531 // return Ok(response.image_url);
532 // }
533 // }
534
535 // connection.close(None).await?;
536
537 // Err(SocketClosedError.into())
538 // }
539
540 // pub async fn user_bio(
541 // &self,
542 // user: impl ToOwned<Owned = User>,
543 // pool: &DaemonConnectionPool,
544 // ) -> Result<Option<String>, Error> {
545 // let mut connection = pool.0.get().await.unwrap();
546
547 // let request = UserBioRequest {
548 // user: user.to_owned(),
549 // };
550
551 // Self::send_message(
552 // &MessageKind::User(UserMessage {
553 // instance: self
554 // .configuration
555 // .target_instance
556 // .as_ref()
557 // .unwrap_or(&pool.0.manager().target_instance)
558 // .clone(),
559 // message: UserMessageKind::Request(UserMessageRequest::Bio(request)),
560 // }),
561 // &mut connection,
562 // )
563 // .await?;
564
565 // while let Ok(payload) = self.next_payload(&mut connection).await {
566 // if let Ok(MessageKind::User(UserMessage {
567 // message: UserMessageKind::Response(UserMessageResponse::Bio(response)),
568 // ..
569 // })) = serde_json::from_slice(&payload)
570 // {
571 // return Ok(response.bio);
572 // }
573 // }
574
575 // connection.close(None).await?;
576
577 // Err(SocketClosedError.into())
578 // }
579
580 // pub async fn user_repositories(
581 // &self,
582 // user: impl ToOwned<Owned = User>,
583 // pool: &DaemonConnectionPool,
584 // ) -> Result<Vec<RepositorySummary>, Error> {
585 // let mut connection = pool.0.get().await.unwrap();
586
587 // let request = UserRepositoriesRequest {
588 // user: user.to_owned(),
589 // };
590
591 // Self::send_message(
592 // &MessageKind::User(UserMessage {
593 // instance: self
594 // .configuration
595 // .target_instance
596 // .as_ref()
597 // .unwrap_or(&pool.0.manager().target_instance)
598 // .clone(),
599 // message: UserMessageKind::Request(UserMessageRequest::Repositories(request)),
600 // }),
601 // &mut connection,
602 // )
603 // .await?;
604
605 // while let Ok(payload) = self.next_payload(&mut connection).await {
606 // if let Ok(MessageKind::User(UserMessage {
607 // message: UserMessageKind::Response(UserMessageResponse::Repositories(response)),
608 // ..
609 // })) = serde_json::from_slice(&payload)
610 // {
611 // return Ok(response.repositories);
612 // }
613 // }
614
615 // connection.close(None).await?;
616
617 // Err(SocketClosedError.into())
618 // }
619
620 async fn connect_to(
621 instance: &Instance,
622 socket_addr: &Option<SocketAddr>,
623 ) -> Result<Socket, anyhow::Error> {
624 if let Some(addr) = socket_addr {
625 info!(
626 "Connecting to {}",
627 format!("ws://{}/.giterated/daemon/", addr)
628 );
629 let (websocket, _response) =
630 connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?;
631 info!("Connection established with {}", addr);
632
633 Ok(websocket)
634 } else {
635 info!(
636 "Connecting to {}",
637 format!("wss://{}/.giterated/daemon/", instance.url)
638 );
639 let (websocket, _response) =
640 connect_async(&format!("wss://{}/.giterated/daemon/", instance.url)).await?;
641 info!("Connection established with {}", instance.url);
642
643 Ok(websocket)
644 }
645 }
646
647 async fn handle_handshake(
648 socket: &mut Socket,
649 _instance: &Instance,
650 ) -> Result<(), anyhow::Error> {
651 // Send handshake initiation
652 Self::send_message(
653 &InitiateHandshake {
654 version: Version::from_str("0.0.0").unwrap(),
655 },
656 socket,
657 )
658 .await?;
659
660 while let Some(message) = socket.next().await {
661 let message = match message {
662 Ok(message) => message,
663 Err(err) => {
664 error!("Error reading message: {:?}", err);
665 continue;
666 }
667 };
668
669 let payload = match message {
670 Message::Text(text) => text.into_bytes(),
671 Message::Binary(bytes) => bytes,
672 Message::Ping(_) => continue,
673 Message::Pong(_) => continue,
674 Message::Close(_) => {
675 socket.close(None).await?;
676
677 return Err(SocketClosedError.into());
678 }
679 _ => unreachable!(),
680 };
681
682 info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
683
684 // We try deserializing the finalize response first as it is smaller and just as common
685 if let Ok(finalize) = serde_json::from_slice::<HandshakeFinalize>(&payload) {
686 if finalize.success {
687 info!("Handshake success!");
688
689 return Ok(());
690 } else {
691 panic!()
692 }
693 } else {
694 match serde_json::from_slice::<HandshakeResponse>(&payload) {
695 Ok(response) => {
696 // let message = if !validate_version(&response.version) {
697 // error!(
698 // "Version compatibility failure! Our Version: {}, Their Version: {}",
699 // version(),
700 // response.version
701 // );
702
703 // HandshakeFinalize { success: false }
704 // } else {
705 // info!("Connected with a compatible version");
706
707 // HandshakeFinalize { success: true }
708 // };
709
710 warn!("Version compatibility has been no-op'd");
711
712 let message = HandshakeFinalize { success: true };
713 // Send [`HandshakeFinalize`] to indicate if we're compatible or not
714 Self::send_message(&message, socket).await?;
715 }
716 Err(err) => {
717 error!("Error deserializing message: {:?}", err);
718 continue;
719 }
720 }
721 }
722 }
723
724 Ok(())
725 }
726
727 async fn send_message<T: Serialize>(
728 message: &T,
729 socket: &mut Socket,
730 ) -> Result<(), anyhow::Error> {
731 socket
732 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
733 .await?;
734
735 Ok(())
736 }
737
738 // async fn next_payload(&self, socket: &mut Socket) -> Result<Vec<u8>, Error> {
739 // while let Some(message) = socket.next().await {
740 // let message = message?;
741
742 // match message {
743 // Message::Text(text) => return Ok(text.into_bytes()),
744 // Message::Binary(bytes) => return Ok(bytes),
745 // Message::Ping(_) => continue,
746 // Message::Pong(_) => continue,
747 // Message::Close(_) => {
748 // socket.close(None).await?;
749
750 // return Err(SocketClosedError.into());
751 // }
752 // _ => unreachable!(),
753 // }
754 // }
755
756 // socket.close(None).await?;
757
758 // return Err(SocketClosedError.into());
759 // }
760 }
761
762 #[derive(Debug, thiserror::Error)]
763 #[error("closed socket")]
764 struct SocketClosedError;