1 |
use anyhow::Error;
|
2 |
use connection::{connection_worker, Connections, RawConnection};
|
3 |
use giterated_daemon::{
|
4 |
authentication::AuthenticationTokenGranter,
|
5 |
backend::{
|
6 |
discovery::GiteratedDiscoveryProtocol, git::GitBackend, user::UserAuth, DiscoveryBackend,
|
7 |
RepositoryBackend, UserBackend,
|
8 |
},
|
9 |
connection::{self, wrapper::connection_wrapper},
|
10 |
listener,
|
11 |
model::instance::Instance,
|
12 |
};
|
13 |
use listener::Listeners;
|
14 |
use sqlx::{postgres::PgConnectOptions, ConnectOptions, PgPool};
|
15 |
use std::{net::SocketAddr, str::FromStr, sync::Arc};
|
16 |
use tokio::{
|
17 |
fs::File,
|
18 |
io::{AsyncRead, AsyncReadExt, AsyncWrite},
|
19 |
net::{TcpListener, TcpStream},
|
20 |
sync::Mutex,
|
21 |
};
|
22 |
use tokio_tungstenite::{accept_async, WebSocketStream};
|
23 |
use toml::Table;
|
24 |
|
25 |
#[macro_use]
|
26 |
extern crate tracing;
|
27 |
|
28 |
#[tokio::main]
|
29 |
async fn main() -> Result<(), Error> {
|
30 |
tracing_subscriber::fmt::init();
|
31 |
let mut listener = TcpListener::bind("0.0.0.0:7270").await?;
|
32 |
let connections: Arc<Mutex<Connections>> = Arc::default();
|
33 |
let listeners: Arc<Mutex<Listeners>> = Arc::default();
|
34 |
let config: Table = {
|
35 |
let mut file = File::open("Giterated.toml").await?;
|
36 |
let mut text = String::new();
|
37 |
file.read_to_string(&mut text).await?;
|
38 |
text.parse()?
|
39 |
};
|
40 |
let db_conn_options = PgConnectOptions::new()
|
41 |
.host(config["postgres"]["host"].as_str().unwrap())
|
42 |
.port(config["postgres"]["port"].as_integer().unwrap() as u16)
|
43 |
.database(config["postgres"]["database"].as_str().unwrap())
|
44 |
.username(config["postgres"]["user"].as_str().unwrap())
|
45 |
.password(config["postgres"]["password"].as_str().unwrap())
|
46 |
.log_statements(log::LevelFilter::Off);
|
47 |
let db_pool = PgPool::connect_with(db_conn_options).await?;
|
48 |
|
49 |
debug!("Running database migrations...");
|
50 |
sqlx::migrate!().run(&db_pool).await?;
|
51 |
info!("Connected");
|
52 |
|
53 |
let repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>> =
|
54 |
Arc::new(Mutex::new(GitBackend {
|
55 |
pg_pool: db_pool.clone(),
|
56 |
repository_folder: String::from(
|
57 |
config["giterated"]["backend"]["git"]["root"]
|
58 |
.as_str()
|
59 |
.unwrap(),
|
60 |
),
|
61 |
instance: Instance::from_str("giterated.dev").unwrap(),
|
62 |
}));
|
63 |
|
64 |
let token_granter = Arc::new(Mutex::new(AuthenticationTokenGranter {
|
65 |
config: config.clone(),
|
66 |
instance: Instance::from_str("giterated.dev").unwrap(),
|
67 |
}));
|
68 |
|
69 |
let user_backend: Arc<Mutex<dyn UserBackend + Send>> = Arc::new(Mutex::new(UserAuth::new(
|
70 |
db_pool.clone(),
|
71 |
&Instance::from_str("giterated.dev").unwrap(),
|
72 |
token_granter.clone(),
|
73 |
)));
|
74 |
|
75 |
let discovery_backend: Arc<Mutex<dyn DiscoveryBackend + Send>> =
|
76 |
Arc::new(Mutex::new(GiteratedDiscoveryProtocol {
|
77 |
pool: db_pool.clone(),
|
78 |
}));
|
79 |
|
80 |
info!("Connected");
|
81 |
|
82 |
loop {
|
83 |
let stream = accept_stream(&mut listener).await;
|
84 |
info!("Connected");
|
85 |
|
86 |
let (stream, address) = match stream {
|
87 |
Ok(stream) => stream,
|
88 |
Err(err) => {
|
89 |
error!("Failed to accept connection. {:?}", err);
|
90 |
continue;
|
91 |
}
|
92 |
};
|
93 |
|
94 |
info!("Accepted connection from {}", address);
|
95 |
|
96 |
let connection = accept_websocket_connection(stream).await;
|
97 |
|
98 |
let connection = match connection {
|
99 |
Ok(connection) => connection,
|
100 |
Err(err) => {
|
101 |
error!(
|
102 |
"Failed to initiate Websocket connection from {}. {:?}",
|
103 |
address, err
|
104 |
);
|
105 |
continue;
|
106 |
}
|
107 |
};
|
108 |
|
109 |
info!("Websocket connection established with {}", address);
|
110 |
|
111 |
let connection = RawConnection {
|
112 |
task: tokio::spawn(connection_wrapper(
|
113 |
connection,
|
114 |
listeners.clone(),
|
115 |
connections.clone(),
|
116 |
repository_backend.clone(),
|
117 |
user_backend.clone(),
|
118 |
token_granter.clone(),
|
119 |
discovery_backend.clone(),
|
120 |
address,
|
121 |
)),
|
122 |
};
|
123 |
|
124 |
connections.lock().await.connections.push(connection);
|
125 |
}
|
126 |
}
|
127 |
|
128 |
async fn accept_stream(listener: &mut TcpListener) -> Result<(TcpStream, SocketAddr), Error> {
|
129 |
let stream = listener.accept().await?;
|
130 |
|
131 |
Ok(stream)
|
132 |
}
|
133 |
|
134 |
async fn accept_websocket_connection<S: AsyncRead + AsyncWrite + Unpin>(
|
135 |
stream: S,
|
136 |
) -> Result<WebSocketStream<S>, Error> {
|
137 |
let connection = accept_async(stream).await?;
|
138 |
|
139 |
Ok(connection)
|
140 |
}
|
141 |
|