use std::{ net::SocketAddr, sync::{atomic::AtomicBool, Arc}, }; use anyhow::Error; use futures_util::SinkExt; use serde::Serialize; use tokio::{net::TcpStream, sync::Mutex}; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; use crate::{ authentication::AuthenticationTokenGranter, backend::{DiscoveryBackend, RepositoryBackend, UserBackend}, connection::ConnectionError, listener::Listeners, model::instance::Instance, }; use super::{connection_worker, Connections}; pub async fn connection_wrapper( mut socket: WebSocketStream, listeners: Arc>, connections: Arc>, repository_backend: Arc>, user_backend: Arc>, auth_granter: Arc>, discovery_backend: Arc>, addr: SocketAddr, ) { let mut handshaked = false; loop { if let Err(e) = connection_worker( &mut socket, &mut handshaked, &listeners, &connections, &repository_backend, &user_backend, &auth_granter, &discovery_backend, &addr, ) .await { error!("Error handling message: {:?}", e); if let ConnectionError::Shutdown = &e { info!("Closing connection {}", addr); return; } } } } #[derive(Clone)] pub struct ConnectionState { socket: Arc>>, pub listeners: Arc>, pub connections: Arc>, pub repository_backend: Arc>, pub user_backend: Arc>, pub auth_granter: Arc>, pub discovery_backend: Arc>, pub addr: SocketAddr, pub instance: Instance, pub handshaked: Arc, } impl ConnectionState { pub async fn send(&self, message: T) -> Result<(), Error> { self.socket .lock() .await .send(Message::Binary(serde_json::to_vec(&message)?)) .await?; Ok(()) } }