JavaScript is disabled, refresh for a better experience. ambee/giterated-api

ambee/giterated-api

Git repository hosting, collaboration, and discovery for the Fediverse.

Add connection pool

Amber - 2 years ago

parent: tbd commit: 98d3b04

src/lib.rs - 17296 bytes
Raw
1 use std::convert::Infallible;
2 use std::net::SocketAddr;
3 use std::str::FromStr;
4 use std::sync::Arc;
5
6 use anyhow::Error;
7 use deadpool::managed::{BuildError, Manager, Pool, RecycleResult};
8 use futures_util::{SinkExt, StreamExt};
9 use giterated_daemon::messages::authentication::{RegisterAccountRequest, RegisterAccountResponse};
10 use giterated_daemon::messages::UnvalidatedUserAuthenticated;
11 use giterated_daemon::model::repository::RepositoryVisibility;
12 use giterated_daemon::model::user::User;
13 use giterated_daemon::{
14 handshake::{HandshakeFinalize, HandshakeMessage, InitiateHandshake},
15 messages::{
16 authentication::{
17 AuthenticationMessage, AuthenticationRequest, AuthenticationResponse,
18 AuthenticationTokenRequest, TokenExtensionRequest,
19 },
20 repository::{
21 CreateRepositoryRequest, RepositoryInfoRequest, RepositoryMessage,
22 RepositoryMessageKind, RepositoryRequest, RepositoryResponse,
23 },
24 InstanceAuthenticated, MessageKind,
25 },
26 model::{
27 instance::Instance,
28 repository::{Repository, RepositoryView},
29 },
30 };
31 use giterated_daemon::{validate_version, version};
32 use serde::Serialize;
33 use tokio::net::TcpStream;
34 use tokio::sync::broadcast::{Receiver, Sender};
35 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
36
/// WebSocket stream type used for daemon connections throughout this file:
/// it is what `connect_async` yields and what the connection pool hands out.
type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>;
38
// Bring `tracing`'s macros into scope for the whole crate; `info!` and
// `error!` are used below without a per-module `use`.
#[macro_use]
extern crate tracing;
41
/// Builder that collects the settings needed to construct a [`GiteratedApi`].
pub struct GiteratedApiBuilder {
    /// Instance supplied to `from_local` / `from_local_for_other`.
    // NOTE(review): not read by `build` in the code visible here.
    our_instance: Instance,
    /// Private key, set through `private_key`; taken by `build`.
    our_private_key: Option<String>,
    /// Public key, set through `public_key`; taken by `build`.
    our_public_key: Option<String>,
    /// Other instance to talk to, only set by `from_local_for_other`.
    target_instance: Option<Instance>,
}
48
/// Conversion into an [`Instance`], implemented below for `&str` and for
/// `Instance` itself so builder functions can accept either.
pub trait AsInstance {
    /// Error produced when the conversion fails. The bounds allow it to be
    /// propagated with `?` into an `anyhow::Error`.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Consumes `self` and produces the corresponding [`Instance`].
    fn into_instance(self) -> Result<Instance, Self::Error>;
}
54
55 impl AsInstance for &str {
56 type Error = <Instance as FromStr>::Err;
57
58 fn into_instance(self) -> Result<Instance, Self::Error> {
59 Instance::from_str(self)
60 }
61 }
62
63 impl AsInstance for Instance {
64 type Error = Infallible;
65
66 fn into_instance(self) -> Result<Instance, Self::Error> {
67 Ok(self)
68 }
69 }
70
71 impl GiteratedApiBuilder {
72 pub fn from_local(instance: impl AsInstance) -> Result<Self, anyhow::Error> {
73 Ok(Self {
74 our_instance: instance.into_instance()?,
75 our_private_key: None,
76 our_public_key: None,
77 target_instance: None,
78 })
79 }
80
81 pub fn from_local_for_other(
82 instance: impl AsInstance,
83 other: impl AsInstance,
84 ) -> Result<Self, anyhow::Error> {
85 Ok(Self {
86 our_instance: instance.into_instance()?,
87 our_private_key: None,
88 our_public_key: None,
89 target_instance: Some(other.into_instance()?),
90 })
91 }
92
93 pub fn private_key(&mut self, key: impl ToString) -> &mut Self {
94 self.our_private_key = Some(key.to_string());
95
96 self
97 }
98
99 pub fn public_key(&mut self, key: impl ToString) -> &mut Self {
100 self.our_public_key = Some(key.to_string());
101
102 self
103 }
104
105 pub async fn build(&mut self) -> Result<GiteratedApi, anyhow::Error> {
106 Ok(GiteratedApi {
107 configuration: Arc::new(GiteratedApiConfiguration {
108 our_private_key: self.our_private_key.take().unwrap(),
109 our_public_key: self.our_public_key.take().unwrap(),
110 target_instance: self.target_instance.take(),
111 // todo
112 target_public_key: None,
113 }),
114 })
115 }
116 }
117
/// Connection manager behind [`DaemonConnectionPool`]; its `Manager`
/// implementation opens sockets to `target_instance`.
struct GiteratedConnectionPool {
    /// Instance every connection created by this manager connects to.
    target_instance: Instance,
}
121
122 #[async_trait::async_trait]
123 impl Manager for GiteratedConnectionPool {
124 type Type = Socket;
125 type Error = anyhow::Error;
126
127 async fn create(&self) -> Result<Socket, Self::Error> {
128 info!("Creating new Daemon connection");
129 let mut connection = GiteratedApi::connect_to(self.target_instance.clone()).await?;
130
131 // Handshake first!
132 GiteratedApi::handle_handshake(&mut connection, &self.target_instance).await?;
133
134 Ok(connection)
135 }
136
137 async fn recycle(&self, _: &mut Socket) -> RecycleResult<Self::Error> {
138 Ok(())
139 }
140 }
141
/// Immutable settings shared by every clone of a [`GiteratedApi`].
pub struct GiteratedApiConfiguration {
    /// Private key passed to the `*Authenticated::new` constructors when
    /// building requests.
    pub our_private_key: String,
    /// Public key returned by `GiteratedApi::public_key` when no target
    /// public key is known.
    pub our_public_key: String,
    /// Instance the API targets, if different from the pool's instance.
    pub target_instance: Option<Instance>,
    /// Public key of the target instance; the builder currently always
    /// leaves this as `None`.
    pub target_public_key: Option<String>,
}
148
/// Cloneable handle to a pool of daemon connections managed by
/// `GiteratedConnectionPool`.
#[derive(Clone)]
pub struct DaemonConnectionPool(Pool<GiteratedConnectionPool>);
151
152 impl DaemonConnectionPool {
153 pub fn connect(
154 instance: impl ToOwned<Owned = Instance>,
155 ) -> Result<Self, BuildError<anyhow::Error>> {
156 Ok(Self(
157 Pool::builder(GiteratedConnectionPool {
158 target_instance: instance.to_owned(),
159 })
160 .build()?,
161 ))
162 }
163 }
164
/// Client for talking to a Giterated daemon. Cloning is cheap: clones share
/// one reference-counted configuration.
#[derive(Clone)]
pub struct GiteratedApi {
    /// Keys and target settings shared between clones.
    configuration: Arc<GiteratedApiConfiguration>,
}
169
170 impl GiteratedApi {
171 pub async fn public_key(&mut self) -> String {
172 if let Some(public_key) = &self.configuration.target_public_key {
173 public_key.clone()
174 } else {
175 assert!(self.configuration.target_instance.is_none());
176
177 self.configuration.our_public_key.clone()
178 }
179 }
180
181 /// Register on an [`Instance`].
182 ///
183 /// # Authorization
184 /// - Must be made by the same instance its being sent to
185 pub async fn register(
186 &self,
187 username: String,
188 email: Option<String>,
189 password: String,
190 pool: &DaemonConnectionPool,
191 ) -> Result<RegisterAccountResponse, anyhow::Error> {
192 let mut connection = pool.0.get().await.unwrap();
193
194 let message = InstanceAuthenticated::new(
195 RegisterAccountRequest {
196 username,
197 email,
198 password,
199 },
200 pool.0.manager().target_instance.clone(),
201 self.configuration.our_private_key.clone(),
202 )
203 .unwrap();
204
205 Self::send_message(
206 &MessageKind::Authentication(AuthenticationMessage::Request(
207 AuthenticationRequest::RegisterAccount(message),
208 )),
209 &mut connection,
210 )
211 .await?;
212
213 while let Ok(payload) = self.next_payload(&mut connection).await {
214 if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
215 AuthenticationResponse::RegisterAccount(response),
216 ))) = serde_json::from_slice(&payload)
217 {
218 return Ok(response);
219 }
220 }
221
222 unreachable!()
223 }
224
225 /// Create repository on the target instance.
226 pub async fn create_repository(
227 &self,
228 user_token: String,
229 name: String,
230 description: Option<String>,
231 visibility: RepositoryVisibility,
232 default_branch: String,
233 owner: User,
234 pool: &DaemonConnectionPool,
235 ) -> Result<bool, anyhow::Error> {
236 let mut connection = pool.0.get().await.unwrap();
237
238 let target_respository = Repository {
239 owner: owner.clone(),
240 name: name.clone(),
241 instance: self
242 .configuration
243 .target_instance
244 .as_ref()
245 .unwrap_or(&pool.0.manager().target_instance)
246 .clone(),
247 };
248
249 let request = CreateRepositoryRequest {
250 name,
251 description,
252 visibility,
253 default_branch,
254 owner,
255 };
256
257 let message = UnvalidatedUserAuthenticated::new(
258 request,
259 user_token,
260 self.configuration.our_private_key.clone(),
261 )
262 .unwrap();
263
264 Self::send_message(
265 &MessageKind::Repository(RepositoryMessage {
266 target: target_respository,
267 command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository(
268 message,
269 )),
270 }),
271 &mut connection,
272 )
273 .await?;
274
275 while let Ok(payload) = self.next_payload(&mut connection).await {
276 if let Ok(MessageKind::Repository(RepositoryMessage {
277 command:
278 RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)),
279 ..
280 })) = serde_json::from_slice(&payload)
281 {
282 return Ok(true);
283 }
284 }
285
286 unreachable!()
287 }
288
289 pub async fn repository_info(
290 &mut self,
291 token: &str,
292 repository: Repository,
293 pool: &DaemonConnectionPool,
294 ) -> Result<RepositoryView, Error> {
295 let mut connection = pool.0.get().await.unwrap();
296
297 let message = UnvalidatedUserAuthenticated::new(
298 RepositoryInfoRequest {
299 repository: repository.clone(),
300 extra_metadata: true,
301 rev: None,
302 path: None,
303 },
304 token.to_string(),
305 self.configuration.our_private_key.clone(),
306 )
307 .unwrap();
308
309 Self::send_message(
310 &MessageKind::Repository(RepositoryMessage {
311 target: repository.clone(),
312 command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)),
313 }),
314 &mut connection,
315 )
316 .await?;
317
318 loop {
319 // while let Ok(payload) = Self::next_payload(&mut socket).await {
320 let payload = match self.next_payload(&mut connection).await {
321 Ok(payload) => payload,
322 Err(err) => {
323 error!("Error while fetching next payload: {:?}", err);
324 continue;
325 }
326 };
327
328 if let Ok(MessageKind::Repository(RepositoryMessage {
329 command:
330 RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)),
331 ..
332 })) = serde_json::from_slice(&payload)
333 {
334 return Ok(response);
335 }
336 }
337 }
338
339 /// Requests an authentication token for the given login.
340 ///
341 /// # Authorization
342 /// This request can only be sent to the same instance from which
343 /// it is issued.
344 pub async fn authentication_token(
345 &mut self,
346 secret_key: String,
347 username: String,
348 password: String,
349 pool: &DaemonConnectionPool,
350 ) -> Result<String, Error> {
351 let mut connection = pool.0.get().await.unwrap();
352
353 let request = InstanceAuthenticated::new(
354 AuthenticationTokenRequest {
355 secret_key,
356 username,
357 password,
358 },
359 pool.0.manager().target_instance.clone(),
360 include_str!("example_keys/giterated.key").to_string(),
361 )
362 .unwrap();
363
364 Self::send_message(
365 &MessageKind::Authentication(AuthenticationMessage::Request(
366 AuthenticationRequest::AuthenticationToken(request),
367 )),
368 &mut connection,
369 )
370 .await?;
371
372 loop {
373 // while let Ok(payload) = Self::next_payload(&mut socket).await {
374 let payload = match self.next_payload(&mut connection).await {
375 Ok(payload) => payload,
376 Err(err) => {
377 error!("Error while fetching next payload: {:?}", err);
378 continue;
379 }
380 };
381
382 if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
383 AuthenticationResponse::AuthenticationToken(response),
384 ))) = serde_json::from_slice(&payload)
385 {
386 return Ok(response.token);
387 }
388 }
389 }
390
391 /// Requests a new token for the given login.
392 ///
393 /// # Authorization
394 /// This request can only be sent to the same instance from which
395 /// it is issued.
396 pub async fn extend_token(
397 &mut self,
398 secret_key: String,
399 token: String,
400 pool: &DaemonConnectionPool,
401 ) -> Result<Option<String>, Error> {
402 let mut connection = pool.0.get().await.unwrap();
403
404 let request = InstanceAuthenticated::new(
405 TokenExtensionRequest { secret_key, token },
406 pool.0.manager().target_instance.clone(),
407 self.configuration.our_private_key.clone(),
408 )
409 .unwrap();
410
411 Self::send_message(
412 &MessageKind::Authentication(AuthenticationMessage::Request(
413 AuthenticationRequest::TokenExtension(request),
414 )),
415 &mut connection,
416 )
417 .await?;
418
419 while let Ok(payload) = self.next_payload(&mut connection).await {
420 if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
421 AuthenticationResponse::TokenExtension(response),
422 ))) = serde_json::from_slice(&payload)
423 {
424 return Ok(response.new_token);
425 }
426 }
427
428 todo!()
429 }
430
431 async fn connect_to(instance: Instance) -> Result<Socket, anyhow::Error> {
432 let url = &instance.url;
433 info!(
434 "Connecting to {}",
435 format!("wss://{}/.giterated/daemon/", url)
436 );
437 let (websocket, _response) =
438 connect_async(&format!("wss://{}/.giterated/daemon/", url)).await?;
439 info!("Connection established with {}", url);
440
441 Ok(websocket)
442 }
443
444 async fn handle_handshake(
445 socket: &mut Socket,
446 instance: &Instance,
447 ) -> Result<(), anyhow::Error> {
448 // Send handshake initiation
449
450 Self::send_message(
451 &MessageKind::Handshake(HandshakeMessage::Initiate(InitiateHandshake {
452 identity: instance.clone(),
453 version: version(),
454 })),
455 socket,
456 )
457 .await?;
458
459 while let Some(message) = socket.next().await {
460 let message = match message {
461 Ok(message) => message,
462 Err(err) => {
463 error!("Error reading message: {:?}", err);
464 continue;
465 }
466 };
467
468 let payload = match message {
469 Message::Text(text) => text.into_bytes(),
470 Message::Binary(bytes) => bytes,
471 Message::Ping(_) => continue,
472 Message::Pong(_) => continue,
473 Message::Close(_) => {
474 panic!()
475 }
476 _ => unreachable!(),
477 };
478
479 info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
480
481 let message = match serde_json::from_slice::<MessageKind>(&payload) {
482 Ok(message) => message,
483 Err(err) => {
484 error!("Error deserializing message: {:?}", err);
485 continue;
486 }
487 };
488
489 if let MessageKind::Handshake(handshake) = message {
490 match handshake {
491 HandshakeMessage::Initiate(_) => unimplemented!(),
492 HandshakeMessage::Response(response) => {
493 let message = if !validate_version(&response.version) {
494 error!(
495 "Version compatibility failure! Our Version: {}, Their Version: {}",
496 version(),
497 response.version
498 );
499
500 HandshakeFinalize { success: false }
501 } else {
502 info!("Connected with a compatible version");
503
504 HandshakeFinalize { success: true }
505 };
506 // Send HandshakeMessage::Finalize
507 Self::send_message(
508 &MessageKind::Handshake(HandshakeMessage::Finalize(message)),
509 socket,
510 )
511 .await?;
512 }
513 HandshakeMessage::Finalize(finalize) => {
514 if finalize.success {
515 return Ok(());
516 } else {
517 panic!()
518 }
519 }
520 }
521 }
522 }
523
524 Ok(())
525 }
526
527 async fn send_message<T: Serialize>(
528 message: &T,
529 socket: &mut Socket,
530 ) -> Result<(), anyhow::Error> {
531 socket
532 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
533 .await?;
534
535 Ok(())
536 }
537
538 async fn next_payload(&self, socket: &mut Socket) -> Result<Vec<u8>, Error> {
539 while let Some(message) = socket.next().await {
540 let message = message?;
541
542 match message {
543 Message::Text(text) => return Ok(text.into_bytes()),
544 Message::Binary(bytes) => return Ok(bytes),
545 Message::Ping(_) => continue,
546 Message::Pong(_) => continue,
547 Message::Close(_) => {
548 panic!()
549 }
550 _ => unreachable!(),
551 }
552 }
553
554 unreachable!()
555 }
556 }
557