use std::{collections::HashMap, net::SocketAddr, sync::Arc};

use futures_util::{stream::StreamExt, SinkExt, TryStreamExt};
use tokio::{
    io::{AsyncRead, AsyncWrite},
    net::TcpStream,
    sync::{
        broadcast::{Receiver, Sender},
        Mutex,
    },
    task::JoinHandle,
};
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};

use crate::{
    command::{
        issues::IssuesCountResponse,
        repository::{
            RepositoryFileInspectionResponse, RepositoryIssueLabelsResponse,
            RepositoryIssuesResponse, RepositoryMessage, RepositoryMessageKind,
            RepositoryRequest, RepositoryResponse,
        },
        MessageKind,
    },
    handshake::{HandshakeFinalize, HandshakeMessage, HandshakeResponse, InitiateHandshake},
    listener::Listeners,
    model::{
        instance::{Instance, InstanceMeta},
        repository::{CommitMetadata, RepositoryView},
    },
};

/// A connection that is tracked only through its task handle.
pub struct RawConnection {
    /// Handle to the spawned task associated with this connection.
    // NOTE(review): presumably the task running `connection_worker`; confirm at the spawn site.
    pub task: JoinHandle<()>,
}

/// A connection to another instance.
pub struct InstanceConnection {
    /// Metadata describing the instance on the other end.
    pub instance: InstanceMeta,
    /// Broadcast sender used to hand messages to this connection;
    /// `send_and_get_listener` pushes the forwarded [`MessageKind`] through it.
    pub sender: Sender<MessageKind>,
    /// Handle to the spawned task associated with this connection.
    pub task: JoinHandle<()>,
}

/// Represents a connection which hasn't finished the handshake.
pub struct UnestablishedConnection { pub socket: WebSocketStream, } #[derive(Default)] pub struct Connections { pub connections: Vec, pub instance_connections: HashMap, } pub async fn connection_worker( mut socket: WebSocketStream, listeners: Arc>, mut connections: Arc>, addr: SocketAddr, ) { let mut handshaked = false; let this_instance = Instance { url: String::from("FOO"), }; while let Some(message) = socket.next().await { let message = match message { Ok(message) => message, Err(err) => { error!("Error reading message: {:?}", err); continue; } }; let payload = match message { Message::Text(text) => text.into_bytes(), Message::Binary(bytes) => bytes, Message::Ping(_) => continue, Message::Pong(_) => continue, Message::Close(_) => { info!("Closing connection with {}.", addr); return; } _ => unreachable!(), }; let message = match serde_json::from_slice::(&payload) { Ok(message) => message, Err(err) => { error!("Error deserializing message from {}: {:?}", addr, err); continue; } }; if let MessageKind::Handshake(handshake) = message { match handshake { HandshakeMessage::Initiate(_) => { // Send HandshakeMessage::Response let message = HandshakeResponse { identity: Instance { url: String::from("foo.com"), }, version: String::from("0.1.0"), }; socket .send(Message::Binary( serde_json::to_vec(&HandshakeMessage::Response(message)).unwrap(), )) .await .unwrap(); continue; } HandshakeMessage::Response(_) => { // Send HandshakeMessage::Finalize let message = HandshakeFinalize { success: true }; socket .send(Message::Binary( serde_json::to_vec(&HandshakeMessage::Finalize(message)).unwrap(), )) .await .unwrap(); continue; } HandshakeMessage::Finalize(_) => { handshaked = true; continue; } } } if !handshaked { continue; } if let MessageKind::Repository(repository) = &message { if repository.target.instance != this_instance { // We need to send this command to a different instance let mut listener = send_and_get_listener(message, &listeners, &connections).await; // Wait for 
response while let Ok(message) = listener.recv().await { if let MessageKind::Repository(RepositoryMessage { command: RepositoryMessageKind::Response(_), .. }) = message { socket .send(Message::Binary(serde_json::to_vec(&message).unwrap())) .await .unwrap(); } } } else { // This message is targeting this instance match &repository.command { RepositoryMessageKind::Request(request) => match request { RepositoryRequest::CreateRepository(_) => todo!(), RepositoryRequest::RepositoryFileInspection(_) => { let response = RepositoryFileInspectionResponse::File { commit_metadata: CommitMetadata::default(), }; } RepositoryRequest::RepositoryInfo(_) => { let response = RepositoryView { name: String::from("Nederland"), description: String::from("ik hou van het nederland"), default_branch: String::from("nederland"), latest_commit: CommitMetadata::default(), files: vec![], }; socket .send(Message::Binary( serde_json::to_vec(&MessageKind::Repository( RepositoryMessage { target: repository.target.clone(), command: RepositoryMessageKind::Response( RepositoryResponse::RepositoryInfo(response), ), }, )) .unwrap(), )) .await .unwrap(); } RepositoryRequest::IssuesCount(_) => { let response: IssuesCountResponse = IssuesCountResponse { count: 727420 }; socket .send(Message::Binary( serde_json::to_vec(&MessageKind::Repository( RepositoryMessage { target: repository.target.clone(), command: RepositoryMessageKind::Response( RepositoryResponse::IssuesCount(response), ), }, )) .unwrap(), )) .await .unwrap(); } RepositoryRequest::IssueLabels(_) => { let response = RepositoryIssueLabelsResponse { labels: vec![] }; socket .send(Message::Binary( serde_json::to_vec(&MessageKind::Repository( RepositoryMessage { target: repository.target.clone(), command: RepositoryMessageKind::Response( RepositoryResponse::IssueLabels(response), ), }, )) .unwrap(), )) .await .unwrap(); } RepositoryRequest::Issues(_) => { let response = RepositoryIssuesResponse { issues: vec![] }; socket .send(Message::Binary( 
serde_json::to_vec(&MessageKind::Repository( RepositoryMessage { target: repository.target.clone(), command: RepositoryMessageKind::Response( RepositoryResponse::Issues(response), ), }, )) .unwrap(), )) .await .unwrap(); } }, RepositoryMessageKind::Response(response) => { unreachable!() } } } } } } async fn send_and_get_listener( message: MessageKind, listeners: &Arc>, mut connections: &Arc>, ) -> Receiver { let (instance, user, repository) = match &message { MessageKind::Handshake(_) => { todo!() } MessageKind::Repository(repository) => (None, None, Some(repository.target.clone())), }; let target = todo!(); let mut listeners = listeners.lock().await; let mut listener = listeners.add(instance, user, repository); drop(listeners); let connections = connections.lock().await; if let Some(connection) = connections.instance_connections.get(&target) { connection.sender.send(message); } else { error!("Unable to message {}, this is a bug.", target.url); panic!(); } drop(connections); listener }