JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Base protocol refactor complete

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨079d544

⁨giterated-daemon/src/main.rs⁩ - ⁨4841⁩ bytes
Raw
1 use anyhow::Error;
2 use connection::{Connections, RawConnection};
3 use giterated_daemon::{
4 authentication::AuthenticationTokenGranter,
5 backend::{
6 git::GitBackend, settings::DatabaseSettings, user::UserAuth, RepositoryBackend, UserBackend,
7 },
8 connection::{self, wrapper::connection_wrapper},
9 federation::connections::InstanceConnections,
10 };
11 use giterated_models::model::instance::Instance;
12 use sqlx::{postgres::PgConnectOptions, ConnectOptions, PgPool};
13 use std::{net::SocketAddr, str::FromStr, sync::Arc};
14 use tokio::{
15 fs::File,
16 io::{AsyncRead, AsyncReadExt, AsyncWrite},
17 net::{TcpListener, TcpStream},
18 sync::Mutex,
19 };
20 use tokio_tungstenite::{accept_async, WebSocketStream};
21 use toml::Table;
22
23 #[macro_use]
24 extern crate tracing;
25
26 #[tokio::main]
27 async fn main() -> Result<(), Error> {
28 tracing_subscriber::fmt::init();
29 let config: Table = {
30 let mut file = File::open("Giterated.toml").await?;
31 let mut text = String::new();
32 file.read_to_string(&mut text).await?;
33 text.parse()?
34 };
35 let mut listener = TcpListener::bind(config["giterated"]["bind"].as_str().unwrap()).await?;
36 let connections: Arc<Mutex<Connections>> = Arc::default();
37 let instance_connections: Arc<Mutex<InstanceConnections>> = Arc::default();
38 let db_conn_options = PgConnectOptions::new()
39 .host(config["postgres"]["host"].as_str().unwrap())
40 .port(config["postgres"]["port"].as_integer().unwrap() as u16)
41 .database(config["postgres"]["database"].as_str().unwrap())
42 .username(config["postgres"]["user"].as_str().unwrap())
43 .password(config["postgres"]["password"].as_str().unwrap())
44 .log_statements(log::LevelFilter::Off);
45 let db_pool = PgPool::connect_with(db_conn_options).await?;
46
47 debug!("Running database migrations...");
48 sqlx::migrate!().run(&db_pool).await?;
49 info!("Connected");
50
51 let settings = Arc::new(Mutex::new(DatabaseSettings {
52 pg_pool: db_pool.clone(),
53 }));
54
55 let repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>> =
56 Arc::new(Mutex::new(GitBackend {
57 pg_pool: db_pool.clone(),
58 repository_folder: String::from(
59 config["giterated"]["backend"]["git"]["root"]
60 .as_str()
61 .unwrap(),
62 ),
63 instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap())
64 .unwrap(),
65 settings_provider: settings.clone(),
66 }));
67
68 let token_granter = Arc::new(Mutex::new(AuthenticationTokenGranter {
69 config: config.clone(),
70 instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
71 }));
72
73 let user_backend: Arc<Mutex<dyn UserBackend + Send>> = Arc::new(Mutex::new(UserAuth::new(
74 db_pool.clone(),
75 &Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
76 token_granter.clone(),
77 settings.clone(),
78 )));
79
80 info!("Connected");
81
82 loop {
83 let stream = accept_stream(&mut listener).await;
84 info!("Connected");
85
86 let (stream, address) = match stream {
87 Ok(stream) => stream,
88 Err(err) => {
89 error!("Failed to accept connection. {:?}", err);
90 continue;
91 }
92 };
93
94 info!("Accepted connection from {}", address);
95
96 let connection = accept_websocket_connection(stream).await;
97
98 let connection = match connection {
99 Ok(connection) => connection,
100 Err(err) => {
101 error!(
102 "Failed to initiate Websocket connection from {}. {:?}",
103 address, err
104 );
105 continue;
106 }
107 };
108
109 info!("Websocket connection established with {}", address);
110
111 let connection = RawConnection {
112 task: tokio::spawn(connection_wrapper(
113 connection,
114 connections.clone(),
115 repository_backend.clone(),
116 user_backend.clone(),
117 token_granter.clone(),
118 settings.clone(),
119 address,
120 Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
121 instance_connections.clone(),
122 config.clone(),
123 )),
124 };
125
126 connections.lock().await.connections.push(connection);
127 }
128 }
129
130 async fn accept_stream(listener: &mut TcpListener) -> Result<(TcpStream, SocketAddr), Error> {
131 let stream = listener.accept().await?;
132
133 Ok(stream)
134 }
135
136 async fn accept_websocket_connection<S: AsyncRead + AsyncWrite + Unpin>(
137 stream: S,
138 ) -> Result<WebSocketStream<S>, Error> {
139 let connection = accept_async(stream).await?;
140
141 Ok(connection)
142 }
143