JavaScript is disabled, refresh for a better experience. ambee/giterated-api

ambee/giterated-api

Git repository hosting, collaboration, and discovery for the Fediverse.

begin new protocol refactor

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨2091700

Showing ⁨⁨11⁩ changed files⁩ with ⁨⁨1274⁩ insertions⁩ and ⁨⁨984⁩ deletions⁩

old/lib.rs_old

View file
@@ -0,0 +1,764 @@
1 pub mod request;
2
3 pub mod model {
4 pub use giterated_models::model::*;
5 }
6
7 pub mod messages {
8 pub use giterated_models::messages::*;
9 }
10
11 use std::net::SocketAddr;
12 use std::str::FromStr;
13 use std::sync::Arc;
14 use std::{convert::Infallible, ops::Deref};
15
16 use deadpool::managed::{BuildError, Manager, Object, Pool, RecycleError, RecycleResult};
17
18 use futures_util::{SinkExt, StreamExt};
19
20 use giterated_models::{
21 messages::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake},
22 model::instance::Instance,
23 };
24 use semver::Version;
25 use serde::Serialize;
26 use tokio::net::TcpStream;
27 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
28
29 type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>;
30
31 #[macro_use]
32 extern crate tracing;
33
34 pub struct GiteratedApiBuilder {
35 our_instance: Instance,
36 our_private_key: Option<String>,
37 our_public_key: Option<String>,
38 target_instance: Option<Instance>,
39 }
40
41 pub trait AsInstance {
42 type Error: std::error::Error + Send + Sync + 'static;
43
44 fn into_instance(self) -> Result<Instance, Self::Error>;
45 }
46
47 impl AsInstance for &str {
48 type Error = <Instance as FromStr>::Err;
49
50 fn into_instance(self) -> Result<Instance, Self::Error> {
51 Instance::from_str(self)
52 }
53 }
54
55 impl AsInstance for Instance {
56 type Error = Infallible;
57
58 fn into_instance(self) -> Result<Instance, Self::Error> {
59 Ok(self)
60 }
61 }
62
63 impl GiteratedApiBuilder {
64 pub fn from_local(instance: impl AsInstance) -> Result<Self, anyhow::Error> {
65 Ok(Self {
66 our_instance: instance.into_instance()?,
67 our_private_key: None,
68 our_public_key: None,
69 target_instance: None,
70 })
71 }
72
73 pub fn from_local_for_other(
74 instance: impl AsInstance,
75 other: impl AsInstance,
76 ) -> Result<Self, anyhow::Error> {
77 Ok(Self {
78 our_instance: instance.into_instance()?,
79 our_private_key: None,
80 our_public_key: None,
81 target_instance: Some(other.into_instance()?),
82 })
83 }
84
85 pub fn private_key(&mut self, key: impl ToString) -> &mut Self {
86 self.our_private_key = Some(key.to_string());
87
88 self
89 }
90
91 pub fn public_key(&mut self, key: impl ToString) -> &mut Self {
92 self.our_public_key = Some(key.to_string());
93
94 self
95 }
96
97 pub async fn build(&mut self) -> Result<GiteratedApi, anyhow::Error> {
98 Ok(GiteratedApi {
99 configuration: Arc::new(GiteratedApiConfiguration {
100 our_private_key: self.our_private_key.take().unwrap(),
101 our_public_key: self.our_public_key.take().unwrap(),
102 target_instance: self.target_instance.take(),
103 // todo
104 target_public_key: None,
105 }),
106 })
107 }
108 }
109
110 pub struct GiteratedConnectionPool {
111 target_instance: Instance,
112 socket_addr: Option<SocketAddr>,
113 }
114
115 #[async_trait::async_trait]
116 impl Manager for GiteratedConnectionPool {
117 type Type = Socket;
118 type Error = anyhow::Error;
119
120 async fn create(&self) -> Result<Socket, Self::Error> {
121 info!("Creating new Daemon connection");
122 let mut connection =
123 GiteratedApi::connect_to(&self.target_instance, &self.socket_addr).await?;
124
125 // Handshake first!
126 GiteratedApi::handle_handshake(&mut connection, &self.target_instance).await?;
127
128 Ok(connection)
129 }
130
131 async fn recycle(&self, socket: &mut Socket) -> RecycleResult<Self::Error> {
132 match socket.send(Message::Ping(vec![])).await {
133 Ok(_) => Ok(()),
134 Err(err) => {
135 info!("Socket died!");
136 Err(RecycleError::Backend(err.into()))
137 }
138 }
139 }
140 }
141
142 pub struct GiteratedApiConfiguration {
143 pub our_private_key: String,
144 pub our_public_key: String,
145 pub target_instance: Option<Instance>,
146 pub target_public_key: Option<String>,
147 }
148
149 #[derive(Clone)]
150 pub struct DaemonConnectionPool(Pool<GiteratedConnectionPool>);
151
152 impl DaemonConnectionPool {
153 pub fn instance(&self) -> &Instance {
154 &self.0.manager().target_instance
155 }
156 }
157
158 impl Deref for DaemonConnectionPool {
159 type Target = Pool<GiteratedConnectionPool>;
160
161 fn deref(&self) -> &Self::Target {
162 &self.0
163 }
164 }
165
166 impl DaemonConnectionPool {
167 pub fn connect(
168 instance: impl ToOwned<Owned = Instance>,
169 ) -> Result<Self, BuildError<anyhow::Error>> {
170 let instance = instance.to_owned();
171 Ok(Self(
172 Pool::builder(GiteratedConnectionPool {
173 socket_addr: None,
174 target_instance: instance.to_owned(),
175 })
176 .build()?,
177 ))
178 }
179
180 pub fn connect_other(
181 instance_identity: impl ToOwned<Owned = Instance>,
182 connection_addr: SocketAddr,
183 ) -> Result<Self, BuildError<anyhow::Error>> {
184 Ok(Self(
185 Pool::builder(GiteratedConnectionPool {
186 target_instance: instance_identity.to_owned(),
187 socket_addr: Some(connection_addr),
188 })
189 .build()?,
190 ))
191 }
192 }
193
194 // Keep this private
195 // seriousyl.
196 #[derive(Clone)]
197 struct GiteratedApi {
198 configuration: Arc<GiteratedApiConfiguration>,
199 }
200
201 impl GiteratedApi {
202 // pub async fn public_key(&self) -> String {
203 // if let Some(public_key) = &self.configuration.target_public_key {
204 // public_key.clone()
205 // } else {
206 // assert!(self.configuration.target_instance.is_none());
207
208 // self.configuration.our_public_key.clone()
209 // }
210 // }
211
212 // /// Register on an [`Instance`].
213 // ///
214 // /// # Authorization
215 // /// - Must be made by the same instance its being sent to
216 // pub async fn register(
217 // &self,
218 // username: String,
219 // email: Option<String>,
220 // password: String,
221 // pool: &DaemonConnectionPool,
222 // ) -> Result<RegisterAccountResponse, anyhow::Error> {
223 // let mut connection = pool.0.get().await.unwrap();
224
225 // let message = InstanceAuthenticated::new(
226 // RegisterAccountRequest {
227 // username,
228 // email,
229 // password,
230 // },
231 // pool.0.manager().target_instance.clone(),
232 // self.configuration.our_private_key.clone(),
233 // )
234 // .unwrap();
235
236 // Self::send_message(
237 // &MessageKind::Authentication(AuthenticationMessage::Request(
238 // AuthenticationRequest::RegisterAccount(message),
239 // )),
240 // &mut connection,
241 // )
242 // .await?;
243
244 // while let Ok(payload) = self.next_payload(&mut connection).await {
245 // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
246 // AuthenticationResponse::RegisterAccount(response),
247 // ))) = serde_json::from_slice(&payload)
248 // {
249 // return Ok(response);
250 // }
251 // }
252
253 // connection.close(None).await?;
254
255 // Err(SocketClosedError.into())
256 // }
257
258 // /// Create repository on the target instance.
259 // pub async fn create_repository(
260 // &self,
261 // user_token: String,
262 // name: String,
263 // description: Option<String>,
264 // visibility: RepositoryVisibility,
265 // default_branch: String,
266 // owner: User,
267 // pool: &DaemonConnectionPool,
268 // ) -> Result<bool, anyhow::Error> {
269 // let mut connection = pool.0.get().await.unwrap();
270
271 // let target_respository = Repository {
272 // owner: owner.clone(),
273 // name: name.clone(),
274 // instance: self
275 // .configuration
276 // .target_instance
277 // .as_ref()
278 // .unwrap_or(&pool.0.manager().target_instance)
279 // .clone(),
280 // };
281
282 // let request = CreateRepositoryRequest {
283 // name,
284 // description,
285 // visibility,
286 // default_branch,
287 // owner,
288 // };
289
290 // let message = UnvalidatedUserAuthenticated::new(
291 // request,
292 // user_token,
293 // self.configuration.our_private_key.clone(),
294 // )
295 // .unwrap();
296
297 // Self::send_message(
298 // &MessageKind::Repository(RepositoryMessage {
299 // target: target_respository,
300 // command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository(
301 // message,
302 // )),
303 // }),
304 // &mut connection,
305 // )
306 // .await?;
307
308 // while let Ok(payload) = self.next_payload(&mut connection).await {
309 // if let Ok(MessageKind::Repository(RepositoryMessage {
310 // command:
311 // RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)),
312 // ..
313 // })) = serde_json::from_slice(&payload)
314 // {
315 // return Ok(true);
316 // }
317 // }
318
319 // connection.close(None).await?;
320
321 // Err(SocketClosedError.into())
322 // }
323
324 // pub async fn repository_info(
325 // &self,
326 // token: &str,
327 // request: &RepositoryInfoRequest,
328 // pool: &DaemonConnectionPool,
329 // ) -> Result<RepositoryView, Error> {
330 // let mut connection = pool.0.get().await.unwrap();
331
332 // let message = UnvalidatedUserAuthenticated::new(
333 // request.clone(),
334 // token.to_string(),
335 // self.configuration.our_private_key.clone(),
336 // )
337 // .unwrap();
338
339 // Self::send_message(
340 // &MessageKind::Repository(RepositoryMessage {
341 // target: request.repository.clone(),
342 // command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)),
343 // }),
344 // &mut connection,
345 // )
346 // .await?;
347
348 // loop {
349 // // while let Ok(payload) = Self::next_payload(&mut socket).await {
350 // let payload = match self.next_payload(&mut connection).await {
351 // Ok(payload) => payload,
352 // Err(err) => {
353 // error!("Error while fetching next payload: {:?}", err);
354 // continue;
355 // }
356 // };
357
358 // if let Ok(MessageKind::Repository(RepositoryMessage {
359 // command:
360 // RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)),
361 // ..
362 // })) = serde_json::from_slice(&payload)
363 // {
364 // return Ok(response);
365 // }
366 // }
367 // }
368
369 // /// Requests an authentication token for the given login.
370 // ///
371 // /// # Authorization
372 // /// This request can only be sent to the same instance from which
373 // /// it is issued.
374 // pub async fn authentication_token(
375 // &self,
376 // secret_key: String,
377 // username: String,
378 // password: String,
379 // pool: &DaemonConnectionPool,
380 // ) -> Result<String, Error> {
381 // let mut connection = pool.0.get().await.unwrap();
382
383 // let request = InstanceAuthenticated::new(
384 // AuthenticationTokenRequest {
385 // username,
386 // password,
387 // },
388 // pool.0.manager().target_instance.clone(),
389 // include_str!("example_keys/giterated.key").to_string(),
390 // )
391 // .unwrap();
392
393 // Self::send_message(
394 // &MessageKind::Authentication(AuthenticationMessage::Request(
395 // AuthenticationRequest::AuthenticationToken(request),
396 // )),
397 // &mut connection,
398 // )
399 // .await?;
400
401 // loop {
402 // // while let Ok(payload) = Self::next_payload(&mut socket).await {
403 // let payload = match self.next_payload(&mut connection).await {
404 // Ok(payload) => payload,
405 // Err(err) => {
406 // error!("Error while fetching next payload: {:?}", err);
407 // continue;
408 // }
409 // };
410
411 // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
412 // AuthenticationResponse::AuthenticationToken(response),
413 // ))) = serde_json::from_slice(&payload)
414 // {
415 // return Ok(response.token);
416 // }
417 // }
418 // }
419
420 // /// Requests a new token for the given login.
421 // ///
422 // /// # Authorization
423 // /// This request can only be sent to the same instance from which
424 // /// it is issued.
425 // pub async fn extend_token(
426 // &self,
427 // secret_key: String,
428 // token: String,
429 // pool: &DaemonConnectionPool,
430 // ) -> Result<Option<String>, Error> {
431 // let mut connection = pool.0.get().await.unwrap();
432
433 // let request = InstanceAuthenticated::new(
434 // TokenExtensionRequest { token },
435 // pool.0.manager().target_instance.clone(),
436 // self.configuration.our_private_key.clone(),
437 // )
438 // .unwrap();
439
440 // Self::send_message(
441 // &MessageKind::Authentication(AuthenticationMessage::Request(
442 // AuthenticationRequest::TokenExtension(request),
443 // )),
444 // &mut connection,
445 // )
446 // .await?;
447
448 // while let Ok(payload) = self.next_payload(&mut connection).await {
449 // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
450 // AuthenticationResponse::TokenExtension(response),
451 // ))) = serde_json::from_slice(&payload)
452 // {
453 // return Ok(response.new_token);
454 // }
455 // }
456
457 // todo!()
458 // }
459
460 // pub async fn user_display_name(
461 // &self,
462 // user: impl ToOwned<Owned = User>,
463 // pool: &DaemonConnectionPool,
464 // ) -> Result<Option<String>, Error> {
465 // let mut connection = pool.0.get().await.unwrap();
466
467 // let request = UserDisplayNameRequest {
468 // user: user.to_owned(),
469 // };
470
471 // Self::send_message(
472 // &MessageKind::User(UserMessage {
473 // instance: self
474 // .configuration
475 // .target_instance
476 // .as_ref()
477 // .unwrap_or(&pool.0.manager().target_instance)
478 // .clone(),
479 // message: UserMessageKind::Request(UserMessageRequest::DisplayName(request)),
480 // }),
481 // &mut connection,
482 // )
483 // .await?;
484
485 // while let Ok(payload) = self.next_payload(&mut connection).await {
486 // if let Ok(MessageKind::User(UserMessage {
487 // message: UserMessageKind::Response(UserMessageResponse::DisplayName(response)),
488 // ..
489 // })) = serde_json::from_slice(&payload)
490 // {
491 // return Ok(response.display_name);
492 // }
493 // }
494
495 // connection.close(None).await?;
496
497 // Err(SocketClosedError.into())
498 // }
499
500 // pub async fn user_display_image(
501 // &self,
502 // user: impl ToOwned<Owned = User>,
503 // pool: &DaemonConnectionPool,
504 // ) -> Result<Option<String>, Error> {
505 // let mut connection = pool.0.get().await.unwrap();
506
507 // let request = UserDisplayImageRequest {
508 // user: user.to_owned(),
509 // };
510
511 // Self::send_message(
512 // &MessageKind::User(UserMessage {
513 // instance: self
514 // .configuration
515 // .target_instance
516 // .as_ref()
517 // .unwrap_or(&pool.0.manager().target_instance)
518 // .clone(),
519 // message: UserMessageKind::Request(UserMessageRequest::DisplayImage(request)),
520 // }),
521 // &mut connection,
522 // )
523 // .await?;
524
525 // while let Ok(payload) = self.next_payload(&mut connection).await {
526 // if let Ok(MessageKind::User(UserMessage {
527 // message: UserMessageKind::Response(UserMessageResponse::DisplayImage(response)),
528 // ..
529 // })) = serde_json::from_slice(&payload)
530 // {
531 // return Ok(response.image_url);
532 // }
533 // }
534
535 // connection.close(None).await?;
536
537 // Err(SocketClosedError.into())
538 // }
539
540 // pub async fn user_bio(
541 // &self,
542 // user: impl ToOwned<Owned = User>,
543 // pool: &DaemonConnectionPool,
544 // ) -> Result<Option<String>, Error> {
545 // let mut connection = pool.0.get().await.unwrap();
546
547 // let request = UserBioRequest {
548 // user: user.to_owned(),
549 // };
550
551 // Self::send_message(
552 // &MessageKind::User(UserMessage {
553 // instance: self
554 // .configuration
555 // .target_instance
556 // .as_ref()
557 // .unwrap_or(&pool.0.manager().target_instance)
558 // .clone(),
559 // message: UserMessageKind::Request(UserMessageRequest::Bio(request)),
560 // }),
561 // &mut connection,
562 // )
563 // .await?;
564
565 // while let Ok(payload) = self.next_payload(&mut connection).await {
566 // if let Ok(MessageKind::User(UserMessage {
567 // message: UserMessageKind::Response(UserMessageResponse::Bio(response)),
568 // ..
569 // })) = serde_json::from_slice(&payload)
570 // {
571 // return Ok(response.bio);
572 // }
573 // }
574
575 // connection.close(None).await?;
576
577 // Err(SocketClosedError.into())
578 // }
579
580 // pub async fn user_repositories(
581 // &self,
582 // user: impl ToOwned<Owned = User>,
583 // pool: &DaemonConnectionPool,
584 // ) -> Result<Vec<RepositorySummary>, Error> {
585 // let mut connection = pool.0.get().await.unwrap();
586
587 // let request = UserRepositoriesRequest {
588 // user: user.to_owned(),
589 // };
590
591 // Self::send_message(
592 // &MessageKind::User(UserMessage {
593 // instance: self
594 // .configuration
595 // .target_instance
596 // .as_ref()
597 // .unwrap_or(&pool.0.manager().target_instance)
598 // .clone(),
599 // message: UserMessageKind::Request(UserMessageRequest::Repositories(request)),
600 // }),
601 // &mut connection,
602 // )
603 // .await?;
604
605 // while let Ok(payload) = self.next_payload(&mut connection).await {
606 // if let Ok(MessageKind::User(UserMessage {
607 // message: UserMessageKind::Response(UserMessageResponse::Repositories(response)),
608 // ..
609 // })) = serde_json::from_slice(&payload)
610 // {
611 // return Ok(response.repositories);
612 // }
613 // }
614
615 // connection.close(None).await?;
616
617 // Err(SocketClosedError.into())
618 // }
619
620 async fn connect_to(
621 instance: &Instance,
622 socket_addr: &Option<SocketAddr>,
623 ) -> Result<Socket, anyhow::Error> {
624 if let Some(addr) = socket_addr {
625 info!(
626 "Connecting to {}",
627 format!("ws://{}/.giterated/daemon/", addr)
628 );
629 let (websocket, _response) =
630 connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?;
631 info!("Connection established with {}", addr);
632
633 Ok(websocket)
634 } else {
635 info!(
636 "Connecting to {}",
637 format!("wss://{}/.giterated/daemon/", instance.url)
638 );
639 let (websocket, _response) =
640 connect_async(&format!("wss://{}/.giterated/daemon/", instance.url)).await?;
641 info!("Connection established with {}", instance.url);
642
643 Ok(websocket)
644 }
645 }
646
647 async fn handle_handshake(
648 socket: &mut Socket,
649 _instance: &Instance,
650 ) -> Result<(), anyhow::Error> {
651 // Send handshake initiation
652 Self::send_message(
653 &InitiateHandshake {
654 version: Version::from_str("0.0.0").unwrap(),
655 },
656 socket,
657 )
658 .await?;
659
660 while let Some(message) = socket.next().await {
661 let message = match message {
662 Ok(message) => message,
663 Err(err) => {
664 error!("Error reading message: {:?}", err);
665 continue;
666 }
667 };
668
669 let payload = match message {
670 Message::Text(text) => text.into_bytes(),
671 Message::Binary(bytes) => bytes,
672 Message::Ping(_) => continue,
673 Message::Pong(_) => continue,
674 Message::Close(_) => {
675 socket.close(None).await?;
676
677 return Err(SocketClosedError.into());
678 }
679 _ => unreachable!(),
680 };
681
682 info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
683
684 // We try deserializing the finalize response first as it is smaller and just as common
685 if let Ok(finalize) = serde_json::from_slice::<HandshakeFinalize>(&payload) {
686 if finalize.success {
687 info!("Handshake success!");
688
689 return Ok(());
690 } else {
691 panic!()
692 }
693 } else {
694 match serde_json::from_slice::<HandshakeResponse>(&payload) {
695 Ok(response) => {
696 // let message = if !validate_version(&response.version) {
697 // error!(
698 // "Version compatibility failure! Our Version: {}, Their Version: {}",
699 // version(),
700 // response.version
701 // );
702
703 // HandshakeFinalize { success: false }
704 // } else {
705 // info!("Connected with a compatible version");
706
707 // HandshakeFinalize { success: true }
708 // };
709
710 warn!("Version compatibility has been no-op'd");
711
712 let message = HandshakeFinalize { success: true };
713 // Send [`HandshakeFinalize`] to indicate if we're compatible or not
714 Self::send_message(&message, socket).await?;
715 }
716 Err(err) => {
717 error!("Error deserializing message: {:?}", err);
718 continue;
719 }
720 }
721 }
722 }
723
724 Ok(())
725 }
726
727 async fn send_message<T: Serialize>(
728 message: &T,
729 socket: &mut Socket,
730 ) -> Result<(), anyhow::Error> {
731 socket
732 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
733 .await?;
734
735 Ok(())
736 }
737
738 // async fn next_payload(&self, socket: &mut Socket) -> Result<Vec<u8>, Error> {
739 // while let Some(message) = socket.next().await {
740 // let message = message?;
741
742 // match message {
743 // Message::Text(text) => return Ok(text.into_bytes()),
744 // Message::Binary(bytes) => return Ok(bytes),
745 // Message::Ping(_) => continue,
746 // Message::Pong(_) => continue,
747 // Message::Close(_) => {
748 // socket.close(None).await?;
749
750 // return Err(SocketClosedError.into());
751 // }
752 // _ => unreachable!(),
753 // }
754 // }
755
756 // socket.close(None).await?;
757
758 // return Err(SocketClosedError.into());
759 // }
760 }
761
762 #[derive(Debug, thiserror::Error)]
763 #[error("closed socket")]
764 struct SocketClosedError;

