JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Finish unified stack refactor.

Adds support for operation state, which will be used to pass authentication information around. Added generic backend that uses a channel to communicate with a typed backend.

Amber - 2 years ago

parent: tbd commit: d15581c

⁨giterated-daemon/src/main.rs⁩ - ⁨5461⁩ bytes
Raw
1 use anyhow::Error;
2 use connection::{Connections, RawConnection};
3 use giterated_daemon::{
4 authentication::AuthenticationTokenGranter,
5 backend::{
6 git::GitBackend, settings::DatabaseSettings, user::UserAuth, RepositoryBackend, UserBackend,
7 },
8 connection::{self, wrapper::connection_wrapper},
9 database_backend::DatabaseBackend,
10 federation::connections::InstanceConnections,
11 };
12
13 use giterated_models::instance::Instance;
14
15 use giterated_stack::{BackendWrapper, StackOperationState};
16 use sqlx::{postgres::PgConnectOptions, ConnectOptions, PgPool};
17 use std::{net::SocketAddr, str::FromStr, sync::Arc};
18 use tokio::{
19 fs::File,
20 io::{AsyncRead, AsyncReadExt, AsyncWrite},
21 net::{TcpListener, TcpStream},
22 sync::Mutex,
23 };
24 use tokio_tungstenite::{accept_async, WebSocketStream};
25 use toml::Table;
26
27 #[macro_use]
28 extern crate tracing;
29
30 #[tokio::main]
31 async fn main() -> Result<(), Error> {
32 tracing_subscriber::fmt::init();
33 let config: Table = {
34 let mut file = File::open("Giterated.toml").await?;
35 let mut text = String::new();
36 file.read_to_string(&mut text).await?;
37 text.parse()?
38 };
39 let mut listener = TcpListener::bind(config["giterated"]["bind"].as_str().unwrap()).await?;
40 let connections: Arc<Mutex<Connections>> = Arc::default();
41 let instance_connections: Arc<Mutex<InstanceConnections>> = Arc::default();
42 let db_conn_options = PgConnectOptions::new()
43 .host(config["postgres"]["host"].as_str().unwrap())
44 .port(config["postgres"]["port"].as_integer().unwrap() as u16)
45 .database(config["postgres"]["database"].as_str().unwrap())
46 .username(config["postgres"]["user"].as_str().unwrap())
47 .password(config["postgres"]["password"].as_str().unwrap())
48 .log_statements(log::LevelFilter::Off);
49 let db_pool = PgPool::connect_with(db_conn_options).await?;
50
51 debug!("Running database migrations...");
52 sqlx::migrate!().run(&db_pool).await?;
53 info!("Connected");
54
55 let settings = Arc::new(Mutex::new(DatabaseSettings {
56 pg_pool: db_pool.clone(),
57 }));
58
59 let repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>> =
60 Arc::new(Mutex::new(GitBackend {
61 pg_pool: db_pool.clone(),
62 repository_folder: String::from(
63 config["giterated"]["backend"]["git"]["root"]
64 .as_str()
65 .unwrap(),
66 ),
67 instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap())
68 .unwrap(),
69 settings_provider: settings.clone(),
70 }));
71
72 let token_granter = Arc::new(Mutex::new(AuthenticationTokenGranter {
73 config: config.clone(),
74 instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
75 }));
76
77 let user_backend: Arc<Mutex<dyn UserBackend + Send>> = Arc::new(Mutex::new(UserAuth::new(
78 db_pool.clone(),
79 &Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
80 token_granter.clone(),
81 settings.clone(),
82 )));
83
84 info!("Connected");
85
86 let database_backend = DatabaseBackend::new(
87 Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
88 user_backend.clone(),
89 repository_backend.clone(),
90 );
91
92 let backend = database_backend.into_backend();
93
94 let backend_wrapper = BackendWrapper::new(backend.clone());
95
96 let operation_state = {
97 StackOperationState {
98 giterated_backend: backend_wrapper,
99 }
100 };
101
102 loop {
103 let stream = accept_stream(&mut listener).await;
104 info!("Connected");
105
106 let (stream, address) = match stream {
107 Ok(stream) => stream,
108 Err(err) => {
109 error!("Failed to accept connection. {:?}", err);
110 continue;
111 }
112 };
113
114 info!("Accepted connection from {}", address);
115
116 let connection = accept_websocket_connection(stream).await;
117
118 let connection = match connection {
119 Ok(connection) => connection,
120 Err(err) => {
121 error!(
122 "Failed to initiate Websocket connection from {}. {:?}",
123 address, err
124 );
125 continue;
126 }
127 };
128
129 info!("Websocket connection established with {}", address);
130
131 let connection = RawConnection {
132 task: tokio::spawn(connection_wrapper(
133 connection,
134 connections.clone(),
135 repository_backend.clone(),
136 user_backend.clone(),
137 token_granter.clone(),
138 settings.clone(),
139 address,
140 Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
141 instance_connections.clone(),
142 config.clone(),
143 backend.clone(),
144 operation_state.clone(),
145 )),
146 };
147
148 connections.lock().await.connections.push(connection);
149 }
150 }
151
152 async fn accept_stream(listener: &mut TcpListener) -> Result<(TcpStream, SocketAddr), Error> {
153 let stream = listener.accept().await?;
154
155 Ok(stream)
156 }
157
158 async fn accept_websocket_connection<S: AsyncRead + AsyncWrite + Unpin>(
159 stream: S,
160 ) -> Result<WebSocketStream<S>, Error> {
161 let connection = accept_async(stream).await?;
162
163 Ok(connection)
164 }
165