JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Add support for networked GetSetting to Unified Stack refactor

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨da6d78e

⁨giterated-daemon/src/main.rs⁩ - ⁨6250⁩ bytes
Raw
1 use anyhow::Error;
2 use connection::{Connections, RawConnection};
3 use giterated_daemon::{
4 authentication::AuthenticationTokenGranter,
5 backend::{
6 git::GitBackend, settings::DatabaseSettings, user::UserAuth, RepositoryBackend, UserBackend,
7 },
8 connection::{self, wrapper::connection_wrapper},
9 database_backend::DatabaseBackend,
10 federation::connections::InstanceConnections,
11 };
12
13 use giterated_models::instance::Instance;
14
15 use giterated_stack::{GiteratedStack, StackOperationState};
16 use sqlx::{postgres::PgConnectOptions, ConnectOptions, PgPool};
17 use std::{net::SocketAddr, str::FromStr, sync::Arc};
18 use tokio::{
19 fs::File,
20 io::{AsyncRead, AsyncReadExt, AsyncWrite},
21 net::{TcpListener, TcpStream},
22 sync::Mutex,
23 };
24 use tokio_tungstenite::{accept_async, WebSocketStream};
25 use tokio_util::task::LocalPoolHandle;
26 use toml::Table;
27
28 #[macro_use]
29 extern crate tracing;
30
31 #[tokio::main]
32 async fn main() -> Result<(), Error> {
33 tracing_subscriber::fmt::init();
34 let config: Table = {
35 let mut file = File::open("Giterated.toml").await?;
36 let mut text = String::new();
37 file.read_to_string(&mut text).await?;
38 text.parse()?
39 };
40 let mut listener = TcpListener::bind(config["giterated"]["bind"].as_str().unwrap()).await?;
41 let connections: Arc<Mutex<Connections>> = Arc::default();
42 let instance_connections: Arc<Mutex<InstanceConnections>> = Arc::default();
43 let db_conn_options = PgConnectOptions::new()
44 .host(config["postgres"]["host"].as_str().unwrap())
45 .port(config["postgres"]["port"].as_integer().unwrap() as u16)
46 .database(config["postgres"]["database"].as_str().unwrap())
47 .username(config["postgres"]["user"].as_str().unwrap())
48 .password(config["postgres"]["password"].as_str().unwrap())
49 .log_statements(log::LevelFilter::Off);
50 let db_pool = PgPool::connect_with(db_conn_options).await?;
51
52 debug!("Running database migrations...");
53 sqlx::migrate!().run(&db_pool).await?;
54 info!("Connected");
55
56 let settings = Arc::new(Mutex::new(DatabaseSettings {
57 pg_pool: db_pool.clone(),
58 }));
59
60 let repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>> =
61 Arc::new(Mutex::new(GitBackend {
62 pg_pool: db_pool.clone(),
63 repository_folder: String::from(
64 config["giterated"]["backend"]["git"]["root"]
65 .as_str()
66 .unwrap(),
67 ),
68 instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap())
69 .unwrap(),
70 settings_provider: settings.clone(),
71 }));
72
73 let token_granter = Arc::new(Mutex::new(AuthenticationTokenGranter {
74 config: config.clone(),
75 instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
76 }));
77
78 let user_backend: Arc<Mutex<dyn UserBackend + Send>> = Arc::new(Mutex::new(UserAuth::new(
79 db_pool.clone(),
80 &Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
81 token_granter.clone(),
82 settings.clone(),
83 )));
84
85 info!("Connected");
86
87 let database_backend = DatabaseBackend::new(
88 Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
89 user_backend.clone(),
90 repository_backend.clone(),
91 );
92
93 let mut runtime = GiteratedStack::default();
94
95 let database_backend = database_backend.into_substack();
96 runtime.merge_builder(database_backend);
97
98 let runtime = Arc::new(runtime);
99
100 let operation_state = {
101 StackOperationState {
102 our_instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap())
103 .unwrap(),
104 runtime: runtime.clone(),
105 instance: None,
106 user: None,
107 }
108 };
109
110 let pool = LocalPoolHandle::new(5);
111
112 loop {
113 let stream = accept_stream(&mut listener).await;
114 info!("Connected");
115
116 let (stream, address) = match stream {
117 Ok(stream) => stream,
118 Err(err) => {
119 error!("Failed to accept connection. {:?}", err);
120 continue;
121 }
122 };
123
124 info!("Accepted connection from {}", address);
125
126 let connection = accept_websocket_connection(stream).await;
127
128 let connection = match connection {
129 Ok(connection) => connection,
130 Err(err) => {
131 error!(
132 "Failed to initiate Websocket connection from {}. {:?}",
133 address, err
134 );
135 continue;
136 }
137 };
138
139 info!("Websocket connection established with {}", address);
140 let connections_cloned = connections.clone();
141 let repository_backend = repository_backend.clone();
142 let user_backend = user_backend.clone();
143 let token_granter = token_granter.clone();
144 let settings = settings.clone();
145 let instance_connections = instance_connections.clone();
146 let config = config.clone();
147 let runtime = runtime.clone();
148 let operation_state = operation_state.clone();
149
150 pool.spawn_pinned(move || {
151 connection_wrapper(
152 connection,
153 connections_cloned,
154 repository_backend,
155 user_backend,
156 token_granter,
157 settings,
158 address,
159 Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
160 instance_connections,
161 config,
162 runtime,
163 operation_state,
164 )
165 });
166
167 let connection = RawConnection {
168 task: tokio::spawn(async move { () }),
169 };
170
171 connections.lock().await.connections.push(connection);
172 }
173 }
174
175 async fn accept_stream(listener: &mut TcpListener) -> Result<(TcpStream, SocketAddr), Error> {
176 let stream = listener.accept().await?;
177
178 Ok(stream)
179 }
180
181 async fn accept_websocket_connection<S: AsyncRead + AsyncWrite + Unpin>(
182 stream: S,
183 ) -> Result<WebSocketStream<S>, Error> {
184 let connection = accept_async(stream).await?;
185
186 Ok(connection)
187 }
188