old/main.rs_old

View file
@@ -0,0 +1,131 @@
1 use std::str::FromStr;
2
3 use giterated_api::DaemonConnectionPool;
4 use giterated_api::{
5 messages::user::{UserBioRequest, UserBioResponse},
6 model::{instance::Instance, user::User},
7 };
8
9 use serde::{Deserialize, Serialize};
10 // use jwt::SignWithKey;
11
12 #[macro_use]
13 extern crate tracing;
14
15 // #[tokio::main]
16 // async fn main() -> Result<(), anyhow::Error> {
17 // tracing_subscriber::fmt::init();
18
19 // let pool = DaemonConnectionPool::connect(Instance::from_str("giterated.dev")?).unwrap();
20
21 // let mut api = GiteratedApiBuilder::from_local("giterated.dev")
22 // .unwrap()
23 // .private_key(include_str!("example_keys/giterated.key"))
24 // .public_key(include_str!("example_keys/giterated.key.pub"))
25 // .build()
26 // .await
27 // .unwrap();
28
29 // info!("Lets try to make an account!");
30
31 // let response = api
32 // .register(
33 // String::from("ambee"),
34 // None,
35 // String::from("lolthisisinthecommithistory"),
36 // &pool,
37 // )
38 // .await;
39
40 // info!("Registration response: {:?}", response);
41
42 // let token = api
43 // .authentication_token(
44 // String::from("foobar"),
45 // String::from("ambee"),
46 // String::from("password"),
47 // &pool,
48 // )
49 // .await
50 // .unwrap();
51
52 // println!("Token: {}", token);
53
54 // let public_key = api.public_key().await;
55
56 // println!("Server public key:\n{}", public_key);
57 // let verification_key = DecodingKey::from_rsa_pem(public_key.as_bytes()).unwrap();
58 // let data: TokenData<UserTokenMetadata> = decode(
59 // &token,
60 // &verification_key,
61 // &Validation::new(Algorithm::RS256),
62 // )
63 // .unwrap();
64
65 // println!("The token was valid! Data:\n{:#?}", data.claims);
66
67 // info!("Lets extend that token!");
68
69 // let new_token = api
70 // .extend_token(String::from("foobar"), token.clone(), &pool)
71 // .await
72 // .unwrap();
73 // info!("New Token Returned:\n{:?}", new_token);
74
75 // info!("Try to create a repository? uwu");
76
77 // let repository = api
78 // .create_repository(
79 // new_token.unwrap(),
80 // String::from("super-repository"),
81 // None,
82 // RepositoryVisibility::Public,
83 // String::from("master"),
84 // User::from_str("ambee:giterated.dev").unwrap(),
85 // &pool,
86 // )
87 // .await
88 // .unwrap();
89
90 // assert!(repository);
91
92 // info!("Lets view our repository!");
93
94 // let view = api
95 // .repository_info(
96 // &token,
97 // Repository::from_str("ambee:giterated.dev/[email protected]").unwrap(),
98 // &pool,
99 // )
100 // .await
101 // .unwrap();
102
103 // info!("Repository Info:\n{:#?}", view);
104
105 // Ok(())
106 // }
107
108 #[tokio::main]
109 async fn main() -> Result<(), anyhow::Error> {
110 tracing_subscriber::fmt::init();
111 let pool = DaemonConnectionPool::connect(Instance::from_str("giterated.dev")?).unwrap();
112
113 let request = UserBioRequest {
114 user: User::from_str("ambee:giterated.dev").unwrap(),
115 };
116
117 let response = giterated_api::request::request_local(&request)
118 .execute_expect::<UserBioResponse>(&pool)
119 .await?;
120
121 info!("Response: {:?}", response);
122
123 Ok(())
124 }
125
126 #[derive(Debug, Serialize, Deserialize)]
127 struct UserTokenMetadata {
128 user: User,
129 generated_for: Instance,
130 exp: u64,
131 }

