pub mod authentication; pub mod handshake; pub mod repository; pub mod user; pub mod wrapper; use std::{any::type_name, collections::HashMap, net::SocketAddr, str::FromStr, sync::Arc}; use anyhow::Error; use futures_util::{stream::StreamExt, SinkExt}; use semver::Version; use serde::{de::DeserializeOwned, Serialize}; use tokio::{ net::TcpStream, sync::{ broadcast::{Receiver, Sender}, Mutex, }, task::JoinHandle, }; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; use crate::{ authentication::AuthenticationTokenGranter, backend::{DiscoveryBackend, RepositoryBackend, UserBackend}, handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse}, listener::Listeners, messages::{ authentication::{ AuthenticationMessage, AuthenticationRequest, AuthenticationResponse, TokenExtensionResponse, }, repository::{ RepositoryMessage, RepositoryMessageKind, RepositoryRequest, RepositoryResponse, }, user::{ UserMessage, UserMessageKind, UserMessageRequest, UserMessageResponse, UserRepositoriesResponse, }, ErrorMessage, MessageKind, }, model::{ instance::{Instance, InstanceMeta}, repository::Repository, user::User, }, validate_version, version, }; #[derive(Debug, thiserror::Error)] pub enum ConnectionError { #[error("connection error message {0}")] ErrorMessage(#[from] ErrorMessage), #[error("connection should close")] Shutdown, #[error("internal error {0}")] InternalError(#[from] Error), } pub struct RawConnection { pub task: JoinHandle<()>, } pub struct InstanceConnection { pub instance: InstanceMeta, pub sender: Sender, pub task: JoinHandle<()>, } /// Represents a connection which hasn't finished the handshake. pub struct UnestablishedConnection { pub socket: WebSocketStream, } #[derive(Default)] pub struct Connections { pub connections: Vec, pub instance_connections: HashMap, } pub async fn connection_worker( mut socket: &mut WebSocketStream, handshaked: &mut bool, listeners: &Arc>, connections: &Arc>, backend: &Arc>, user_backend: &Arc>, auth_granter: &Arc>, discovery_backend: &Arc>, addr: &SocketAddr, ) -> Result<(), ConnectionError> { let this_instance = Instance { url: String::from("giterated.dev"), }; let message = socket .next() .await .ok_or_else(|| ConnectionError::Shutdown)? .map_err(|e| Error::from(e))?; let payload = match message { Message::Text(text) => text.into_bytes(), Message::Binary(bytes) => bytes, Message::Ping(_) => return Ok(()), Message::Pong(_) => return Ok(()), Message::Close(_) => { info!("Closing connection with {}.", addr); return Err(ConnectionError::Shutdown); } _ => unreachable!(), }; let message = serde_json::from_slice::(&payload).map_err(|e| Error::from(e))?; if let MessageKind::Handshake(handshake) = message { match handshake { HandshakeMessage::Initiate(request) => { unimplemented!() } HandshakeMessage::Response(response) => { unimplemented!() } HandshakeMessage::Finalize(response) => { unimplemented!() } } } if !*handshaked { return Ok(()); } if let MessageKind::Repository(repository) = &message { if repository.target.instance != this_instance { info!("Forwarding command to {}", repository.target.instance.url); // We need to send this command to a different instance let mut listener = send_and_get_listener(message, &listeners, &connections).await; // Wait for response while let Ok(message) = listener.recv().await { if let MessageKind::Repository(RepositoryMessage { command: RepositoryMessageKind::Response(_), .. }) = message { let _result = send(&mut socket, message).await; } } return Ok(()); } else { // This message is targeting this instance match &repository.command { RepositoryMessageKind::Request(request) => match request.clone() { RepositoryRequest::CreateRepository(request) => { unimplemented!(); } RepositoryRequest::RepositoryFileInspect(request) => { unimplemented!() } RepositoryRequest::RepositoryInfo(request) => { unimplemented!() } RepositoryRequest::IssuesCount(request) => { unimplemented!() } RepositoryRequest::IssueLabels(request) => { unimplemented!() } RepositoryRequest::Issues(request) => { unimplemented!(); } }, RepositoryMessageKind::Response(_response) => { unreachable!() } } } } if let MessageKind::Authentication(authentication) = &message { match authentication { AuthenticationMessage::Request(request) => match request { AuthenticationRequest::AuthenticationToken(token) => { unimplemented!() } AuthenticationRequest::TokenExtension(request) => { unimplemented!() } AuthenticationRequest::RegisterAccount(request) => { unimplemented!() } }, AuthenticationMessage::Response(_) => unreachable!(), } } if let MessageKind::Discovery(message) = &message { let mut backend = discovery_backend.lock().await; backend.try_handle(message).await?; return Ok(()); } if let MessageKind::User(message) = &message { match &message.message { UserMessageKind::Request(request) => match request { UserMessageRequest::DisplayName(request) => { unimplemented!() } UserMessageRequest::DisplayImage(request) => { unimplemented!() } UserMessageRequest::Bio(request) => { unimplemented!() } UserMessageRequest::Repositories(request) => { unimplemented!() } }, UserMessageKind::Response(_) => unreachable!(), } } Ok(()) } async fn send_and_get_listener( message: MessageKind, listeners: &Arc>, connections: &Arc>, ) -> Receiver { let (instance, user, repository): (Option, Option, Option) = match &message { MessageKind::Handshake(_) => { todo!() } MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())), MessageKind::Authentication(_) => todo!(), MessageKind::Discovery(_) => todo!(), MessageKind::User(user) => todo!(), MessageKind::Error(_) => todo!(), }; let target = match (&instance, &user, &repository) { (Some(instance), _, _) => instance.clone(), (_, Some(user), _) => user.instance.clone(), (_, _, Some(repository)) => repository.instance.clone(), _ => unreachable!(), }; let mut listeners = listeners.lock().await; let listener = listeners.add(instance, user, repository); drop(listeners); let connections = connections.lock().await; if let Some(connection) = connections.instance_connections.get(&target) { if let Err(_) = connection.sender.send(message) { error!("Error sending message."); } } else { error!("Unable to message {}, this is a bug.", target.url); panic!(); } drop(connections); listener } async fn send( socket: &mut WebSocketStream, message: T, ) -> Result<(), Error> { socket .send(Message::Binary(serde_json::to_vec(&message)?)) .await?; Ok(()) } #[derive(Debug, thiserror::Error)] #[error("handler did not handle")] pub struct HandlerUnhandled; pub trait MessageHandling { fn message_type() -> &'static str; } impl MessageHandling<(T1,), M, R> for F where F: FnOnce(T1) -> R, T1: Serialize + DeserializeOwned, { fn message_type() -> &'static str { type_name::() } } impl MessageHandling<(T1, T2), M, R> for F where F: FnOnce(T1, T2) -> R, T1: Serialize + DeserializeOwned, { fn message_type() -> &'static str { type_name::() } } impl MessageHandling<(T1, T2, T3), M, R> for F where F: FnOnce(T1, T2, T3) -> R, T1: Serialize + DeserializeOwned, { fn message_type() -> &'static str { type_name::() } } impl MessageHandling<(T1, T2, T3, T4), M, R> for F where F: FnOnce(T1, T2, T3, T4) -> R, T1: Serialize + DeserializeOwned, { fn message_type() -> &'static str { type_name::() } }