JavaScript is disabled, refresh for a better experience. ambee/giterated-api

ambee/giterated-api

Git repository hosting, collaboration, and discovery for the Fediverse.

Update to new structure

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨b49b7cd

⁨src/lib.rs⁩ - ⁨25796⁩ bytes
Raw
1 pub mod request;
2
3 pub mod model {
4 pub use giterated_models::model::*;
5 }
6
7 pub mod messages {
8 pub use giterated_models::messages::*;
9 }
10
11 use std::convert::Infallible;
12 use std::net::SocketAddr;
13 use std::str::FromStr;
14 use std::sync::Arc;
15
16 use deadpool::managed::{BuildError, Manager, Pool, RecycleError, RecycleResult};
17
18 use futures_util::{SinkExt, StreamExt};
19
20 use giterated_models::{
21 messages::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake},
22 model::instance::Instance,
23 };
24 use semver::Version;
25 use serde::Serialize;
26 use tokio::net::TcpStream;
27 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
28
29 type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>;
30
31 #[macro_use]
32 extern crate tracing;
33
34 pub struct GiteratedApiBuilder {
35 our_instance: Instance,
36 our_private_key: Option<String>,
37 our_public_key: Option<String>,
38 target_instance: Option<Instance>,
39 }
40
41 pub trait AsInstance {
42 type Error: std::error::Error + Send + Sync + 'static;
43
44 fn into_instance(self) -> Result<Instance, Self::Error>;
45 }
46
47 impl AsInstance for &str {
48 type Error = <Instance as FromStr>::Err;
49
50 fn into_instance(self) -> Result<Instance, Self::Error> {
51 Instance::from_str(self)
52 }
53 }
54
55 impl AsInstance for Instance {
56 type Error = Infallible;
57
58 fn into_instance(self) -> Result<Instance, Self::Error> {
59 Ok(self)
60 }
61 }
62
63 impl GiteratedApiBuilder {
64 pub fn from_local(instance: impl AsInstance) -> Result<Self, anyhow::Error> {
65 Ok(Self {
66 our_instance: instance.into_instance()?,
67 our_private_key: None,
68 our_public_key: None,
69 target_instance: None,
70 })
71 }
72
73 pub fn from_local_for_other(
74 instance: impl AsInstance,
75 other: impl AsInstance,
76 ) -> Result<Self, anyhow::Error> {
77 Ok(Self {
78 our_instance: instance.into_instance()?,
79 our_private_key: None,
80 our_public_key: None,
81 target_instance: Some(other.into_instance()?),
82 })
83 }
84
85 pub fn private_key(&mut self, key: impl ToString) -> &mut Self {
86 self.our_private_key = Some(key.to_string());
87
88 self
89 }
90
91 pub fn public_key(&mut self, key: impl ToString) -> &mut Self {
92 self.our_public_key = Some(key.to_string());
93
94 self
95 }
96
97 pub async fn build(&mut self) -> Result<GiteratedApi, anyhow::Error> {
98 Ok(GiteratedApi {
99 configuration: Arc::new(GiteratedApiConfiguration {
100 our_private_key: self.our_private_key.take().unwrap(),
101 our_public_key: self.our_public_key.take().unwrap(),
102 target_instance: self.target_instance.take(),
103 // todo
104 target_public_key: None,
105 }),
106 })
107 }
108 }
109
110 struct GiteratedConnectionPool {
111 target_instance: Instance,
112 socket_addr: Option<SocketAddr>,
113 }
114
115 #[async_trait::async_trait]
116 impl Manager for GiteratedConnectionPool {
117 type Type = Socket;
118 type Error = anyhow::Error;
119
120 async fn create(&self) -> Result<Socket, Self::Error> {
121 info!("Creating new Daemon connection");
122 let mut connection =
123 GiteratedApi::connect_to(&self.target_instance, &self.socket_addr).await?;
124
125 // Handshake first!
126 GiteratedApi::handle_handshake(&mut connection, &self.target_instance).await?;
127
128 Ok(connection)
129 }
130
131 async fn recycle(&self, socket: &mut Socket) -> RecycleResult<Self::Error> {
132 match socket.send(Message::Ping(vec![])).await {
133 Ok(_) => Ok(()),
134 Err(err) => {
135 info!("Socket died!");
136 Err(RecycleError::Backend(err.into()))
137 }
138 }
139 }
140 }
141
142 pub struct GiteratedApiConfiguration {
143 pub our_private_key: String,
144 pub our_public_key: String,
145 pub target_instance: Option<Instance>,
146 pub target_public_key: Option<String>,
147 }
148
149 #[derive(Clone)]
150 pub struct DaemonConnectionPool(Pool<GiteratedConnectionPool>);
151
152 impl DaemonConnectionPool {
153 pub fn instance(&self) -> &Instance {
154 &self.0.manager().target_instance
155 }
156 }
157
158 impl DaemonConnectionPool {
159 pub fn connect(
160 instance: impl ToOwned<Owned = Instance>,
161 ) -> Result<Self, BuildError<anyhow::Error>> {
162 let instance = instance.to_owned();
163 Ok(Self(
164 Pool::builder(GiteratedConnectionPool {
165 socket_addr: None,
166 target_instance: instance.to_owned(),
167 })
168 .build()?,
169 ))
170 }
171
172 pub fn connect_other(
173 instance_identity: impl ToOwned<Owned = Instance>,
174 connection_addr: SocketAddr,
175 ) -> Result<Self, BuildError<anyhow::Error>> {
176 Ok(Self(
177 Pool::builder(GiteratedConnectionPool {
178 target_instance: instance_identity.to_owned(),
179 socket_addr: Some(connection_addr),
180 })
181 .build()?,
182 ))
183 }
184 }
185
186 // Keep this private
187 // seriousyl.
188 #[derive(Clone)]
189 struct GiteratedApi {
190 configuration: Arc<GiteratedApiConfiguration>,
191 }
192
193 impl GiteratedApi {
194 // pub async fn public_key(&self) -> String {
195 // if let Some(public_key) = &self.configuration.target_public_key {
196 // public_key.clone()
197 // } else {
198 // assert!(self.configuration.target_instance.is_none());
199
200 // self.configuration.our_public_key.clone()
201 // }
202 // }
203
204 // /// Register on an [`Instance`].
205 // ///
206 // /// # Authorization
207 // /// - Must be made by the same instance its being sent to
208 // pub async fn register(
209 // &self,
210 // username: String,
211 // email: Option<String>,
212 // password: String,
213 // pool: &DaemonConnectionPool,
214 // ) -> Result<RegisterAccountResponse, anyhow::Error> {
215 // let mut connection = pool.0.get().await.unwrap();
216
217 // let message = InstanceAuthenticated::new(
218 // RegisterAccountRequest {
219 // username,
220 // email,
221 // password,
222 // },
223 // pool.0.manager().target_instance.clone(),
224 // self.configuration.our_private_key.clone(),
225 // )
226 // .unwrap();
227
228 // Self::send_message(
229 // &MessageKind::Authentication(AuthenticationMessage::Request(
230 // AuthenticationRequest::RegisterAccount(message),
231 // )),
232 // &mut connection,
233 // )
234 // .await?;
235
236 // while let Ok(payload) = self.next_payload(&mut connection).await {
237 // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
238 // AuthenticationResponse::RegisterAccount(response),
239 // ))) = serde_json::from_slice(&payload)
240 // {
241 // return Ok(response);
242 // }
243 // }
244
245 // connection.close(None).await?;
246
247 // Err(SocketClosedError.into())
248 // }
249
250 // /// Create repository on the target instance.
251 // pub async fn create_repository(
252 // &self,
253 // user_token: String,
254 // name: String,
255 // description: Option<String>,
256 // visibility: RepositoryVisibility,
257 // default_branch: String,
258 // owner: User,
259 // pool: &DaemonConnectionPool,
260 // ) -> Result<bool, anyhow::Error> {
261 // let mut connection = pool.0.get().await.unwrap();
262
263 // let target_respository = Repository {
264 // owner: owner.clone(),
265 // name: name.clone(),
266 // instance: self
267 // .configuration
268 // .target_instance
269 // .as_ref()
270 // .unwrap_or(&pool.0.manager().target_instance)
271 // .clone(),
272 // };
273
274 // let request = CreateRepositoryRequest {
275 // name,
276 // description,
277 // visibility,
278 // default_branch,
279 // owner,
280 // };
281
282 // let message = UnvalidatedUserAuthenticated::new(
283 // request,
284 // user_token,
285 // self.configuration.our_private_key.clone(),
286 // )
287 // .unwrap();
288
289 // Self::send_message(
290 // &MessageKind::Repository(RepositoryMessage {
291 // target: target_respository,
292 // command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository(
293 // message,
294 // )),
295 // }),
296 // &mut connection,
297 // )
298 // .await?;
299
300 // while let Ok(payload) = self.next_payload(&mut connection).await {
301 // if let Ok(MessageKind::Repository(RepositoryMessage {
302 // command:
303 // RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)),
304 // ..
305 // })) = serde_json::from_slice(&payload)
306 // {
307 // return Ok(true);
308 // }
309 // }
310
311 // connection.close(None).await?;
312
313 // Err(SocketClosedError.into())
314 // }
315
316 // pub async fn repository_info(
317 // &self,
318 // token: &str,
319 // request: &RepositoryInfoRequest,
320 // pool: &DaemonConnectionPool,
321 // ) -> Result<RepositoryView, Error> {
322 // let mut connection = pool.0.get().await.unwrap();
323
324 // let message = UnvalidatedUserAuthenticated::new(
325 // request.clone(),
326 // token.to_string(),
327 // self.configuration.our_private_key.clone(),
328 // )
329 // .unwrap();
330
331 // Self::send_message(
332 // &MessageKind::Repository(RepositoryMessage {
333 // target: request.repository.clone(),
334 // command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)),
335 // }),
336 // &mut connection,
337 // )
338 // .await?;
339
340 // loop {
341 // // while let Ok(payload) = Self::next_payload(&mut socket).await {
342 // let payload = match self.next_payload(&mut connection).await {
343 // Ok(payload) => payload,
344 // Err(err) => {
345 // error!("Error while fetching next payload: {:?}", err);
346 // continue;
347 // }
348 // };
349
350 // if let Ok(MessageKind::Repository(RepositoryMessage {
351 // command:
352 // RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)),
353 // ..
354 // })) = serde_json::from_slice(&payload)
355 // {
356 // return Ok(response);
357 // }
358 // }
359 // }
360
361 // /// Requests an authentication token for the given login.
362 // ///
363 // /// # Authorization
364 // /// This request can only be sent to the same instance from which
365 // /// it is issued.
366 // pub async fn authentication_token(
367 // &self,
368 // secret_key: String,
369 // username: String,
370 // password: String,
371 // pool: &DaemonConnectionPool,
372 // ) -> Result<String, Error> {
373 // let mut connection = pool.0.get().await.unwrap();
374
375 // let request = InstanceAuthenticated::new(
376 // AuthenticationTokenRequest {
377 // username,
378 // password,
379 // },
380 // pool.0.manager().target_instance.clone(),
381 // include_str!("example_keys/giterated.key").to_string(),
382 // )
383 // .unwrap();
384
385 // Self::send_message(
386 // &MessageKind::Authentication(AuthenticationMessage::Request(
387 // AuthenticationRequest::AuthenticationToken(request),
388 // )),
389 // &mut connection,
390 // )
391 // .await?;
392
393 // loop {
394 // // while let Ok(payload) = Self::next_payload(&mut socket).await {
395 // let payload = match self.next_payload(&mut connection).await {
396 // Ok(payload) => payload,
397 // Err(err) => {
398 // error!("Error while fetching next payload: {:?}", err);
399 // continue;
400 // }
401 // };
402
403 // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
404 // AuthenticationResponse::AuthenticationToken(response),
405 // ))) = serde_json::from_slice(&payload)
406 // {
407 // return Ok(response.token);
408 // }
409 // }
410 // }
411
412 // /// Requests a new token for the given login.
413 // ///
414 // /// # Authorization
415 // /// This request can only be sent to the same instance from which
416 // /// it is issued.
417 // pub async fn extend_token(
418 // &self,
419 // secret_key: String,
420 // token: String,
421 // pool: &DaemonConnectionPool,
422 // ) -> Result<Option<String>, Error> {
423 // let mut connection = pool.0.get().await.unwrap();
424
425 // let request = InstanceAuthenticated::new(
426 // TokenExtensionRequest { token },
427 // pool.0.manager().target_instance.clone(),
428 // self.configuration.our_private_key.clone(),
429 // )
430 // .unwrap();
431
432 // Self::send_message(
433 // &MessageKind::Authentication(AuthenticationMessage::Request(
434 // AuthenticationRequest::TokenExtension(request),
435 // )),
436 // &mut connection,
437 // )
438 // .await?;
439
440 // while let Ok(payload) = self.next_payload(&mut connection).await {
441 // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
442 // AuthenticationResponse::TokenExtension(response),
443 // ))) = serde_json::from_slice(&payload)
444 // {
445 // return Ok(response.new_token);
446 // }
447 // }
448
449 // todo!()
450 // }
451
452 // pub async fn user_display_name(
453 // &self,
454 // user: impl ToOwned<Owned = User>,
455 // pool: &DaemonConnectionPool,
456 // ) -> Result<Option<String>, Error> {
457 // let mut connection = pool.0.get().await.unwrap();
458
459 // let request = UserDisplayNameRequest {
460 // user: user.to_owned(),
461 // };
462
463 // Self::send_message(
464 // &MessageKind::User(UserMessage {
465 // instance: self
466 // .configuration
467 // .target_instance
468 // .as_ref()
469 // .unwrap_or(&pool.0.manager().target_instance)
470 // .clone(),
471 // message: UserMessageKind::Request(UserMessageRequest::DisplayName(request)),
472 // }),
473 // &mut connection,
474 // )
475 // .await?;
476
477 // while let Ok(payload) = self.next_payload(&mut connection).await {
478 // if let Ok(MessageKind::User(UserMessage {
479 // message: UserMessageKind::Response(UserMessageResponse::DisplayName(response)),
480 // ..
481 // })) = serde_json::from_slice(&payload)
482 // {
483 // return Ok(response.display_name);
484 // }
485 // }
486
487 // connection.close(None).await?;
488
489 // Err(SocketClosedError.into())
490 // }
491
492 // pub async fn user_display_image(
493 // &self,
494 // user: impl ToOwned<Owned = User>,
495 // pool: &DaemonConnectionPool,
496 // ) -> Result<Option<String>, Error> {
497 // let mut connection = pool.0.get().await.unwrap();
498
499 // let request = UserDisplayImageRequest {
500 // user: user.to_owned(),
501 // };
502
503 // Self::send_message(
504 // &MessageKind::User(UserMessage {
505 // instance: self
506 // .configuration
507 // .target_instance
508 // .as_ref()
509 // .unwrap_or(&pool.0.manager().target_instance)
510 // .clone(),
511 // message: UserMessageKind::Request(UserMessageRequest::DisplayImage(request)),
512 // }),
513 // &mut connection,
514 // )
515 // .await?;
516
517 // while let Ok(payload) = self.next_payload(&mut connection).await {
518 // if let Ok(MessageKind::User(UserMessage {
519 // message: UserMessageKind::Response(UserMessageResponse::DisplayImage(response)),
520 // ..
521 // })) = serde_json::from_slice(&payload)
522 // {
523 // return Ok(response.image_url);
524 // }
525 // }
526
527 // connection.close(None).await?;
528
529 // Err(SocketClosedError.into())
530 // }
531
532 // pub async fn user_bio(
533 // &self,
534 // user: impl ToOwned<Owned = User>,
535 // pool: &DaemonConnectionPool,
536 // ) -> Result<Option<String>, Error> {
537 // let mut connection = pool.0.get().await.unwrap();
538
539 // let request = UserBioRequest {
540 // user: user.to_owned(),
541 // };
542
543 // Self::send_message(
544 // &MessageKind::User(UserMessage {
545 // instance: self
546 // .configuration
547 // .target_instance
548 // .as_ref()
549 // .unwrap_or(&pool.0.manager().target_instance)
550 // .clone(),
551 // message: UserMessageKind::Request(UserMessageRequest::Bio(request)),
552 // }),
553 // &mut connection,
554 // )
555 // .await?;
556
557 // while let Ok(payload) = self.next_payload(&mut connection).await {
558 // if let Ok(MessageKind::User(UserMessage {
559 // message: UserMessageKind::Response(UserMessageResponse::Bio(response)),
560 // ..
561 // })) = serde_json::from_slice(&payload)
562 // {
563 // return Ok(response.bio);
564 // }
565 // }
566
567 // connection.close(None).await?;
568
569 // Err(SocketClosedError.into())
570 // }
571
572 // pub async fn user_repositories(
573 // &self,
574 // user: impl ToOwned<Owned = User>,
575 // pool: &DaemonConnectionPool,
576 // ) -> Result<Vec<RepositorySummary>, Error> {
577 // let mut connection = pool.0.get().await.unwrap();
578
579 // let request = UserRepositoriesRequest {
580 // user: user.to_owned(),
581 // };
582
583 // Self::send_message(
584 // &MessageKind::User(UserMessage {
585 // instance: self
586 // .configuration
587 // .target_instance
588 // .as_ref()
589 // .unwrap_or(&pool.0.manager().target_instance)
590 // .clone(),
591 // message: UserMessageKind::Request(UserMessageRequest::Repositories(request)),
592 // }),
593 // &mut connection,
594 // )
595 // .await?;
596
597 // while let Ok(payload) = self.next_payload(&mut connection).await {
598 // if let Ok(MessageKind::User(UserMessage {
599 // message: UserMessageKind::Response(UserMessageResponse::Repositories(response)),
600 // ..
601 // })) = serde_json::from_slice(&payload)
602 // {
603 // return Ok(response.repositories);
604 // }
605 // }
606
607 // connection.close(None).await?;
608
609 // Err(SocketClosedError.into())
610 // }
611
612 async fn connect_to(
613 instance: &Instance,
614 socket_addr: &Option<SocketAddr>,
615 ) -> Result<Socket, anyhow::Error> {
616 if let Some(addr) = socket_addr {
617 info!(
618 "Connecting to {}",
619 format!("ws://{}/.giterated/daemon/", addr)
620 );
621 let (websocket, _response) =
622 connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?;
623 info!("Connection established with {}", addr);
624
625 Ok(websocket)
626 } else {
627 info!(
628 "Connecting to {}",
629 format!("wss://{}/.giterated/daemon/", instance.url)
630 );
631 let (websocket, _response) =
632 connect_async(&format!("wss://{}/.giterated/daemon/", instance.url)).await?;
633 info!("Connection established with {}", instance.url);
634
635 Ok(websocket)
636 }
637 }
638
639 async fn handle_handshake(
640 socket: &mut Socket,
641 _instance: &Instance,
642 ) -> Result<(), anyhow::Error> {
643 // Send handshake initiation
644 Self::send_message(
645 &InitiateHandshake {
646 version: Version::from_str("0.0.0").unwrap(),
647 },
648 socket,
649 )
650 .await?;
651
652 while let Some(message) = socket.next().await {
653 let message = match message {
654 Ok(message) => message,
655 Err(err) => {
656 error!("Error reading message: {:?}", err);
657 continue;
658 }
659 };
660
661 let payload = match message {
662 Message::Text(text) => text.into_bytes(),
663 Message::Binary(bytes) => bytes,
664 Message::Ping(_) => continue,
665 Message::Pong(_) => continue,
666 Message::Close(_) => {
667 socket.close(None).await?;
668
669 return Err(SocketClosedError.into());
670 }
671 _ => unreachable!(),
672 };
673
674 info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
675
676 // We try deserializing the finalize response first as it is smaller and just as common
677 if let Ok(finalize) = serde_json::from_slice::<HandshakeFinalize>(&payload) {
678 if finalize.success {
679 info!("Handshake success!");
680
681 return Ok(());
682 } else {
683 panic!()
684 }
685 } else {
686 match serde_json::from_slice::<HandshakeResponse>(&payload) {
687 Ok(response) => {
688 // let message = if !validate_version(&response.version) {
689 // error!(
690 // "Version compatibility failure! Our Version: {}, Their Version: {}",
691 // version(),
692 // response.version
693 // );
694
695 // HandshakeFinalize { success: false }
696 // } else {
697 // info!("Connected with a compatible version");
698
699 // HandshakeFinalize { success: true }
700 // };
701
702 warn!("Version compatibility has been no-op'd");
703
704 let message = HandshakeFinalize { success: true };
705 // Send [`HandshakeFinalize`] to indicate if we're compatible or not
706 Self::send_message(&message, socket).await?;
707 }
708 Err(err) => {
709 error!("Error deserializing message: {:?}", err);
710 continue;
711 }
712 }
713 }
714 }
715
716 Ok(())
717 }
718
719 async fn send_message<T: Serialize>(
720 message: &T,
721 socket: &mut Socket,
722 ) -> Result<(), anyhow::Error> {
723 socket
724 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
725 .await?;
726
727 Ok(())
728 }
729
730 // async fn next_payload(&self, socket: &mut Socket) -> Result<Vec<u8>, Error> {
731 // while let Some(message) = socket.next().await {
732 // let message = message?;
733
734 // match message {
735 // Message::Text(text) => return Ok(text.into_bytes()),
736 // Message::Binary(bytes) => return Ok(bytes),
737 // Message::Ping(_) => continue,
738 // Message::Pong(_) => continue,
739 // Message::Close(_) => {
740 // socket.close(None).await?;
741
742 // return Err(SocketClosedError.into());
743 // }
744 // _ => unreachable!(),
745 // }
746 // }
747
748 // socket.close(None).await?;
749
750 // return Err(SocketClosedError.into());
751 // }
752 }
753
754 #[derive(Debug, thiserror::Error)]
755 #[error("closed socket")]
756 struct SocketClosedError;
757