old/request.rs_old

View file
@@ -0,0 +1,82 @@
1 use std::fmt::Debug;
2
3 use anyhow::Error;
4 use futures_util::{SinkExt, StreamExt};
5 use giterated_models::{
6 messages::{error::ConnectionError, MessageTarget},
7 model::{
8 authenticated::{Authenticated, AuthenticationSourceProvider},
9 instance::Instance,
10 },
11 };
12 use serde::{de::DeserializeOwned, Serialize};
13 use tokio_tungstenite::tungstenite::Message;
14
15 use crate::DaemonConnectionPool;
16
17 pub fn request_local<T: Serialize + DeserializeOwned + Debug + Send + Sync + MessageTarget>(
18 request: T,
19 ) -> PreparedRequest<T> {
20 PreparedRequest {
21 request: Authenticated::new_empty(request),
22 instance: None,
23 }
24 }
25
26 pub fn request<T: Serialize + DeserializeOwned + Debug + Send + Sync + MessageTarget>(
27 instance: Instance,
28 request: T,
29 ) -> PreparedRequest<T> {
30 PreparedRequest {
31 request: Authenticated::new_empty(request),
32 instance: Some(instance),
33 }
34 }
35
36 pub struct PreparedRequest<T: Serialize + DeserializeOwned + Send + Sync> {
37 instance: Option<Instance>,
38 request: Authenticated<T>,
39 }
40
41 impl<T: Serialize + DeserializeOwned + Debug + Send + Sync + MessageTarget> PreparedRequest<T> {
42 pub fn authenticate<P: AuthenticationSourceProvider + 'static + Send + Sync>(mut self, source: P) -> Self {
43 self.request.append_authentication(source);
44
45 self
46 }
47
48 pub async fn execute_expect<R: DeserializeOwned>(
49 self,
50 pool: &DaemonConnectionPool,
51 ) -> Result<R, Error> {
52 let mut socket = pool.0.get().await.unwrap();
53
54 let payload = serde_json::to_vec(&self.request.into_payload()).unwrap();
55
56 socket.send(Message::Binary(payload)).await?;
57
58 while let Some(message) = socket.next().await {
59 let payload = match message? {
60 Message::Binary(payload) => payload,
61 _ => {
62 continue;
63 }
64 };
65
66 let as_target = serde_json::from_slice::<R>(&payload).map_err(|e| Error::from(e));
67
68 if as_target.is_err() {
69 // Maybe we got an error payload?
70 if let Ok(error_payload) = serde_json::from_slice::<ConnectionError>(&payload) {
71 return Err(error_payload.into());
72 }
73 } else {
74 // We did not get an error payload, forward the deserialization error from the
75 // expected type
76 return as_target;
77 }
78 }
79
80 panic!()
81 }
82 }

