JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Unified stack `GetValue` implementation

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨325f5af

⁨giterated-daemon/src/main.rs⁩ - ⁨6270⁩ bytes
Raw
1 use anyhow::Error;
2 use connection::{Connections, RawConnection};
3 use giterated_daemon::{
4 authentication::AuthenticationTokenGranter,
5 backend::{
6 git::GitBackend, settings::DatabaseSettings, user::UserAuth, RepositoryBackend, UserBackend,
7 },
8 connection::{self, wrapper::connection_wrapper},
9 database_backend::DatabaseBackend,
10 federation::connections::InstanceConnections,
11 };
12
13 use giterated_models::instance::Instance;
14
15 use giterated_stack::{GiteratedStack, StackOperationState};
16 use sqlx::{postgres::PgConnectOptions, ConnectOptions, PgPool};
17 use std::{net::SocketAddr, str::FromStr, sync::Arc};
18 use tokio::{
19 fs::File,
20 io::{AsyncRead, AsyncReadExt, AsyncWrite},
21 net::{TcpListener, TcpStream},
22 sync::Mutex,
23 task::LocalSet,
24 };
25 use tokio_tungstenite::{accept_async, WebSocketStream};
26 use tokio_util::task::LocalPoolHandle;
27 use toml::Table;
28
29 #[macro_use]
30 extern crate tracing;
31
32 #[tokio::main]
33 async fn main() -> Result<(), Error> {
34 tracing_subscriber::fmt::init();
35 let config: Table = {
36 let mut file = File::open("Giterated.toml").await?;
37 let mut text = String::new();
38 file.read_to_string(&mut text).await?;
39 text.parse()?
40 };
41 let mut listener = TcpListener::bind(config["giterated"]["bind"].as_str().unwrap()).await?;
42 let connections: Arc<Mutex<Connections>> = Arc::default();
43 let instance_connections: Arc<Mutex<InstanceConnections>> = Arc::default();
44 let db_conn_options = PgConnectOptions::new()
45 .host(config["postgres"]["host"].as_str().unwrap())
46 .port(config["postgres"]["port"].as_integer().unwrap() as u16)
47 .database(config["postgres"]["database"].as_str().unwrap())
48 .username(config["postgres"]["user"].as_str().unwrap())
49 .password(config["postgres"]["password"].as_str().unwrap())
50 .log_statements(log::LevelFilter::Off);
51 let db_pool = PgPool::connect_with(db_conn_options).await?;
52
53 debug!("Running database migrations...");
54 sqlx::migrate!().run(&db_pool).await?;
55 info!("Connected");
56
57 let settings = Arc::new(Mutex::new(DatabaseSettings {
58 pg_pool: db_pool.clone(),
59 }));
60
61 let repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>> =
62 Arc::new(Mutex::new(GitBackend {
63 pg_pool: db_pool.clone(),
64 repository_folder: String::from(
65 config["giterated"]["backend"]["git"]["root"]
66 .as_str()
67 .unwrap(),
68 ),
69 instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap())
70 .unwrap(),
71 settings_provider: settings.clone(),
72 }));
73
74 let token_granter = Arc::new(Mutex::new(AuthenticationTokenGranter {
75 config: config.clone(),
76 instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
77 }));
78
79 let user_backend: Arc<Mutex<dyn UserBackend + Send>> = Arc::new(Mutex::new(UserAuth::new(
80 db_pool.clone(),
81 &Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
82 token_granter.clone(),
83 settings.clone(),
84 )));
85
86 info!("Connected");
87
88 let database_backend = DatabaseBackend::new(
89 Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
90 user_backend.clone(),
91 repository_backend.clone(),
92 );
93
94 let mut runtime = GiteratedStack::default();
95
96 let database_backend = database_backend.into_substack();
97 runtime.merge_builder(database_backend);
98
99 let runtime = Arc::new(runtime);
100
101 let operation_state = {
102 StackOperationState {
103 our_instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap())
104 .unwrap(),
105 runtime: runtime.clone(),
106 instance: None,
107 user: None,
108 }
109 };
110
111 let pool = LocalPoolHandle::new(5);
112
113 loop {
114 let stream = accept_stream(&mut listener).await;
115 info!("Connected");
116
117 let (stream, address) = match stream {
118 Ok(stream) => stream,
119 Err(err) => {
120 error!("Failed to accept connection. {:?}", err);
121 continue;
122 }
123 };
124
125 info!("Accepted connection from {}", address);
126
127 let connection = accept_websocket_connection(stream).await;
128
129 let connection = match connection {
130 Ok(connection) => connection,
131 Err(err) => {
132 error!(
133 "Failed to initiate Websocket connection from {}. {:?}",
134 address, err
135 );
136 continue;
137 }
138 };
139
140 info!("Websocket connection established with {}", address);
141 let connections_cloned = connections.clone();
142 let repository_backend = repository_backend.clone();
143 let user_backend = user_backend.clone();
144 let token_granter = token_granter.clone();
145 let settings = settings.clone();
146 let instance_connections = instance_connections.clone();
147 let config = config.clone();
148 let runtime = runtime.clone();
149 let operation_state = operation_state.clone();
150
151 pool.spawn_pinned(move || {
152 connection_wrapper(
153 connection,
154 connections_cloned,
155 repository_backend,
156 user_backend,
157 token_granter,
158 settings,
159 address,
160 Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
161 instance_connections,
162 config,
163 runtime,
164 operation_state,
165 )
166 });
167
168 let connection = RawConnection {
169 task: tokio::spawn(async move { () }),
170 };
171
172 connections.lock().await.connections.push(connection);
173 }
174 }
175
176 async fn accept_stream(listener: &mut TcpListener) -> Result<(TcpStream, SocketAddr), Error> {
177 let stream = listener.accept().await?;
178
179 Ok(stream)
180 }
181
182 async fn accept_websocket_connection<S: AsyncRead + AsyncWrite + Unpin>(
183 stream: S,
184 ) -> Result<WebSocketStream<S>, Error> {
185 let connection = accept_async(stream).await?;
186
187 Ok(connection)
188 }
189