JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Huge refactor to prep for moving the daemon over to the plugin architecture

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨5df753c

⁨giterated-daemon/src/main.rs⁩ - ⁨3664⁩ bytes
Raw
1 use anyhow::Error;
2 use giterated_daemon::{authentication::AuthenticationTokenGranter, client::client_wrapper};
3
4 use giterated_models::instance::Instance;
5
6 use giterated_plugin::new_stack::Runtime;
7 use giterated_protocol::NetworkedSubstack;
8 use sqlx::{postgres::PgConnectOptions, ConnectOptions, PgPool};
9 use std::{net::SocketAddr, str::FromStr, sync::Arc};
10 use tokio::{
11 fs::File,
12 io::{AsyncRead, AsyncReadExt, AsyncWrite},
13 net::{TcpListener, TcpStream},
14 sync::{Mutex, OnceCell},
15 };
16 use tokio_tungstenite::{accept_async, WebSocketStream};
17 use tokio_util::task::LocalPoolHandle;
18 use toml::Table;
19
20 #[macro_use]
21 extern crate tracing;
22
23 #[tokio::main]
24 async fn main() -> Result<(), Error> {
25 tracing_subscriber::fmt::init();
26 let config: Table = {
27 let mut file = File::open("Giterated.toml").await?;
28 let mut text = String::new();
29 file.read_to_string(&mut text).await?;
30 text.parse()?
31 };
32 let mut listener = TcpListener::bind(config["giterated"]["bind"].as_str().unwrap()).await?;
33 let db_conn_options = PgConnectOptions::new()
34 .host(config["postgres"]["host"].as_str().unwrap())
35 .port(config["postgres"]["port"].as_integer().unwrap() as u16)
36 .database(config["postgres"]["database"].as_str().unwrap())
37 .username(config["postgres"]["user"].as_str().unwrap())
38 .password(config["postgres"]["password"].as_str().unwrap())
39 .log_statements(log::LevelFilter::Off);
40 let db_pool = PgPool::connect_with(db_conn_options).await?;
41
42 debug!("Running database migrations...");
43 sqlx::migrate!().run(&db_pool).await?;
44 info!("Connected");
45
46 let token_granter = Arc::new(Mutex::new(AuthenticationTokenGranter {
47 config: config.clone(),
48 instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
49 }));
50
51 info!("Connected");
52
53 let mut runtime = Runtime::default();
54
55 let networked_stack = NetworkedSubstack {
56 home_uri: Some(
57 Instance::from_str(config["giterated"]["instance"].as_str().unwrap())
58 .unwrap()
59 .0,
60 ),
61 };
62
63 let runtime = Arc::new(runtime);
64
65 let pool = LocalPoolHandle::new(5);
66
67 loop {
68 let stream = accept_stream(&mut listener).await;
69 info!("Connected");
70
71 let (stream, address) = match stream {
72 Ok(stream) => stream,
73 Err(err) => {
74 error!("Failed to accept connection. {:?}", err);
75 continue;
76 }
77 };
78
79 info!("Accepted connection from {}", address);
80
81 let connection = accept_websocket_connection(stream).await;
82
83 let connection = match connection {
84 Ok(connection) => connection,
85 Err(err) => {
86 error!(
87 "Failed to initiate Websocket connection from {}. {:?}",
88 address, err
89 );
90 continue;
91 }
92 };
93
94 info!("Websocket connection established with {}", address);
95
96 let our_instance =
97 Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap();
98 let runtime = runtime.clone();
99
100 pool.spawn_pinned(move || client_wrapper(our_instance, connection, runtime));
101 }
102 }
103
104 async fn accept_stream(listener: &mut TcpListener) -> Result<(TcpStream, SocketAddr), Error> {
105 let stream = listener.accept().await?;
106
107 Ok(stream)
108 }
109
110 async fn accept_websocket_connection<S: AsyncRead + AsyncWrite + Unpin>(
111 stream: S,
112 ) -> Result<WebSocketStream<S>, Error> {
113 let connection = accept_async(stream).await?;
114
115 Ok(connection)
116 }
117