src/daemon_backend.rs

View file
@@ -0,0 +1,102 @@
1 use std::fmt::Debug;
2
3 use deadpool::managed::Pool;
4 use futures_util::{SinkExt, StreamExt};
5 use giterated_models::{
6 error::OperationError,
7 model::{authenticated::Authenticated, MessageTarget},
8 operation::{
9 GiteratedMessage, GiteratedObject, GiteratedOperation, Object, ObjectBackend,
10 ObjectRequest, ObjectRequestError, ObjectResponse,
11 },
12 };
13 use serde::{de::DeserializeOwned, Deserialize, Serialize};
14 use tokio_tungstenite::tungstenite::Message;
15
16 use crate::{pool::GiteratedConnectionPool, DaemonConnectionPool, Socket};
17
18 #[async_trait::async_trait]
19 impl ObjectBackend for DaemonConnectionPool {
20 async fn object_operation<O: GiteratedObject + Debug, D: GiteratedOperation<O> + Debug>(
21 &self,
22 object: O,
23 operation: D,
24 ) -> Result<D::Success, OperationError<D::Failure>> {
25 let message = GiteratedMessage {
26 object,
27 operation: operation.operation_name().to_string(),
28 payload: operation,
29 };
30
31 let mut connection = self
32 .0
33 .get()
34 .await
35 .map_err(|e| OperationError::Internal(e.to_string()))?;
36
37 let authenticated = Authenticated::new(message);
38
39 send_expect(&mut connection, authenticated).await
40 }
41
42 async fn get_object<O: GiteratedObject + Debug>(
43 &self,
44 object_str: &str,
45 ) -> Result<Object<O, Self>, OperationError<ObjectRequestError>> {
46 let operation = ObjectRequest(object_str.to_string());
47 let message = GiteratedMessage {
48 object: self.0.manager().target_instance.clone(),
49 operation: operation.operation_name().to_string(),
50 payload: operation,
51 };
52
53 let mut connection = self
54 .0
55 .get()
56 .await
57 .map_err(|e| OperationError::Internal(e.to_string()))?;
58
59 let authenticated = Authenticated::new(message);
60
61 let object_raw: ObjectResponse = send_expect(&mut connection, authenticated).await?;
62
63 Ok(unsafe {
64 Object::new_unchecked(
65 serde_json::from_slice(&object_raw.0)
66 .map_err(|e| OperationError::Internal(e.to_string()))?,
67 self.clone(),
68 )
69 })
70 }
71 }
72
73 async fn send_expect<
74 O: GiteratedObject,
75 D: GiteratedOperation<O>,
76 B: DeserializeOwned,
77 R: DeserializeOwned,
78 >(
79 socket: &mut Socket,
80 message: Authenticated<O, D>,
81 ) -> Result<R, OperationError<B>> {
82 let payload = serde_json::to_vec(&message.into_payload()).unwrap();
83
84 socket
85 .send(Message::Binary(payload))
86 .await
87 .map_err(|e| OperationError::Internal(e.to_string()))?;
88
89 while let Some(message) = socket.next().await {
90 let payload = match message.map_err(|e| OperationError::Internal(e.to_string()))? {
91 Message::Binary(payload) => payload,
92 _ => {
93 continue;
94 }
95 };
96
97 let as_target = serde_json::from_slice::<R>(&payload)
98 .map_err(|e| OperationError::Internal(e.to_string()))?;
99 }
100
101 panic!()
102 }

