JavaScript is disabled, refresh for a better experience. ambee/giterated-api

ambee/giterated-api

Git repository hosting, collaboration, and discovery for the Fediverse.

Re-export all models

Type: Fix

emilia - 2 years ago

parent: tbd commit: de795a7

src/lib.rs - 17134 bytes
Raw
1 pub use giterated_daemon::model::*;
2
3 use std::convert::Infallible;
4 use std::str::FromStr;
5 use std::sync::Arc;
6
7 use anyhow::Error;
8 use deadpool::managed::{BuildError, Manager, Pool, RecycleResult};
9 use futures_util::{SinkExt, StreamExt};
10 use giterated_daemon::messages::authentication::{RegisterAccountRequest, RegisterAccountResponse};
11 use giterated_daemon::messages::UnvalidatedUserAuthenticated;
12 use giterated_daemon::model::repository::{Repository, RepositoryView, RepositoryVisibility};
13 use giterated_daemon::{
14 handshake::{HandshakeFinalize, HandshakeMessage, InitiateHandshake},
15 messages::{
16 authentication::{
17 AuthenticationMessage, AuthenticationRequest, AuthenticationResponse,
18 AuthenticationTokenRequest, TokenExtensionRequest,
19 },
20 repository::{
21 CreateRepositoryRequest, RepositoryInfoRequest, RepositoryMessage,
22 RepositoryMessageKind, RepositoryRequest, RepositoryResponse,
23 },
24 InstanceAuthenticated, MessageKind,
25 },
26 };
27 use giterated_daemon::{validate_version, version};
28 use serde::Serialize;
29 use tokio::net::TcpStream;
30 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
31
/// WebSocket stream (plain or TLS, over TCP) that this crate uses for daemon connections.
type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>;
33
34 #[macro_use]
35 extern crate tracing;
36
/// Builder that collects the settings needed to construct a [`GiteratedApi`].
pub struct GiteratedApiBuilder {
    /// Instance this client acts as; set by the `from_local*` constructors.
    // NOTE(review): `build` does not read this field in the code visible here.
    our_instance: Instance,
    /// Private key supplied through `private_key`, if one has been set.
    our_private_key: Option<String>,
    /// Public key supplied through `public_key`, if one has been set.
    our_public_key: Option<String>,
    /// Other instance to target; only `from_local_for_other` sets it.
    target_instance: Option<Instance>,
}
43
/// Conversion of a value into an [`Instance`], accepted by the builder constructors.
pub trait AsInstance {
    /// Error produced when the conversion fails.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Consumes `self` and returns the corresponding [`Instance`].
    fn into_instance(self) -> Result<Instance, Self::Error>;
}
49
50 impl AsInstance for &str {
51 type Error = <Instance as FromStr>::Err;
52
53 fn into_instance(self) -> Result<Instance, Self::Error> {
54 Instance::from_str(self)
55 }
56 }
57
/// An [`Instance`] is already the target type, so this conversion cannot fail.
impl AsInstance for Instance {
    type Error = Infallible;

