1 |
use std::{
|
2 |
net::SocketAddr,
|
3 |
sync::{atomic::AtomicBool, Arc},
|
4 |
};
|
5 |
|
6 |
use anyhow::Error;
|
7 |
use futures_util::SinkExt;
|
8 |
use serde::Serialize;
|
9 |
use tokio::{net::TcpStream, sync::Mutex};
|
10 |
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
|
11 |
|
12 |
use crate::{
|
13 |
authentication::AuthenticationTokenGranter,
|
14 |
backend::{DiscoveryBackend, RepositoryBackend, UserBackend},
|
15 |
connection::ConnectionError,
|
16 |
listener::Listeners,
|
17 |
model::instance::Instance,
|
18 |
};
|
19 |
|
20 |
use super::{connection_worker, Connections};
|
21 |
|
22 |
pub async fn connection_wrapper(
|
23 |
mut socket: WebSocketStream<TcpStream>,
|
24 |
listeners: Arc<Mutex<Listeners>>,
|
25 |
connections: Arc<Mutex<Connections>>,
|
26 |
repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
|
27 |
user_backend: Arc<Mutex<dyn UserBackend + Send>>,
|
28 |
auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
|
29 |
discovery_backend: Arc<Mutex<dyn DiscoveryBackend + Send>>,
|
30 |
addr: SocketAddr,
|
31 |
) {
|
32 |
let mut handshaked = false;
|
33 |
loop {
|
34 |
if let Err(e) = connection_worker(
|
35 |
&mut socket,
|
36 |
&mut handshaked,
|
37 |
&listeners,
|
38 |
&connections,
|
39 |
&repository_backend,
|
40 |
&user_backend,
|
41 |
&auth_granter,
|
42 |
&discovery_backend,
|
43 |
&addr,
|
44 |
)
|
45 |
.await
|
46 |
{
|
47 |
error!("Error handling message: {:?}", e);
|
48 |
|
49 |
if let ConnectionError::Shutdown = &e {
|
50 |
info!("Closing connection {}", addr);
|
51 |
return;
|
52 |
}
|
53 |
}
|
54 |
}
|
55 |
}
|
56 |
|
57 |
#[derive(Clone)]
|
58 |
pub struct ConnectionState {
|
59 |
socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
|
60 |
pub listeners: Arc<Mutex<Listeners>>,
|
61 |
pub connections: Arc<Mutex<Connections>>,
|
62 |
pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
|
63 |
pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
|
64 |
pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
|
65 |
pub discovery_backend: Arc<Mutex<dyn DiscoveryBackend + Send>>,
|
66 |
pub addr: SocketAddr,
|
67 |
pub instance: Instance,
|
68 |
pub handshaked: Arc<AtomicBool>,
|
69 |
}
|
70 |
|
71 |
impl ConnectionState {
|
72 |
pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
|
73 |
self.socket
|
74 |
.lock()
|
75 |
.await
|
76 |
.send(Message::Binary(serde_json::to_vec(&message)?))
|
77 |
.await?;
|
78 |
|
79 |
Ok(())
|
80 |
}
|
81 |
}
|
82 |
|