src/handshake.rs

View file
@@ -0,0 +1,108 @@
1 use std::str::FromStr;
2
3 use futures_util::StreamExt;
4 use giterated_models::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake};
5 use semver::Version;
6 use serde::Serialize;
7 use tokio_tungstenite::tungstenite::Message;
8
9 use crate::Socket;
10
11 pub struct GiteratedConnectionHandshaker(Socket);
12
13 impl GiteratedConnectionHandshaker {
14 pub fn new(socket: Socket) -> Self {
15 Self(socket)
16 }
17
18 pub async fn handshake(self) -> Result<Socket, anyhow::Error> {
19 let mut socket = self.0;
20
21 // Send handshake initiation
22 Self::send_raw_message(
23 &mut socket,
24 &InitiateHandshake {
25 version: Version::from_str("0.0.0").unwrap(),
26 },
27 )
28 .await?;
29
30 while let Some(message) = socket.next().await {
31 let message = match message {
32 Ok(message) => message,
33 Err(err) => {
34 error!("Error reading message: {:?}", err);
35 continue;
36 }
37 };
38
39 let payload = match message {
40 Message::Text(text) => text.into_bytes(),
41 Message::Binary(bytes) => bytes,
42 Message::Ping(_) => continue,
43 Message::Pong(_) => continue,
44 Message::Close(_) => {
45 socket.close(None).await?;
46
47 return Err(SocketClosedError.into());
48 }
49 _ => unreachable!(),
50 };
51
52 info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
53
54 // We try deserializing the finalize response first as it is smaller and just as common
55 if let Ok(finalize) = serde_json::from_slice::<HandshakeFinalize>(&payload) {
56 if finalize.success {
57 info!("Handshake success!");
58
59 return Ok(socket);
60 } else {
61 socket.close(None).await?;
62 return Err(SocketClosedError.into());
63 }
64 } else {
65 match serde_json::from_slice::<HandshakeResponse>(&payload) {
66 Ok(_response) => {
67 // let message = if !validate_version(&response.version) {
68 // error!(
69 // "Version compatibility failure! Our Version: {}, Their Version: {}",
70 // version(),
71 // response.version
72 // );
73
74 // HandshakeFinalize { success: false }
75 // } else {
76 // info!("Connected with a compatible version");
77
78 // HandshakeFinalize { success: true }
79 // };
80
81 warn!("Version compatibility has been no-op'd");
82
83 let message = HandshakeFinalize { success: true };
84 // Send [`HandshakeFinalize`] to indicate if we're compatible or not
85 Self::send_raw_message(&mut socket, &message).await?;
86 }
87 Err(err) => {
88 error!("Error deserializing message: {:?}", err);
89 continue;
90 }
91 }
92 }
93 }
94
95 Ok(socket)
96 }
97
98 async fn send_raw_message<T: Serialize>(
99 _socket: &Socket,
100 _message: &T,
101 ) -> Result<(), tokio_tungstenite::tungstenite::Error> {
102 todo!()
103 }
104 }
105
106 #[derive(Debug, thiserror::Error)]
107 #[error("closed socket")]
108 struct SocketClosedError;

src/lib.rs

