use std::convert::Infallible; use std::str::FromStr; use std::{error::Error, net::SocketAddr}; use futures_util::{SinkExt, StreamExt}; use giterated_daemon::messages::authentication::{RegisterAccountRequest, RegisterAccountResponse}; use giterated_daemon::messages::UnvalidatedUserAuthenticated; use giterated_daemon::model::repository::RepositoryVisibility; use giterated_daemon::model::user::User; use giterated_daemon::{version, validate_version}; use giterated_daemon::{ handshake::{HandshakeFinalize, HandshakeMessage, InitiateHandshake}, messages::{ authentication::{ AuthenticationMessage, AuthenticationRequest, AuthenticationResponse, AuthenticationTokenRequest, TokenExtensionRequest, }, repository::{ CreateRepositoryRequest, RepositoryInfoRequest, RepositoryMessage, RepositoryMessageKind, RepositoryRequest, RepositoryResponse, }, InstanceAuthenticated, MessageKind, }, model::{ instance::Instance, repository::{Repository, RepositoryView}, }, }; use serde::Serialize; use tokio::net::TcpStream; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; type Socket = WebSocketStream>; #[macro_use] extern crate tracing; pub struct GiteratedApiBuilder { our_instance: Instance, our_private_key: Option, our_public_key: Option, target_instance: Option, } pub trait AsInstance { type Error: Error + Send + Sync + 'static; fn into_instance(self) -> Result; } impl AsInstance for &str { type Error = ::Err; fn into_instance(self) -> Result { Instance::from_str(self) } } impl AsInstance for Instance { type Error = Infallible; fn into_instance(self) -> Result { Ok(self) } } impl GiteratedApiBuilder { pub fn from_local(instance: impl AsInstance) -> Result { Ok(Self { our_instance: instance.into_instance()?, our_private_key: None, our_public_key: None, target_instance: None, }) } pub fn from_local_for_other( instance: impl AsInstance, other: impl AsInstance, ) -> Result { Ok(Self { our_instance: instance.into_instance()?, our_private_key: None, our_public_key: None, target_instance: Some(other.into_instance()?), }) } pub fn private_key(&mut self, key: impl ToString) -> &mut Self { self.our_private_key = Some(key.to_string()); self } pub fn public_key(&mut self, key: impl ToString) -> &mut Self { self.our_public_key = Some(key.to_string()); self } pub async fn build(&mut self) -> Result { Ok(GiteratedApi::new( self.our_instance.clone(), self.our_private_key.clone().unwrap(), self.our_public_key.clone().unwrap(), self.target_instance.clone(), ) .await?) } } pub struct GiteratedApi { pub connection: Socket, pub our_instance: Instance, pub our_private_key: String, pub our_public_key: String, pub target_instance: Option, pub target_public_key: Option, } impl GiteratedApi { pub async fn new( local_instance: Instance, private_key: String, public_key: String, target_instance: Option, ) -> Result { let connection = Self::connect_to( target_instance .clone() .unwrap_or_else(|| local_instance.clone()), ) .await?; let mut api = GiteratedApi { connection, our_instance: local_instance, our_private_key: private_key, our_public_key: public_key, target_instance, target_public_key: None, }; // Handle handshake api.handle_handshake().await?; Ok(api) } pub async fn public_key(&mut self) -> String { if let Some(public_key) = &self.target_public_key { public_key.clone() } else { let key = reqwest::get(format!( "https://{}/.giterated/pubkey.pem", self.target_instance .as_ref() .unwrap_or_else(|| &self.our_instance) .url )) .await .unwrap() .text() .await .unwrap(); self.target_public_key = Some(key.clone()); key } } /// Register on an [`Instance`]. /// /// # Authorization /// - Must be made by the same instance its being sent to pub async fn register( &mut self, username: String, email: Option, password: String, ) -> Result> { let message = InstanceAuthenticated::new( RegisterAccountRequest { username, email, password, }, self.our_instance.clone(), self.our_private_key.clone(), ) .unwrap(); self.send_message(&MessageKind::Authentication( AuthenticationMessage::Request(AuthenticationRequest::RegisterAccount(message)), )) .await?; while let Ok(payload) = self.next_payload().await { if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( AuthenticationResponse::RegisterAccount(response), ))) = serde_json::from_slice(&payload) { return Ok(response); } } unreachable!() } /// Create repository on the target instance. pub async fn create_repository( &mut self, user_token: String, name: String, description: Option, visibility: RepositoryVisibility, default_branch: String, owner: User, ) -> Result> { let target_respository = Repository { owner: owner.clone(), name: name.clone(), instance: self .target_instance .as_ref() .unwrap_or(&self.our_instance) .clone(), }; let request = CreateRepositoryRequest { name, description, visibility, default_branch, owner, }; let message = UnvalidatedUserAuthenticated::new(request, user_token, self.our_private_key.clone()) .unwrap(); self.send_message(&MessageKind::Repository(RepositoryMessage { target: target_respository, command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository(message)), })) .await?; while let Ok(payload) = self.next_payload().await { if let Ok(MessageKind::Repository(RepositoryMessage { command: RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)), .. })) = serde_json::from_slice(&payload) { return Ok(true); } } unreachable!() } pub async fn repository_info( &mut self, token: &str, repository: Repository, ) -> Result> { let message = UnvalidatedUserAuthenticated::new( RepositoryInfoRequest { repository: repository.clone(), extra_metadata: true, rev: None, path: None, }, token.to_string(), self.our_private_key.clone(), ) .unwrap(); self.send_message(&MessageKind::Repository(RepositoryMessage { target: repository.clone(), command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)), })) .await?; loop { // while let Ok(payload) = Self::next_payload(&mut socket).await { let payload = match self.next_payload().await { Ok(payload) => payload, Err(err) => { error!("Error while fetching next payload: {:?}", err); continue; } }; if let Ok(MessageKind::Repository(RepositoryMessage { command: RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)), .. })) = serde_json::from_slice(&payload) { return Ok(response); } } } /// Requests an authentication token for the given login. /// /// # Authorization /// This request can only be sent to the same instance from which /// it is issued. pub async fn authentication_token( &mut self, secret_key: String, username: String, password: String, ) -> Result> { let request = InstanceAuthenticated::new( AuthenticationTokenRequest { secret_key, username, password, }, self.our_instance.clone(), include_str!("example_keys/giterated.key").to_string(), ) .unwrap(); self.send_message(&MessageKind::Authentication( AuthenticationMessage::Request(AuthenticationRequest::AuthenticationToken(request)), )) .await?; loop { // while let Ok(payload) = Self::next_payload(&mut socket).await { let payload = match self.next_payload().await { Ok(payload) => payload, Err(err) => { error!("Error while fetching next payload: {:?}", err); continue; } }; if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( AuthenticationResponse::AuthenticationToken(response), ))) = serde_json::from_slice(&payload) { return Ok(response.token); } } } /// Requests a new token for the given login. /// /// # Authorization /// This request can only be sent to the same instance from which /// it is issued. pub async fn extend_token( &mut self, secret_key: String, token: String, ) -> Result, Box> { let request = InstanceAuthenticated::new( TokenExtensionRequest { secret_key, token }, self.our_instance.clone(), self.our_private_key.clone(), ) .unwrap(); self.send_message(&MessageKind::Authentication( AuthenticationMessage::Request(AuthenticationRequest::TokenExtension(request)), )) .await?; while let Ok(payload) = self.next_payload().await { if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( AuthenticationResponse::TokenExtension(response), ))) = serde_json::from_slice(&payload) { return Ok(response.new_token); } } todo!() } async fn connect_to(instance: Instance) -> Result { let url = &instance.url; info!( "Connecting to {}", format!("wss://{}/.giterated/daemon/", url) ); let (websocket, _response) = connect_async(&format!("wss://{}/.giterated/daemon/", url)).await?; info!("Connection established with {}", url); Ok(websocket) } async fn handle_handshake(&mut self) -> Result<(), anyhow::Error> { // Send handshake initiation self.send_message(&MessageKind::Handshake(HandshakeMessage::Initiate( InitiateHandshake { identity: self.our_instance.clone(), version: version(), }, ))) .await?; while let Some(message) = self.connection.next().await { let message = match message { Ok(message) => message, Err(err) => { error!("Error reading message: {:?}", err); continue; } }; let payload = match message { Message::Text(text) => text.into_bytes(), Message::Binary(bytes) => bytes, Message::Ping(_) => continue, Message::Pong(_) => continue, Message::Close(_) => { panic!() } _ => unreachable!(), }; info!("Read payload: {}", std::str::from_utf8(&payload).unwrap()); let message = match serde_json::from_slice::(&payload) { Ok(message) => message, Err(err) => { error!("Error deserializing message: {:?}", err); continue; } }; if let MessageKind::Handshake(handshake) = message { match handshake { HandshakeMessage::Initiate(_) => unimplemented!(), HandshakeMessage::Response(response) => { let message = if !validate_version(&response.version) { error!( "Version compatibility failure! Our Version: {}, Their Version: {}", version(), response.version ); HandshakeFinalize { success: false } } else { info!("Connected with a compatible version"); HandshakeFinalize { success: true } }; // Send HandshakeMessage::Finalize self.send_message(&MessageKind::Handshake(HandshakeMessage::Finalize( message ))) .await?; } HandshakeMessage::Finalize(finalize) => { if finalize.success { return Ok(()); } else { panic!() } } } } } Ok(()) } async fn send_message(&mut self, message: &T) -> Result<(), anyhow::Error> { self.connection .send(Message::Binary(serde_json::to_vec(&message).unwrap())) .await?; Ok(()) } async fn next_payload(&mut self) -> Result, Box> { while let Some(message) = self.connection.next().await { let message = message?; match message { Message::Text(text) => return Ok(text.into_bytes()), Message::Binary(bytes) => return Ok(bytes), Message::Ping(_) => continue, Message::Pong(_) => continue, Message::Close(_) => { panic!() } _ => unreachable!(), } } unreachable!() } }