use std::net::SocketAddr; use deadpool::managed::{Manager, RecycleError, RecycleResult, Metrics}; use futures_util::SinkExt; use giterated_models::instance::Instance; use tokio_tungstenite::{connect_async, tungstenite::Message}; use crate::{handshake::GiteratedConnectionHandshaker, Socket}; pub struct GiteratedConnectionPool { pub target_instance: Instance, pub socket_addr: Option, } #[async_trait::async_trait] impl Manager for GiteratedConnectionPool { type Type = Socket; type Error = anyhow::Error; async fn create(&self) -> Result { info!("Creating new Daemon connection"); let connection = connect_to(&self.target_instance, &self.socket_addr).await?; // Handshake first! let connection = GiteratedConnectionHandshaker::new(connection) .handshake() .await?; Ok(connection) } async fn recycle(&self, socket: &mut Socket, _metrics: &Metrics) -> RecycleResult { match socket.send(Message::Ping(vec![])).await { Ok(_) => Ok(()), Err(err) => { info!("Socket died!"); Err(RecycleError::Backend(err.into())) } } } } async fn connect_to( instance: &Instance, socket_addr: &Option, ) -> Result { if let Some(addr) = socket_addr { info!( "Connecting to {}", format!("ws://{}/.giterated/daemon/", addr) ); let (websocket, _response) = connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?; info!("Connection established with {}", addr); Ok(websocket) } else { info!( "Connecting to {}", format!("wss://{}/.giterated/daemon/", instance.url) ); let (websocket, _response) = connect_async(&format!("wss://{}/.giterated/daemon/", instance.url)).await?; info!("Connection established with {}", instance.url); Ok(websocket) } }