View file
@@ -1,165 +1,26 @@
1 pub mod request;
1 use std::{fmt::Debug, net::SocketAddr};
2 2
3 pub mod model {
4 pub use giterated_models::model::*;
5 }
6
7 pub mod messages {
8 pub use giterated_models::messages::*;
9 }
10
11 use std::net::SocketAddr;
12 use std::str::FromStr;
13 use std::sync::Arc;
14 use std::{convert::Infallible, ops::Deref};
15
16 use deadpool::managed::{BuildError, Manager, Object, Pool, RecycleError, RecycleResult};
17
18 use futures_util::{SinkExt, StreamExt};
19
20 use giterated_models::{
21 messages::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake},
22 model::instance::Instance,
23 };
24 use semver::Version;
25 use serde::Serialize;
3 use deadpool::managed::{BuildError, Pool};
4 use giterated_models::model::instance::Instance;
5 use pool::GiteratedConnectionPool;
26 6 use tokio::net::TcpStream;
27 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
7 use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
8
9 pub mod daemon_backend;
10 mod handshake;
11 mod pool;
28 12
29 13 type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>;
30 14
31 15 #[macro_use]
32 16 extern crate tracing;
33 17
34 pub struct GiteratedApiBuilder {
35 our_instance: Instance,
36 our_private_key: Option<String>,
37 our_public_key: Option<String>,
38 target_instance: Option<Instance>,
39 }
40
41 pub trait AsInstance {
42 type Error: std::error::Error + Send + Sync + 'static;
43
44 fn into_instance(self) -> Result<Instance, Self::Error>;
45 }
46
47 impl AsInstance for &str {
48 type Error = <Instance as FromStr>::Err;
49
50 fn into_instance(self) -> Result<Instance, Self::Error> {
51 Instance::from_str(self)
52 }
53 }
54
55 impl AsInstance for Instance {
56 type Error = Infallible;
57
58 fn into_instance(self) -> Result<Instance, Self::Error> {
59 Ok(self)
60 }
61 }
62
63 impl GiteratedApiBuilder {
64 pub fn from_local(instance: impl AsInstance) -> Result<Self, anyhow::Error> {
65 Ok(Self {
66 our_instance: instance.into_instance()?,
67 our_private_key: None,
68 our_public_key: None,
69 target_instance: None,
70 })
71 }
72
73 pub fn from_local_for_other(
74 instance: impl AsInstance,
75 other: impl AsInstance,
76 ) -> Result<Self, anyhow::Error> {
77 Ok(Self {
78 our_instance: instance.into_instance()?,
79 our_private_key: None,
80 our_public_key: None,
81 target_instance: Some(other.into_instance()?),
82 })
83 }
84
85 pub fn private_key(&mut self, key: impl ToString) -> &mut Self {
86 self.our_private_key = Some(key.to_string());
87
88 self
89 }
90
91 pub fn public_key(&mut self, key: impl ToString) -> &mut Self {
92 self.our_public_key = Some(key.to_string());
93
94 self
95 }
96
97 pub async fn build(&mut self) -> Result<GiteratedApi, anyhow::Error> {
98 Ok(GiteratedApi {
99 configuration: Arc::new(GiteratedApiConfiguration {
100 our_private_key: self.our_private_key.take().unwrap(),
101 our_public_key: self.our_public_key.take().unwrap(),
102 target_instance: self.target_instance.take(),
103 // todo
104 target_public_key: None,
105 }),
106 })
107 }
108 }
109
110 pub struct GiteratedConnectionPool {
111 target_instance: Instance,
112 socket_addr: Option<SocketAddr>,
113 }
114
115 #[async_trait::async_trait]
116 impl Manager for GiteratedConnectionPool {
117 type Type = Socket;
118 type Error = anyhow::Error;
119
120 async fn create(&self) -> Result<Socket, Self::Error> {
121 info!("Creating new Daemon connection");
122 let mut connection =
123 GiteratedApi::connect_to(&self.target_instance, &self.socket_addr).await?;
124
125 // Handshake first!
126 GiteratedApi::handle_handshake(&mut connection, &self.target_instance).await?;
127
128 Ok(connection)
129 }
130
131 async fn recycle(&self, socket: &mut Socket) -> RecycleResult<Self::Error> {
132 match socket.send(Message::Ping(vec![])).await {
133 Ok(_) => Ok(()),
134 Err(err) => {
135 info!("Socket died!");
136 Err(RecycleError::Backend(err.into()))
137 }
138 }
139 }
140 }
141
142 pub struct GiteratedApiConfiguration {
143 pub our_private_key: String,
144 pub our_public_key: String,
145 pub target_instance: Option<Instance>,
146 pub target_public_key: Option<String>,
147 }
148
149 18 #[derive(Clone)]
150 19 pub struct DaemonConnectionPool(Pool<GiteratedConnectionPool>);
151 20
152 impl DaemonConnectionPool {
153 pub fn instance(&self) -> &Instance {
154 &self.0.manager().target_instance
155 }
156 }
157
158 impl Deref for DaemonConnectionPool {
159 type Target = Pool<GiteratedConnectionPool>;
160
161 fn deref(&self) -> &Self::Target {
162 &self.0
21 impl Debug for DaemonConnectionPool {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 f.debug_tuple("DaemonConnectionPool").finish()
163 24 }
164 25 }
165 26
@@ -190,575 +51,3 @@ impl DaemonConnectionPool {
190 51 ))
191 52 }
192 53 }
193
194 // Keep this private
195 // seriousyl.
196 #[derive(Clone)]
197 struct GiteratedApi {
198 configuration: Arc<GiteratedApiConfiguration>,
199 }
200
201 impl GiteratedApi {
202 // pub async fn public_key(&self) -> String {
203 // if let Some(public_key) = &self.configuration.target_public_key {
204 // public_key.clone()
205 // } else {
206 // assert!(self.configuration.target_instance.is_none());
207
208 // self.configuration.our_public_key.clone()
209 // }
210 // }
211
212 // /// Register on an [`Instance`].
213 // ///
214 // /// # Authorization
215 // /// - Must be made by the same instance its being sent to
216 // pub async fn register(
217 // &self,
218 // username: String,
219 // email: Option<String>,
220 // password: String,
221 // pool: &DaemonConnectionPool,
222 // ) -> Result<RegisterAccountResponse, anyhow::Error> {
223 // let mut connection = pool.0.get().await.unwrap();
224
225 // let message = InstanceAuthenticated::new(
226 // RegisterAccountRequest {
227 // username,
228 // email,
229 // password,
230 // },
231 // pool.0.manager().target_instance.clone(),
232 // self.configuration.our_private_key.clone(),
233 // )
234 // .unwrap();
235
236 // Self::send_message(
237 // &MessageKind::Authentication(AuthenticationMessage::Request(
238 // AuthenticationRequest::RegisterAccount(message),
239 // )),
240 // &mut connection,
241 // )
242 // .await?;
243
244 // while let Ok(payload) = self.next_payload(&mut connection).await {
245 // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
246 // AuthenticationResponse::RegisterAccount(response),
247 // ))) = serde_json::from_slice(&payload)
248 // {
249 // return Ok(response);
250 // }
251 // }
252
253 // connection.close(None).await?;
254
255 // Err(SocketClosedError.into())
256 // }
257
258 // /// Create repository on the target instance.
259 // pub async fn create_repository(
260 // &self,
261 // user_token: String,
262 // name: String,
263 // description: Option<String>,
264 // visibility: RepositoryVisibility,
265 // default_branch: String,
266 // owner: User,
267 // pool: &DaemonConnectionPool,
268 // ) -> Result<bool, anyhow::Error> {
269 // let mut connection = pool.0.get().await.unwrap();
270
271 // let target_respository = Repository {
272 // owner: owner.clone(),
273 // name: name.clone(),
274 // instance: self
275 // .configuration
276 // .target_instance
277 // .as_ref()
278 // .unwrap_or(&pool.0.manager().target_instance)
279 // .clone(),
280 // };
281
282 // let request = CreateRepositoryRequest {
283 // name,
284 // description,
285 // visibility,
286 // default_branch,
287 // owner,
288 // };
289
290 // let message = UnvalidatedUserAuthenticated::new(
291 // request,
292 // user_token,
293 // self.configuration.our_private_key.clone(),
294 // )
295 // .unwrap();
296
297 // Self::send_message(
298 // &MessageKind::Repository(RepositoryMessage {
299 // target: target_respository,
300 // command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository(
301 // message,
302 // )),
303 // }),
304 // &mut connection,
305 // )
306 // .await?;
307
308 // while let Ok(payload) = self.next_payload(&mut connection).await {
309 // if let Ok(MessageKind::Repository(RepositoryMessage {
310 // command:
311 // RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)),
312 // ..
313 // })) = serde_json::from_slice(&payload)
314 // {
315 // return Ok(true);
316 // }
317 // }
318
319 // connection.close(None).await?;
320
321 // Err(SocketClosedError.into())
322 // }
323
324 // pub async fn repository_info(
325 // &self,
326 // token: &str,
327 // request: &RepositoryInfoRequest,
328 // pool: &DaemonConnectionPool,
329 // ) -> Result<RepositoryView, Error> {
330 // let mut connection = pool.0.get().await.unwrap();
331
332 // let message = UnvalidatedUserAuthenticated::new(
333 // request.clone(),
334 // token.to_string(),
335 // self.configuration.our_private_key.clone(),
336 // )
337 // .unwrap();
338
339 // Self::send_message(
340 // &MessageKind::Repository(RepositoryMessage {
341 // target: request.repository.clone(),
342 // command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)),
343 // }),
344 // &mut connection,
345 // )
346 // .await?;
347
348 // loop {
349 // // while let Ok(payload) = Self::next_payload(&mut socket).await {
350 // let payload = match self.next_payload(&mut connection).await {
351 // Ok(payload) => payload,
352 // Err(err) => {
353 // error!("Error while fetching next payload: {:?}", err);
354 // continue;
355 // }
356 // };
357
358 // if let Ok(MessageKind::Repository(RepositoryMessage {
359 // command:
360 // RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)),
361 // ..
362 // })) = serde_json::from_slice(&payload)
363 // {
364 // return Ok(response);
365 // }
366 // }
367 // }
368
369 // /// Requests an authentication token for the given login.
370 // ///
371 // /// # Authorization
372 // /// This request can only be sent to the same instance from which
373 // /// it is issued.
374 // pub async fn authentication_token(
375 // &self,
376 // secret_key: String,
377 // username: String,
378 // password: String,
379 // pool: &DaemonConnectionPool,
380 // ) -> Result<String, Error> {
381 // let mut connection = pool.0.get().await.unwrap();
382
383 // let request = InstanceAuthenticated::new(
384 // AuthenticationTokenRequest {
385 // username,
386 // password,
387 // },
388 // pool.0.manager().target_instance.clone(),
389 // include_str!("example_keys/giterated.key").to_string(),
390 // )
391 // .unwrap();
392
393 // Self::send_message(
394 // &MessageKind::Authentication(AuthenticationMessage::Request(
395 // AuthenticationRequest::AuthenticationToken(request),
396 // )),
397 // &mut connection,
398 // )
399 // .await?;
400
401 // loop {
402 // // while let Ok(payload) = Self::next_payload(&mut socket).await {
403 // let payload = match self.next_payload(&mut connection).await {
404 // Ok(payload) => payload,
405 // Err(err) => {
406 // error!("Error while fetching next payload: {:?}", err);
407 // continue;
408 // }
409 // };
410
411 // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
412 // AuthenticationResponse::AuthenticationToken(response),
413 // ))) = serde_json::from_slice(&payload)
414 // {
415 // return Ok(response.token);
416 // }
417 // }
418 // }
419
420 // /// Requests a new token for the given login.
421 // ///
422 // /// # Authorization
423 // /// This request can only be sent to the same instance from which
424 // /// it is issued.
425 // pub async fn extend_token(
426 // &self,
427 // secret_key: String,
428 // token: String,
429 // pool: &DaemonConnectionPool,
430 // ) -> Result<Option<String>, Error> {
431 // let mut connection = pool.0.get().await.unwrap();
432
433 // let request = InstanceAuthenticated::new(
434 // TokenExtensionRequest { token },
435 // pool.0.manager().target_instance.clone(),
436 // self.configuration.our_private_key.clone(),
437 // )
438 // .unwrap();
439
440 // Self::send_message(
441 // &MessageKind::Authentication(AuthenticationMessage::Request(
442 // AuthenticationRequest::TokenExtension(request),
443 // )),
444 // &mut connection,
445 // )
446 // .await?;
447
448 // while let Ok(payload) = self.next_payload(&mut connection).await {
449 // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
450 // AuthenticationResponse::TokenExtension(response),
451 // ))) = serde_json::from_slice(&payload)
452 // {
453 // return Ok(response.new_token);
454 // }
455 // }
456
457 // todo!()
458 // }
459
460 // pub async fn user_display_name(
461 // &self,
462 // user: impl ToOwned<Owned = User>,
463 // pool: &DaemonConnectionPool,
464 // ) -> Result<Option<String>, Error> {
465 // let mut connection = pool.0.get().await.unwrap();
466
467 // let request = UserDisplayNameRequest {
468 // user: user.to_owned(),
469 // };
470
471 // Self::send_message(
472 // &MessageKind::User(UserMessage {
473 // instance: self
474 // .configuration
475 // .target_instance
476 // .as_ref()
477 // .unwrap_or(&pool.0.manager().target_instance)
478 // .clone(),
479 // message: UserMessageKind::Request(UserMessageRequest::DisplayName(request)),
480 // }),
481 // &mut connection,
482 // )
483 // .await?;
484
485 // while let Ok(payload) = self.next_payload(&mut connection).await {
486 // if let Ok(MessageKind::User(UserMessage {
487 // message: UserMessageKind::Response(UserMessageResponse::DisplayName(response)),
488 // ..
489 // })) = serde_json::from_slice(&payload)
490 // {
491 // return Ok(response.display_name);
492 // }
493 // }
494
495 // connection.close(None).await?;
496
497 // Err(SocketClosedError.into())
498 // }
499
500 // pub async fn user_display_image(
501 // &self,
502 // user: impl ToOwned<Owned = User>,
503 // pool: &DaemonConnectionPool,
504 // ) -> Result<Option<String>, Error> {
505 // let mut connection = pool.0.get().await.unwrap();
506
507 // let request = UserDisplayImageRequest {
508 // user: user.to_owned(),
509 // };
510
511 // Self::send_message(
512 // &MessageKind::User(UserMessage {
513 // instance: self
514 // .configuration
515 // .target_instance
516 // .as_ref()
517 // .unwrap_or(&pool.0.manager().target_instance)
518 // .clone(),
519 // message: UserMessageKind::Request(UserMessageRequest::DisplayImage(request)),
520 // }),
521 // &mut connection,
522 // )
523 // .await?;
524
525 // while let Ok(payload) = self.next_payload(&mut connection).await {
526 // if let Ok(MessageKind::User(UserMessage {
527 // message: UserMessageKind::Response(UserMessageResponse::DisplayImage(response)),
528 // ..
529 // })) = serde_json::from_slice(&payload)
530 // {
531 // return Ok(response.image_url);
532 // }
533 // }
534
535 // connection.close(None).await?;
536
537 // Err(SocketClosedError.into())
538 // }
539
540 // pub async fn user_bio(
541 // &self,
542 // user: impl ToOwned<Owned = User>,
543 // pool: &DaemonConnectionPool,
544 // ) -> Result<Option<String>, Error> {
545 // let mut connection = pool.0.get().await.unwrap();
546
547 // let request = UserBioRequest {
548 // user: user.to_owned(),
549 // };
550
551 // Self::send_message(
552 // &MessageKind::User(UserMessage {
553 // instance: self
554 // .configuration
555 // .target_instance
556 // .as_ref()
557 // .unwrap_or(&pool.0.manager().target_instance)
558 // .clone(),
559 // message: UserMessageKind::Request(UserMessageRequest::Bio(request)),
560 // }),
561 // &mut connection,
562 // )
563 // .await?;
564
565 // while let Ok(payload) = self.next_payload(&mut connection).await {
566 // if let Ok(MessageKind::User(UserMessage {
567 // message: UserMessageKind::Response(UserMessageResponse::Bio(response)),
568 // ..
569 // })) = serde_json::from_slice(&payload)
570 // {
571 // return Ok(response.bio);
572 // }
573 // }
574
575 // connection.close(None).await?;
576
577 // Err(SocketClosedError.into())
578 // }
579
580 // pub async fn user_repositories(
581 // &self,
582 // user: impl ToOwned<Owned = User>,
583 // pool: &DaemonConnectionPool,
584 // ) -> Result<Vec<RepositorySummary>, Error> {
585 // let mut connection = pool.0.get().await.unwrap();
586
587 // let request = UserRepositoriesRequest {
588 // user: user.to_owned(),
589 // };
590
591 // Self::send_message(
592 // &MessageKind::User(UserMessage {
593 // instance: self
594 // .configuration
595 // .target_instance
596 // .as_ref()
597 // .unwrap_or(&pool.0.manager().target_instance)
598 // .clone(),
599 // message: UserMessageKind::Request(UserMessageRequest::Repositories(request)),
600 // }),
601 // &mut connection,
602 // )
603 // .await?;
604
605 // while let Ok(payload) = self.next_payload(&mut connection).await {
606 // if let Ok(MessageKind::User(UserMessage {
607 // message: UserMessageKind::Response(UserMessageResponse::Repositories(response)),
608 // ..
609 // })) = serde_json::from_slice(&payload)
610 // {
611 // return Ok(response.repositories);
612 // }
613 // }
614
615 // connection.close(None).await?;
616
617 // Err(SocketClosedError.into())
618 // }
619
620 async fn connect_to(
621 instance: &Instance,
622 socket_addr: &Option<SocketAddr>,
623 ) -> Result<Socket, anyhow::Error> {
624 if let Some(addr) = socket_addr {
625 info!(
626 "Connecting to {}",
627 format!("ws://{}/.giterated/daemon/", addr)
628 );
629 let (websocket, _response) =
630 connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?;
631 info!("Connection established with {}", addr);
632
633 Ok(websocket)
634 } else {
635 info!(
636 "Connecting to {}",
637 format!("wss://{}/.giterated/daemon/", instance.url)
638 );
639 let (websocket, _response) =
640 connect_async(&format!("wss://{}/.giterated/daemon/", instance.url)).await?;
641 info!("Connection established with {}", instance.url);
642
643 Ok(websocket)
644 }
645 }
646
647 async fn handle_handshake(
648 socket: &mut Socket,
649 _instance: &Instance,
650 ) -> Result<(), anyhow::Error> {
651 // Send handshake initiation
652 Self::send_message(
653 &InitiateHandshake {
654 version: Version::from_str("0.0.0").unwrap(),
655 },
656 socket,
657 )
658 .await?;
659
660 while let Some(message) = socket.next().await {
661 let message = match message {
662 Ok(message) => message,
663 Err(err) => {
664 error!("Error reading message: {:?}", err);
665 continue;
666 }
667 };
668
669 let payload = match message {
670 Message::Text(text) => text.into_bytes(),
671 Message::Binary(bytes) => bytes,
672 Message::Ping(_) => continue,
673 Message::Pong(_) => continue,
674 Message::Close(_) => {
675 socket.close(None).await?;
676
677 return Err(SocketClosedError.into());
678 }
679 _ => unreachable!(),
680 };
681
682 info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
683
684 // We try deserializing the finalize response first as it is smaller and just as common
685 if let Ok(finalize) = serde_json::from_slice::<HandshakeFinalize>(&payload) {
686 if finalize.success {
687 info!("Handshake success!");
688
689 return Ok(());
690 } else {
691 panic!()
692 }
693 } else {
694 match serde_json::from_slice::<HandshakeResponse>(&payload) {
695 Ok(response) => {
696 // let message = if !validate_version(&response.version) {
697 // error!(
698 // "Version compatibility failure! Our Version: {}, Their Version: {}",
699 // version(),
700 // response.version
701 // );
702
703 // HandshakeFinalize { success: false }
704 // } else {
705 // info!("Connected with a compatible version");
706
707 // HandshakeFinalize { success: true }
708 // };
709
710 warn!("Version compatibility has been no-op'd");
711
712 let message = HandshakeFinalize { success: true };
713 // Send [`HandshakeFinalize`] to indicate if we're compatible or not
714 Self::send_message(&message, socket).await?;
715 }
716 Err(err) => {
717 error!("Error deserializing message: {:?}", err);
718 continue;
719 }
720 }
721 }
722 }
723
724 Ok(())
725 }
726
727 async fn send_message<T: Serialize>(
728 message: &T,
729 socket: &mut Socket,
730 ) -> Result<(), anyhow::Error> {
731 socket
732 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
733 .await?;
734
735 Ok(())
736 }
737
738 // async fn next_payload(&self, socket: &mut Socket) -> Result<Vec<u8>, Error> {
739 // while let Some(message) = socket.next().await {
740 // let message = message?;
741
742 // match message {
743 // Message::Text(text) => return Ok(text.into_bytes()),
744 // Message::Binary(bytes) => return Ok(bytes),
745 // Message::Ping(_) => continue,
746 // Message::Pong(_) => continue,
747 // Message::Close(_) => {
748 // socket.close(None).await?;
749
750 // return Err(SocketClosedError.into());
751 // }
752 // _ => unreachable!(),
753 // }
754 // }
755
756 // socket.close(None).await?;
757
758 // return Err(SocketClosedError.into());
759 // }
760 }
761
762 #[derive(Debug, thiserror::Error)]
763 #[error("closed socket")]
764 struct SocketClosedError;