    /// Returns `self` unchanged.
    fn into_instance(self) -> Result<Instance, Self::Error> {
        Ok(self)
    }
}
65
66 impl GiteratedApiBuilder {
67 pub fn from_local(instance: impl AsInstance) -> Result<Self, anyhow::Error> {
68 Ok(Self {
69 our_instance: instance.into_instance()?,
70 our_private_key: None,
71 our_public_key: None,
72 target_instance: None,
73 })
74 }
75
76 pub fn from_local_for_other(
77 instance: impl AsInstance,
78 other: impl AsInstance,
79 ) -> Result<Self, anyhow::Error> {
80 Ok(Self {
81 our_instance: instance.into_instance()?,
82 our_private_key: None,
83 our_public_key: None,
84 target_instance: Some(other.into_instance()?),
85 })
86 }
87
88 pub fn private_key(&mut self, key: impl ToString) -> &mut Self {
89 self.our_private_key = Some(key.to_string());
90
91 self
92 }
93
94 pub fn public_key(&mut self, key: impl ToString) -> &mut Self {
95 self.our_public_key = Some(key.to_string());
96
97 self
98 }
99
100 pub async fn build(&mut self) -> Result<GiteratedApi, anyhow::Error> {
101 Ok(GiteratedApi {
102 configuration: Arc::new(GiteratedApiConfiguration {
103 our_private_key: self.our_private_key.take().unwrap(),
104 our_public_key: self.our_public_key.take().unwrap(),
105 target_instance: self.target_instance.take(),
106 // todo
107 target_public_key: None,
108 }),
109 })
110 }
111 }
112
113 struct GiteratedConnectionPool {
114 target_instance: Instance,
115 }
116
117 #[async_trait::async_trait]
118 impl Manager for GiteratedConnectionPool {
119 type Type = Socket;
120 type Error = anyhow::Error;
121
122 async fn create(&self) -> Result<Socket, Self::Error> {
123 info!("Creating new Daemon connection");
124 let mut connection = GiteratedApi::connect_to(self.target_instance.clone()).await?;
125
126 // Handshake first!
127 GiteratedApi::handle_handshake(&mut connection, &self.target_instance).await?;
128
129 Ok(connection)
130 }
131
132 async fn recycle(&self, _: &mut Socket) -> RecycleResult<Self::Error> {
133 Ok(())
134 }
135 }
136
/// Settings held by a [`GiteratedApi`] and shared between its clones.
pub struct GiteratedApiConfiguration {
    /// Private key passed along when constructing authenticated requests.
    pub our_private_key: String,
    /// Public key returned by `GiteratedApi::public_key` when no target key is set.
    pub our_public_key: String,
    /// Optional target instance; `create_repository` falls back to the pool's
    /// instance when this is `None`.
    pub target_instance: Option<Instance>,
    /// Public key of the target instance; the builder currently always leaves it `None`.
    pub target_public_key: Option<String>,
}
143
144 #[derive(Clone)]
145 pub struct DaemonConnectionPool(Pool<GiteratedConnectionPool>);
146
147 impl DaemonConnectionPool {
148 pub fn connect(
149 instance: impl ToOwned<Owned = Instance>,
150 ) -> Result<Self, BuildError<anyhow::Error>> {
151 Ok(Self(
152 Pool::builder(GiteratedConnectionPool {
153 target_instance: instance.to_owned(),
154 })
155 .build()?,
156 ))
157 }
158 }
159
160 #[derive(Clone)]
161 pub struct GiteratedApi {
162 configuration: Arc<GiteratedApiConfiguration>,
163 }
164
165 impl GiteratedApi {
166 pub async fn public_key(&self) -> String {
167 if let Some(public_key) = &self.configuration.target_public_key {
168 public_key.clone()
169 } else {
170 assert!(self.configuration.target_instance.is_none());
171
172 self.configuration.our_public_key.clone()
173 }
174 }
175
176 /// Register on an [`Instance`].
177 ///
178 /// # Authorization
179 /// - Must be made by the same instance its being sent to
180 pub async fn register(
181 &self,
182 username: String,
183 email: Option<String>,
184 password: String,
185 pool: &DaemonConnectionPool,
186 ) -> Result<RegisterAccountResponse, anyhow::Error> {
187 let mut connection = pool.0.get().await.unwrap();
188
189 let message = InstanceAuthenticated::new(
190 RegisterAccountRequest {
191 username,
192 email,
193 password,
194 },
195 pool.0.manager().target_instance.clone(),
196 self.configuration.our_private_key.clone(),
197 )
198 .unwrap();
199
200 Self::send_message(
201 &MessageKind::Authentication(AuthenticationMessage::Request(
202 AuthenticationRequest::RegisterAccount(message),
203 )),
204 &mut connection,
205 )
206 .await?;
207
208 while let Ok(payload) = self.next_payload(&mut connection).await {
209 if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
210 AuthenticationResponse::RegisterAccount(response),
211 ))) = serde_json::from_slice(&payload)
212 {
213 return Ok(response);
214 }
215 }
216
217 unreachable!()
218 }
219
220 /// Create repository on the target instance.
221 pub async fn create_repository(
222 &self,
223 user_token: String,
224 name: String,
225 description: Option<String>,
226 visibility: RepositoryVisibility,
227 default_branch: String,
228 owner: User,
229 pool: &DaemonConnectionPool,
230 ) -> Result<bool, anyhow::Error> {
231 let mut connection = pool.0.get().await.unwrap();
232
233 let target_respository = Repository {
234 owner: owner.clone(),
235 name: name.clone(),
236 instance: self
237 .configuration
238 .target_instance
239 .as_ref()
240 .unwrap_or(&pool.0.manager().target_instance)
241 .clone(),
242 };
243
244 let request = CreateRepositoryRequest {
245 name,
246 description,
247 visibility,
248 default_branch,
249 owner,
250 };
251
252 let message = UnvalidatedUserAuthenticated::new(
253 request,
254 user_token,
255 self.configuration.our_private_key.clone(),
256 )
257 .unwrap();
258
259 Self::send_message(
260 &MessageKind::Repository(RepositoryMessage {
261 target: target_respository,
262 command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository(
263 message,
264 )),
265 }),
266 &mut connection,
267 )
268 .await?;
269
270 while let Ok(payload) = self.next_payload(&mut connection).await {
271 if let Ok(MessageKind::Repository(RepositoryMessage {
272 command:
273 RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)),
274 ..
275 })) = serde_json::from_slice(&payload)
276 {
277 return Ok(true);
278 }
279 }
280
281 unreachable!()
282 }
283
284 pub async fn repository_info(
285 &self,
286 token: &str,
287 repository: Repository,
288 pool: &DaemonConnectionPool,
289 ) -> Result<RepositoryView, Error> {
290 let mut connection = pool.0.get().await.unwrap();
291
292 let message = UnvalidatedUserAuthenticated::new(
293 RepositoryInfoRequest {
294 repository: repository.clone(),
295 extra_metadata: true,
296 rev: None,
297 path: None,
298 },
299 token.to_string(),
300 self.configuration.our_private_key.clone(),
301 )
302 .unwrap();
303
304 Self::send_message(
305 &MessageKind::Repository(RepositoryMessage {
306 target: repository.clone(),
307 command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)),
308 }),
309 &mut connection,
310 )
311 .await?;
312
313 loop {
314 // while let Ok(payload) = Self::next_payload(&mut socket).await {
315 let payload = match self.next_payload(&mut connection).await {
316 Ok(payload) => payload,
317 Err(err) => {
318 error!("Error while fetching next payload: {:?}", err);
319 continue;
320 }
321 };
322
323 if let Ok(MessageKind::Repository(RepositoryMessage {
324 command:
325 RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)),
326 ..
327 })) = serde_json::from_slice(&payload)
328 {
329 return Ok(response);
330 }
331 }
332 }
333
334 /// Requests an authentication token for the given login.
335 ///
336 /// # Authorization
337 /// This request can only be sent to the same instance from which
338 /// it is issued.
339 pub async fn authentication_token(
340 &self,
341 secret_key: String,
342 username: String,
343 password: String,
344 pool: &DaemonConnectionPool,
345 ) -> Result<String, Error> {
346 let mut connection = pool.0.get().await.unwrap();
347
348 let request = InstanceAuthenticated::new(
349 AuthenticationTokenRequest {
350 secret_key,
351 username,
352 password,
353 },
354 pool.0.manager().target_instance.clone(),
355 include_str!("example_keys/giterated.key").to_string(),
356 )
357 .unwrap();
358
359 Self::send_message(
360 &MessageKind::Authentication(AuthenticationMessage::Request(
361 AuthenticationRequest::AuthenticationToken(request),
362 )),
363 &mut connection,
364 )
365 .await?;
366
367 loop {
368 // while let Ok(payload) = Self::next_payload(&mut socket).await {
369 let payload = match self.next_payload(&mut connection).await {
370 Ok(payload) => payload,
371 Err(err) => {
372 error!("Error while fetching next payload: {:?}", err);
373 continue;
374 }
375 };
376
377 if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
378 AuthenticationResponse::AuthenticationToken(response),
379 ))) = serde_json::from_slice(&payload)
380 {
381 return Ok(response.token);
382 }
383 }
384 }
385
386 /// Requests a new token for the given login.
387 ///
388 /// # Authorization
389 /// This request can only be sent to the same instance from which
390 /// it is issued.
391 pub async fn extend_token(
392 &self,
393 secret_key: String,
394 token: String,
395 pool: &DaemonConnectionPool,
396 ) -> Result<Option<String>, Error> {
397 let mut connection = pool.0.get().await.unwrap();
398
399 let request = InstanceAuthenticated::new(
400 TokenExtensionRequest { secret_key, token },
401 pool.0.manager().target_instance.clone(),
402 self.configuration.our_private_key.clone(),
403 )
404 .unwrap();
405
406 Self::send_message(
407 &MessageKind::Authentication(AuthenticationMessage::Request(
408 AuthenticationRequest::TokenExtension(request),
409 )),
410 &mut connection,
411 )
412 .await?;
413
414 while let Ok(payload) = self.next_payload(&mut connection).await {
415 if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
416 AuthenticationResponse::TokenExtension(response),
417 ))) = serde_json::from_slice(&payload)
418 {
419 return Ok(response.new_token);
420 }
421 }
422
423 todo!()
424 }
425
426 async fn connect_to(instance: Instance) -> Result<Socket, anyhow::Error> {
427 let url = &instance.url;
428 info!(
429 "Connecting to {}",
430 format!("wss://{}/.giterated/daemon/", url)
431 );
432 let (websocket, _response) =
433 connect_async(&format!("wss://{}/.giterated/daemon/", url)).await?;
434 info!("Connection established with {}", url);
435
436 Ok(websocket)
437 }
438
439 async fn handle_handshake(
440 socket: &mut Socket,
441 instance: &Instance,
442 ) -> Result<(), anyhow::Error> {
443 // Send handshake initiation
444
445 Self::send_message(
446 &MessageKind::Handshake(HandshakeMessage::Initiate(InitiateHandshake {
447 identity: instance.clone(),
448 version: version(),
449 })),
450 socket,
451 )
452 .await?;
453
454 while let Some(message) = socket.next().await {
455 let message = match message {
456 Ok(message) => message,
457 Err(err) => {
458 error!("Error reading message: {:?}", err);
459 continue;
460 }
461 };
462
463 let payload = match message {
464 Message::Text(text) => text.into_bytes(),
465 Message::Binary(bytes) => bytes,
466 Message::Ping(_) => continue,
467 Message::Pong(_) => continue,
468 Message::Close(_) => {
469 panic!()
470 }
471 _ => unreachable!(),
472 };
473
474 info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
475
476 let message = match serde_json::from_slice::<MessageKind>(&payload) {
477 Ok(message) => message,
478 Err(err) => {
479 error!("Error deserializing message: {:?}", err);
480 continue;
481 }
482 };
483
484 if let MessageKind::Handshake(handshake) = message {
485 match handshake {
486 HandshakeMessage::Initiate(_) => unimplemented!(),
487 HandshakeMessage::Response(response) => {
488 let message = if !validate_version(&response.version) {
489 error!(
490 "Version compatibility failure! Our Version: {}, Their Version: {}",
491 version(),
492 response.version
493 );
494
495 HandshakeFinalize { success: false }
496 } else {
497 info!("Connected with a compatible version");
498
499 HandshakeFinalize { success: true }
500 };
501 // Send HandshakeMessage::Finalize
502 Self::send_message(
503 &MessageKind::Handshake(HandshakeMessage::Finalize(message)),
504 socket,
505 )
506 .await?;
507 }
508 HandshakeMessage::Finalize(finalize) => {
509 if finalize.success {
510 return Ok(());
511 } else {
512 panic!()
513 }
514 }
515 }
516 }
517 }
518
519 Ok(())
520 }
521
522 async fn send_message<T: Serialize>(
523 message: &T,
524 socket: &mut Socket,
525 ) -> Result<(), anyhow::Error> {
526 socket
527 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
528 .await?;
529
530 Ok(())
531 }
532
533 async fn next_payload(&self, socket: &mut Socket) -> Result<Vec<u8>, Error> {
534 while let Some(message) = socket.next().await {
535 let message = message?;
536
537 match message {
538 Message::Text(text) => return Ok(text.into_bytes()),
539 Message::Binary(bytes) => return Ok(bytes),
540 Message::Ping(_) => continue,
541 Message::Pong(_) => continue,
542 Message::Close(_) => {
543 panic!()
544 }
545 _ => unreachable!(),
546 }
547 }
548
549 unreachable!()
550 }
551 }
552