use std::{error::Error, net::SocketAddr, str::FromStr}; use futures_util::{SinkExt, StreamExt}; use giterated_daemon::{ command::{ repository::{ CreateRepositoryCommand, RepositoryInfoRequest, RepositoryMessage, RepositoryMessageKind, RepositoryRequest, RepositoryResponse, }, MessageKind, }, handshake::{HandshakeFinalize, HandshakeMessage, InitiateHandshake}, model::{ instance::Instance, repository::{Repository, RepositoryView}, }, }; use serde::Serialize; use tokio::{ io::{AsyncRead, AsyncWrite}, net::{TcpSocket, TcpStream}, }; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; type Socket = WebSocketStream>; #[macro_use] extern crate tracing; pub struct GiteratedApi; impl GiteratedApi { pub async fn create_repository( target: Instance, request: CreateRepositoryCommand, ) -> Result> { let mut socket = Self::connect_to(&target.url).await?; let target = Repository { name: request.name.clone(), instance: target, }; Self::send_message( &MessageKind::Repository(RepositoryMessage { target, command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository( request, )), }), &mut socket, ) .await?; while let Ok(payload) = Self::next_payload(&mut socket).await { if let Ok(MessageKind::Repository(RepositoryMessage { command: RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)), .. })) = serde_json::from_slice(&payload) { return Ok(true); } } unreachable!() } pub async fn repository_info(repository: Repository) -> Result> { let mut socket = Self::connect_to(&repository.instance.url).await?; Self::send_message( &MessageKind::Repository(RepositoryMessage { target: repository.clone(), command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo( RepositoryInfoRequest, )), }), &mut socket, ) .await?; while let Ok(payload) = Self::next_payload(&mut socket).await { if let Ok(MessageKind::Repository(RepositoryMessage { command: RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)), .. })) = serde_json::from_slice(&payload) { return Ok(response); } } unreachable!() } async fn connect_to(url: impl ToString) -> Result> { let url = url.to_string(); let (mut websocket, _response) = connect_async(&url).await?; Self::handle_handshake(&mut websocket).await?; Ok(websocket) } async fn handle_handshake(socket: &mut Socket) -> Result<(), Box> { // Send handshake initiation Self::send_message( &MessageKind::Handshake(HandshakeMessage::Initiate(InitiateHandshake { identity: Instance { url: String::from("foo.com"), }, version: String::from("0.1.0"), })), socket, ) .await?; while let Some(message) = socket.next().await { let message = match message { Ok(message) => message, Err(err) => { error!("Error reading message: {:?}", err); continue; } }; let payload = match message { Message::Text(text) => text.into_bytes(), Message::Binary(bytes) => bytes, Message::Ping(_) => continue, Message::Pong(_) => continue, Message::Close(_) => { panic!() } _ => unreachable!(), }; let message = match serde_json::from_slice::(&payload) { Ok(message) => message, Err(err) => { error!("Error deserializing message: {:?}", err); continue; } }; if let MessageKind::Handshake(handshake) = message { match handshake { HandshakeMessage::Initiate(_) => unimplemented!(), HandshakeMessage::Response(_) => { // Send HandshakeMessage::Finalize Self::send_message( &MessageKind::Handshake(HandshakeMessage::Finalize( HandshakeFinalize { success: true }, )), socket, ) .await?; } HandshakeMessage::Finalize(finalize) => { if finalize.success { return Ok(()); } else { panic!() } } } } } Ok(()) } async fn send_message( message: &T, socket: &mut Socket, ) -> Result<(), Box> { socket .send(Message::Binary(serde_json::to_vec(&message).unwrap())) .await?; Ok(()) } async fn next_payload(socket: &mut Socket) -> Result, Box> { while let Some(message) = socket.next().await { let message = message?; match message { Message::Text(text) => return Ok(text.into_bytes()), Message::Binary(bytes) => return Ok(bytes), Message::Ping(_) => continue, Message::Pong(_) => continue, Message::Close(_) => { panic!() } _ => unreachable!(), } } unreachable!() } }