src/main.rs

View file
@@ -1,131 +1,24 @@
1 1 use std::str::FromStr;
2 2
3 3 use giterated_api::DaemonConnectionPool;
4 use giterated_api::{
5 messages::user::{UserBioRequest, UserBioResponse},
4 use giterated_models::{
6 5 model::{instance::Instance, user::User},
6 operation::ObjectBackend,
7 values::user::DisplayName,
7 8 };
8 9
9 use serde::{Deserialize, Serialize};
10 // use jwt::SignWithKey;
11
12 #[macro_use]
13 extern crate tracing;
14
15 // #[tokio::main]
16 // async fn main() -> Result<(), anyhow::Error> {
17 // tracing_subscriber::fmt::init();
18
19 // let pool = DaemonConnectionPool::connect(Instance::from_str("giterated.dev")?).unwrap();
20
21 // let mut api = GiteratedApiBuilder::from_local("giterated.dev")
22 // .unwrap()
23 // .private_key(include_str!("example_keys/giterated.key"))
24 // .public_key(include_str!("example_keys/giterated.key.pub"))
25 // .build()
26 // .await
27 // .unwrap();
28
29 // info!("Lets try to make an account!");
30
31 // let response = api
32 // .register(
33 // String::from("ambee"),
34 // None,
35 // String::from("lolthisisinthecommithistory"),
36 // &pool,
37 // )
38 // .await;
39
40 // info!("Registration response: {:?}", response);
41
42 // let token = api
43 // .authentication_token(
44 // String::from("foobar"),
45 // String::from("ambee"),
46 // String::from("password"),
47 // &pool,
48 // )
49 // .await
50 // .unwrap();
51
52 // println!("Token: {}", token);
53
54 // let public_key = api.public_key().await;
55
56 // println!("Server public key:\n{}", public_key);
57 // let verification_key = DecodingKey::from_rsa_pem(public_key.as_bytes()).unwrap();
58 // let data: TokenData<UserTokenMetadata> = decode(
59 // &token,
60 // &verification_key,
61 // &Validation::new(Algorithm::RS256),
62 // )
63 // .unwrap();
64
65 // println!("The token was valid! Data:\n{:#?}", data.claims);
66
67 // info!("Lets extend that token!");
68
69 // let new_token = api
70 // .extend_token(String::from("foobar"), token.clone(), &pool)
71 // .await
72 // .unwrap();
73 // info!("New Token Returned:\n{:?}", new_token);
74
75 // info!("Try to create a repository? uwu");
76
77 // let repository = api
78 // .create_repository(
79 // new_token.unwrap(),
80 // String::from("super-repository"),
81 // None,
82 // RepositoryVisibility::Public,
83 // String::from("master"),
84 // User::from_str("ambee:giterated.dev").unwrap(),
85 // &pool,
86 // )
87 // .await
88 // .unwrap();
89
90 // assert!(repository);
91
92 // info!("Lets view our repository!");
93
94 // let view = api
95 // .repository_info(
96 // &token,
97 // Repository::from_str("ambee:giterated.dev/[email protected]").unwrap(),
98 // &pool,
99 // )
100 // .await
101 // .unwrap();
102
103 // info!("Repository Info:\n{:#?}", view);
104
105 // Ok(())
106 // }
107
108 10 #[tokio::main]
109 11 async fn main() -> Result<(), anyhow::Error> {
110 12 tracing_subscriber::fmt::init();
111 13 let pool = DaemonConnectionPool::connect(Instance::from_str("giterated.dev")?).unwrap();
112 14
113 let request = UserBioRequest {
114 user: User::from_str("ambee:giterated.dev").unwrap(),
115 };
15 let mut user = pool.get_object::<User>("ambee:giterated.dev").await?;
116 16
117 let response = giterated_api::request::request_local(&request)
118 .execute_expect::<UserBioResponse>(&pool)
119 .await?;
17 let display_name = user.get::<DisplayName>().await?;
120 18
121 info!("Response: {:?}", response);
19 let repositories = user
20 .repositories(&Instance::from_str("giterated.dev").unwrap())
21 .await?;
122 22
123 23 Ok(())
124 24 }
125
126 #[derive(Debug, Serialize, Deserialize)]
127 struct UserTokenMetadata {
128 user: User,
129 generated_for: Instance,
130 exp: u64,
131 }

src/pool.rs

View file
@@ -0,0 +1,67 @@
1 use std::net::SocketAddr;
2
3 use deadpool::managed::{Manager, RecycleError, RecycleResult};
4 use futures_util::SinkExt;
5 use giterated_models::model::instance::Instance;
6 use tokio_tungstenite::{connect_async, tungstenite::Message};
7
8 use crate::{handshake::GiteratedConnectionHandshaker, Socket};
9
10 pub struct GiteratedConnectionPool {
11 pub target_instance: Instance,
12 pub socket_addr: Option<SocketAddr>,
13 }
14
15 #[async_trait::async_trait]
16 impl Manager for GiteratedConnectionPool {
17 type Type = Socket;
18 type Error = anyhow::Error;
19
20 async fn create(&self) -> Result<Socket, Self::Error> {
21 info!("Creating new Daemon connection");
22 let connection = connect_to(&self.target_instance, &self.socket_addr).await?;
23
24 // Handshake first!
25 let connection = GiteratedConnectionHandshaker::new(connection)
26 .handshake()
27 .await?;
28 Ok(connection)
29 }
30
31 async fn recycle(&self, socket: &mut Socket) -> RecycleResult<Self::Error> {
32 match socket.send(Message::Ping(vec![])).await {
33 Ok(_) => Ok(()),
34 Err(err) => {
35 info!("Socket died!");
36 Err(RecycleError::Backend(err.into()))
37 }
38 }
39 }
40 }
41
42 async fn connect_to(
43 instance: &Instance,
44 socket_addr: &Option<SocketAddr>,
45 ) -> Result<Socket, anyhow::Error> {
46 if let Some(addr) = socket_addr {
47 info!(
48 "Connecting to {}",
49 format!("ws://{}/.giterated/daemon/", addr)
50 );
51 let (websocket, _response) =
52 connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?;
53 info!("Connection established with {}", addr);
54
55 Ok(websocket)
56 } else {
57 info!(
58 "Connecting to {}",
59 format!("wss://{}/.giterated/daemon/", instance.url)
60 );
61 let (websocket, _response) =
62 connect_async(&format!("wss://{}/.giterated/daemon/", instance.url)).await?;
63 info!("Connection established with {}", instance.url);
64
65 Ok(websocket)
66 }
67 }