JavaScript is disabled, refresh for a better experience. ambee/giterated-api

ambee/giterated-api

Git repository hosting, collaboration, and discovery for the Fediverse.

Add connection pool

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨98d3b04

Showing ⁨⁨3⁩ changed files⁩ with ⁨⁨186⁩ insertions⁩ and ⁨⁨161⁩ deletions⁩

Cargo.toml

View file
@@ -19,4 +19,6 @@ rand = "*"
19 19 jsonwebtoken = { version = "*", features = ["use_pem"]}
20 20 chrono = { version = "0.4", features = [ "serde", "std" ] }
21 21 reqwest = { version = "0.11" }
22 anyhow = "*"
22 \ No newline at end of file
22 anyhow = "*"
23 deadpool = "*"
24 async-trait = "*"
24 \ No newline at end of file

src/lib.rs

View file
@@ -1,13 +1,15 @@
1 1 use std::convert::Infallible;
2 use std::net::SocketAddr;
2 3 use std::str::FromStr;
3 use std::{error::Error, net::SocketAddr};
4 use std::sync::Arc;
4 5
6 use anyhow::Error;
7 use deadpool::managed::{BuildError, Manager, Pool, RecycleResult};
5 8 use futures_util::{SinkExt, StreamExt};
6 9 use giterated_daemon::messages::authentication::{RegisterAccountRequest, RegisterAccountResponse};
7 10 use giterated_daemon::messages::UnvalidatedUserAuthenticated;
8 11 use giterated_daemon::model::repository::RepositoryVisibility;
9 12 use giterated_daemon::model::user::User;
10 use giterated_daemon::{version, validate_version};
11 13 use giterated_daemon::{
12 14 handshake::{HandshakeFinalize, HandshakeMessage, InitiateHandshake},
13 15 messages::{
@@ -26,8 +28,10 @@ use giterated_daemon::{
26 28 repository::{Repository, RepositoryView},
27 29 },
28 30 };
31 use giterated_daemon::{validate_version, version};
29 32 use serde::Serialize;
30 33 use tokio::net::TcpStream;
34 use tokio::sync::broadcast::{Receiver, Sender};
31 35 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
32 36
33 37 type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>;
@@ -43,7 +47,7 @@ pub struct GiteratedApiBuilder {
43 47 }
44 48
45 49 pub trait AsInstance {
46 type Error: Error + Send + Sync + 'static;
50 type Error: std::error::Error + Send + Sync + 'static;
47 51
48 52 fn into_instance(self) -> Result<Instance, Self::Error>;
49 53 }
@@ -99,74 +103,78 @@ impl GiteratedApiBuilder {
99 103 }
100 104
101 105 pub async fn build(&mut self) -> Result<GiteratedApi, anyhow::Error> {
102 Ok(GiteratedApi::new(
103 self.our_instance.clone(),
104 self.our_private_key.clone().unwrap(),
105 self.our_public_key.clone().unwrap(),
106 self.target_instance.clone(),
107 )
108 .await?)
106 Ok(GiteratedApi {
107 configuration: Arc::new(GiteratedApiConfiguration {
108 our_private_key: self.our_private_key.take().unwrap(),
109 our_public_key: self.our_public_key.take().unwrap(),
110 target_instance: self.target_instance.take(),
111 // todo
112 target_public_key: None,
113 }),
114 })
109 115 }
110 116 }
111 117
112 pub struct GiteratedApi {
113 pub connection: Socket,
114 pub our_instance: Instance,
118 struct GiteratedConnectionPool {
119 target_instance: Instance,
120 }
121
122 #[async_trait::async_trait]
123 impl Manager for GiteratedConnectionPool {
124 type Type = Socket;
125 type Error = anyhow::Error;
126
127 async fn create(&self) -> Result<Socket, Self::Error> {
128 info!("Creating new Daemon connection");
129 let mut connection = GiteratedApi::connect_to(self.target_instance.clone()).await?;
130
131 // Handshake first!
132 GiteratedApi::handle_handshake(&mut connection, &self.target_instance).await?;
133
134 Ok(connection)
135 }
136
137 async fn recycle(&self, _: &mut Socket) -> RecycleResult<Self::Error> {
138 Ok(())
139 }
140 }
141
142 pub struct GiteratedApiConfiguration {
115 143 pub our_private_key: String,
116 144 pub our_public_key: String,
117 145 pub target_instance: Option<Instance>,
118 146 pub target_public_key: Option<String>,
119 147 }
120 148
121 impl GiteratedApi {
122 pub async fn new(
123 local_instance: Instance,
124 private_key: String,
125 public_key: String,
126 target_instance: Option<Instance>,
127 ) -> Result<Self, anyhow::Error> {
128 let connection = Self::connect_to(
129 target_instance
130 .clone()
131 .unwrap_or_else(|| local_instance.clone()),
132 )
133 .await?;
134
135 let mut api = GiteratedApi {
136 connection,
137 our_instance: local_instance,
138 our_private_key: private_key,
139 our_public_key: public_key,
140 target_instance,
141 target_public_key: None,
142 };
143
144 // Handle handshake
145 api.handle_handshake().await?;
146
147 Ok(api)
149 #[derive(Clone)]
150 pub struct DaemonConnectionPool(Pool<GiteratedConnectionPool>);
151
152 impl DaemonConnectionPool {
153 pub fn connect(
154 instance: impl ToOwned<Owned = Instance>,
155 ) -> Result<Self, BuildError<anyhow::Error>> {
156 Ok(Self(
157 Pool::builder(GiteratedConnectionPool {
158 target_instance: instance.to_owned(),
159 })
160 .build()?,
161 ))
148 162 }
163 }
149 164
165 #[derive(Clone)]
166 pub struct GiteratedApi {
167 configuration: Arc<GiteratedApiConfiguration>,
168 }
169
170 impl GiteratedApi {
150 171 pub async fn public_key(&mut self) -> String {
151 if let Some(public_key) = &self.target_public_key {
172 if let Some(public_key) = &self.configuration.target_public_key {
152 173 public_key.clone()
153 174 } else {
154 let key = reqwest::get(format!(
155 "https://{}/.giterated/pubkey.pem",
156 self.target_instance
157 .as_ref()
158 .unwrap_or_else(|| &self.our_instance)
159 .url
160 ))
161 .await
162 .unwrap()
163 .text()
164 .await
165 .unwrap();
166
167 self.target_public_key = Some(key.clone());
168
169 key
175 assert!(self.configuration.target_instance.is_none());
176
177 self.configuration.our_public_key.clone()
170 178 }
171 179 }
172 180
@@ -175,28 +183,34 @@ impl GiteratedApi {
175 183 /// # Authorization
176 184 /// - Must be made by the same instance its being sent to
177 185 pub async fn register(
178 &mut self,
186 &self,
179 187 username: String,
180 188 email: Option<String>,
181 189 password: String,
182 ) -> Result<RegisterAccountResponse, Box<dyn Error>> {
190 pool: &DaemonConnectionPool,
191 ) -> Result<RegisterAccountResponse, anyhow::Error> {
192 let mut connection = pool.0.get().await.unwrap();
193
183 194 let message = InstanceAuthenticated::new(
184 195 RegisterAccountRequest {
185 196 username,
186 197 email,
187 198 password,
188 199 },
189 self.our_instance.clone(),
190 self.our_private_key.clone(),
200 pool.0.manager().target_instance.clone(),
201 self.configuration.our_private_key.clone(),
191 202 )
192 203 .unwrap();
193 204
194 self.send_message(&MessageKind::Authentication(
195 AuthenticationMessage::Request(AuthenticationRequest::RegisterAccount(message)),
196 ))
205 Self::send_message(
206 &MessageKind::Authentication(AuthenticationMessage::Request(
207 AuthenticationRequest::RegisterAccount(message),
208 )),
209 &mut connection,
210 )
197 211 .await?;
198 212
199 while let Ok(payload) = self.next_payload().await {
213 while let Ok(payload) = self.next_payload(&mut connection).await {
200 214 if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
201 215 AuthenticationResponse::RegisterAccount(response),
202 216 ))) = serde_json::from_slice(&payload)
@@ -210,21 +224,25 @@ impl GiteratedApi {
210 224
211 225 /// Create repository on the target instance.
212 226 pub async fn create_repository(
213 &mut self,
227 &self,
214 228 user_token: String,
215 229 name: String,
216 230 description: Option<String>,
217 231 visibility: RepositoryVisibility,
218 232 default_branch: String,
219 233 owner: User,
220 ) -> Result<bool, Box<dyn Error>> {
234 pool: &DaemonConnectionPool,
235 ) -> Result<bool, anyhow::Error> {
236 let mut connection = pool.0.get().await.unwrap();
237
221 238 let target_respository = Repository {
222 239 owner: owner.clone(),
223 240 name: name.clone(),
224 241 instance: self
242 .configuration
225 243 .target_instance
226 244 .as_ref()
227 .unwrap_or(&self.our_instance)
245 .unwrap_or(&pool.0.manager().target_instance)
228 246 .clone(),
229 247 };
230 248
@@ -236,17 +254,25 @@ impl GiteratedApi {
236 254 owner,
237 255 };
238 256
239 let message =
240 UnvalidatedUserAuthenticated::new(request, user_token, self.our_private_key.clone())
241 .unwrap();
257 let message = UnvalidatedUserAuthenticated::new(
258 request,
259 user_token,
260 self.configuration.our_private_key.clone(),
261 )
262 .unwrap();
242 263
243 self.send_message(&MessageKind::Repository(RepositoryMessage {
244 target: target_respository,
245 command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository(message)),
246 }))
264 Self::send_message(
265 &MessageKind::Repository(RepositoryMessage {
266 target: target_respository,
267 command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository(
268 message,
269 )),
270 }),
271 &mut connection,
272 )
247 273 .await?;
248 274
249 while let Ok(payload) = self.next_payload().await {
275 while let Ok(payload) = self.next_payload(&mut connection).await {
250 276 if let Ok(MessageKind::Repository(RepositoryMessage {
251 277 command:
252 278 RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)),
@@ -264,7 +290,10 @@ impl GiteratedApi {
264 290 &mut self,
265 291 token: &str,
266 292 repository: Repository,
267 ) -> Result<RepositoryView, Box<dyn Error>> {
293 pool: &DaemonConnectionPool,
294 ) -> Result<RepositoryView, Error> {
295 let mut connection = pool.0.get().await.unwrap();
296
268 297 let message = UnvalidatedUserAuthenticated::new(
269 298 RepositoryInfoRequest {
270 299 repository: repository.clone(),
@@ -273,19 +302,22 @@ impl GiteratedApi {
273 302 path: None,
274 303 },
275 304 token.to_string(),
276 self.our_private_key.clone(),
305 self.configuration.our_private_key.clone(),
277 306 )
278 307 .unwrap();
279 308
280 self.send_message(&MessageKind::Repository(RepositoryMessage {
281 target: repository.clone(),
282 command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)),
283 }))
309 Self::send_message(
310 &MessageKind::Repository(RepositoryMessage {
311 target: repository.clone(),
312 command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)),
313 }),
314 &mut connection,
315 )
284 316 .await?;
285 317
286 318 loop {
287 319 // while let Ok(payload) = Self::next_payload(&mut socket).await {
288 let payload = match self.next_payload().await {
320 let payload = match self.next_payload(&mut connection).await {
289 321 Ok(payload) => payload,
290 322 Err(err) => {
291 323 error!("Error while fetching next payload: {:?}", err);
@@ -314,26 +346,32 @@ impl GiteratedApi {
314 346 secret_key: String,
315 347 username: String,
316 348 password: String,
317 ) -> Result<String, Box<dyn Error>> {
349 pool: &DaemonConnectionPool,
350 ) -> Result<String, Error> {
351 let mut connection = pool.0.get().await.unwrap();
352
318 353 let request = InstanceAuthenticated::new(
319 354 AuthenticationTokenRequest {
320 355 secret_key,
321 356 username,
322 357 password,
323 358 },
324 self.our_instance.clone(),
359 pool.0.manager().target_instance.clone(),
325 360 include_str!("example_keys/giterated.key").to_string(),
326 361 )
327 362 .unwrap();
328 363
329 self.send_message(&MessageKind::Authentication(
330 AuthenticationMessage::Request(AuthenticationRequest::AuthenticationToken(request)),
331 ))
364 Self::send_message(
365 &MessageKind::Authentication(AuthenticationMessage::Request(
366 AuthenticationRequest::AuthenticationToken(request),
367 )),
368 &mut connection,
369 )
332 370 .await?;
333 371
334 372 loop {
335 373 // while let Ok(payload) = Self::next_payload(&mut socket).await {
336 let payload = match self.next_payload().await {
374 let payload = match self.next_payload(&mut connection).await {
337 375 Ok(payload) => payload,
338 376 Err(err) => {
339 377 error!("Error while fetching next payload: {:?}", err);
@@ -359,20 +397,26 @@ impl GiteratedApi {
359 397 &mut self,
360 398 secret_key: String,
361 399 token: String,
362 ) -> Result<Option<String>, Box<dyn Error>> {
400 pool: &DaemonConnectionPool,
401 ) -> Result<Option<String>, Error> {
402 let mut connection = pool.0.get().await.unwrap();
403
363 404 let request = InstanceAuthenticated::new(
364 405 TokenExtensionRequest { secret_key, token },
365 self.our_instance.clone(),
366 self.our_private_key.clone(),
406 pool.0.manager().target_instance.clone(),
407 self.configuration.our_private_key.clone(),
367 408 )
368 409 .unwrap();
369 410
370 self.send_message(&MessageKind::Authentication(
371 AuthenticationMessage::Request(AuthenticationRequest::TokenExtension(request)),
372 ))
411 Self::send_message(
412 &MessageKind::Authentication(AuthenticationMessage::Request(
413 AuthenticationRequest::TokenExtension(request),
414 )),
415 &mut connection,
416 )
373 417 .await?;
374 418
375 while let Ok(payload) = self.next_payload().await {
419 while let Ok(payload) = self.next_payload(&mut connection).await {
376 420 if let Ok(MessageKind::Authentication(AuthenticationMessage::Response(
377 421 AuthenticationResponse::TokenExtension(response),
378 422 ))) = serde_json::from_slice(&payload)
@@ -397,18 +441,22 @@ impl GiteratedApi {
397 441 Ok(websocket)
398 442 }
399 443
400 async fn handle_handshake(&mut self) -> Result<(), anyhow::Error> {
444 async fn handle_handshake(
445 socket: &mut Socket,
446 instance: &Instance,
447 ) -> Result<(), anyhow::Error> {
401 448 // Send handshake initiation
402 449
403 self.send_message(&MessageKind::Handshake(HandshakeMessage::Initiate(
404 InitiateHandshake {
405 identity: self.our_instance.clone(),
450 Self::send_message(
451 &MessageKind::Handshake(HandshakeMessage::Initiate(InitiateHandshake {
452 identity: instance.clone(),
406 453 version: version(),
407 },
408 )))
454 })),
455 socket,
456 )
409 457 .await?;
410 458
411 while let Some(message) = self.connection.next().await {
459 while let Some(message) = socket.next().await {
412 460 let message = match message {
413 461 Ok(message) => message,
414 462 Err(err) => {
@@ -442,7 +490,6 @@ impl GiteratedApi {
442 490 match handshake {
443 491 HandshakeMessage::Initiate(_) => unimplemented!(),
444 492 HandshakeMessage::Response(response) => {
445
446 493 let message = if !validate_version(&response.version) {
447 494 error!(
448 495 "Version compatibility failure! Our Version: {}, Their Version: {}",
@@ -457,9 +504,10 @@ impl GiteratedApi {
457 504 HandshakeFinalize { success: true }
458 505 };
459 506 // Send HandshakeMessage::Finalize
460 self.send_message(&MessageKind::Handshake(HandshakeMessage::Finalize(
461 message
462 )))
507 Self::send_message(
508 &MessageKind::Handshake(HandshakeMessage::Finalize(message)),
509 socket,
510 )
463 511 .await?;
464 512 }
465 513 HandshakeMessage::Finalize(finalize) => {
@@ -476,15 +524,19 @@ impl GiteratedApi {
476 524 Ok(())
477 525 }
478 526
479 async fn send_message<T: Serialize>(&mut self, message: &T) -> Result<(), anyhow::Error> {
480 self.connection
527 async fn send_message<T: Serialize>(
528 message: &T,
529 socket: &mut Socket,
530 ) -> Result<(), anyhow::Error> {
531 socket
481 532 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
482 533 .await?;
534
483 535 Ok(())
484 536 }
485 537
486 async fn next_payload(&mut self) -> Result<Vec<u8>, Box<dyn Error>> {
487 while let Some(message) = self.connection.next().await {
538 async fn next_payload(&self, socket: &mut Socket) -> Result<Vec<u8>, Error> {
539 while let Some(message) = socket.next().await {
488 540 let message = message?;
489 541
490 542 match message {

src/main.rs

View file
@@ -1,16 +1,12 @@
1 use std::{collections::BTreeMap, str::FromStr, time::SystemTime};
2
3 use chrono::{DateTime, NaiveDateTime, Utc};
4 use giterated_api::{GiteratedApi, GiteratedApiBuilder};
5 use giterated_daemon::{
6 messages::repository::CreateRepositoryRequest,
7 model::{
8 instance::Instance,
9 repository::{Repository, RepositoryVisibility},
10 user::User,
11 },
1 use std::str::FromStr;
2
3 use giterated_api::{DaemonConnectionPool, GiteratedApiBuilder};
4 use giterated_daemon::model::{
5 instance::Instance,
6 repository::{Repository, RepositoryVisibility},
7 user::User,
12 8 };
13 use jsonwebtoken::{decode, encode, Algorithm, DecodingKey, EncodingKey, TokenData, Validation};
9 use jsonwebtoken::{decode, Algorithm, DecodingKey, TokenData, Validation};
14 10 use serde::{Deserialize, Serialize};
15 11 // use jwt::SignWithKey;
16 12
@@ -18,46 +14,15 @@ use serde::{Deserialize, Serialize};
18 14 extern crate tracing;
19 15
20 16 #[tokio::main]
21 async fn main() {
17 async fn main() -> Result<(), anyhow::Error> {
22 18 tracing_subscriber::fmt::init();
23 // info!(
24 // "Response from Daemon: {:?}",
25 // GiteratedApi::repository_info(Repository {
26 // name: String::from("foo"),
27 // instance: Instance {
28 // url: String::from("127.0.0.1:8080")
29 // }
30 // })
31 // .await
32 // );
33
34 // let encoding_key =
35 // EncodingKey::from_rsa_pem(include_bytes!("example_keys/giterated.key")).unwrap();
36
37 // let claims = UserTokenMetadata {
38 // user: User {
39 // username: String::from("ambee"),
40 // instance: Instance {
41 // url: String::from("giterated.dev"),
42 // },
43 // },
44 // generated_for: Instance {
45 // url: String::from("giterated.dev"),
46 // },
47 // exp: SystemTime::UNIX_EPOCH.elapsed().unwrap().as_secs(),
48 // };
49
50 // let token = encode(
51 // &jsonwebtoken::Header::new(Algorithm::RS256),
52 // &claims,
53 // &encoding_key,
54 // )
55 // .unwrap();
19
20 let pool = DaemonConnectionPool::connect(Instance::from_str("giterated.dev")?).unwrap();
56 21
57 22 let mut api = GiteratedApiBuilder::from_local("giterated.dev")
58 23 .unwrap()
59 24 .private_key(include_str!("example_keys/giterated.key"))
60 .public_key(include_str!("example_keys/giterated.key"))
25 .public_key(include_str!("example_keys/giterated.key.pub"))
61 26 .build()
62 27 .await
63 28 .unwrap();
@@ -69,6 +34,7 @@ async fn main() {
69 34 String::from("ambee"),
70 35 None,
71 36 String::from("lolthisisinthecommithistory"),
37 &pool,
72 38 )
73 39 .await;
74 40
@@ -79,6 +45,7 @@ async fn main() {
79 45 String::from("foobar"),
80 46 String::from("ambee"),
81 47 String::from("password"),
48 &pool,
82 49 )
83 50 .await
84 51 .unwrap();
@@ -101,7 +68,7 @@ async fn main() {
101 68 info!("Lets extend that token!");
102 69
103 70 let new_token = api
104 .extend_token(String::from("foobar"), token.clone())
71 .extend_token(String::from("foobar"), token.clone(), &pool)
105 72 .await
106 73 .unwrap();
107 74 info!("New Token Returned:\n{:?}", new_token);
@@ -116,6 +83,7 @@ async fn main() {
116 83 RepositoryVisibility::Public,
117 84 String::from("master"),
118 85 User::from_str("ambee:giterated.dev").unwrap(),
86 &pool,
119 87 )
120 88 .await
121 89 .unwrap();
@@ -127,12 +95,15 @@ async fn main() {
127 95 let view = api
128 96 .repository_info(
129 97 &token,
130 Repository::from_str("ambee:giterated.dev/[email protected]").unwrap(),
98 Repository::from_str("ambee:giterated.dev/[email protected]").unwrap(),
99 &pool,
131 100 )
132 101 .await
133 102 .unwrap();
134 103
135 104 info!("Repository Info:\n{:#?}", view);
105
106 Ok(())
136 107 }
137 108
138 109 #[derive(Debug, Serialize, Deserialize)]