use anyhow::Error; use connection::{Connections, RawConnection}; use giterated_cache::CacheSubstack; use giterated_daemon::{ authentication::AuthenticationTokenGranter, backend::{ git::GitBackend, settings::DatabaseSettings, user::UserAuth, RepositoryBackend, UserBackend, }, connection::{self, wrapper::connection_wrapper}, database_backend::DatabaseBackend, federation::connections::InstanceConnections, }; use giterated_models::instance::Instance; use giterated_stack::{GiteratedStack, StackOperationState}; use sqlx::{postgres::PgConnectOptions, ConnectOptions, PgPool}; use std::{net::SocketAddr, str::FromStr, sync::Arc}; use tokio::{ fs::File, io::{AsyncRead, AsyncReadExt, AsyncWrite}, net::{TcpListener, TcpStream}, sync::{Mutex, OnceCell}, }; use tokio_tungstenite::{accept_async, WebSocketStream}; use tokio_util::task::LocalPoolHandle; use toml::Table; #[macro_use] extern crate tracing; #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt::init(); let config: Table = { let mut file = File::open("Giterated.toml").await?; let mut text = String::new(); file.read_to_string(&mut text).await?; text.parse()? }; let mut listener = TcpListener::bind(config["giterated"]["bind"].as_str().unwrap()).await?; let connections: Arc> = Arc::default(); let instance_connections: Arc> = Arc::default(); let db_conn_options = PgConnectOptions::new() .host(config["postgres"]["host"].as_str().unwrap()) .port(config["postgres"]["port"].as_integer().unwrap() as u16) .database(config["postgres"]["database"].as_str().unwrap()) .username(config["postgres"]["user"].as_str().unwrap()) .password(config["postgres"]["password"].as_str().unwrap()) .log_statements(log::LevelFilter::Off); let db_pool = PgPool::connect_with(db_conn_options).await?; debug!("Running database migrations..."); sqlx::migrate!().run(&db_pool).await?; info!("Connected"); let stack_cell = Arc::new(OnceCell::default()); let settings = Arc::new(Mutex::new(DatabaseSettings { pg_pool: db_pool.clone(), stack: stack_cell.clone(), })); let repository_backend: Arc> = Arc::new(Mutex::new(GitBackend { pg_pool: db_pool.clone(), repository_folder: String::from( config["giterated"]["backend"]["git"]["root"] .as_str() .unwrap(), ), instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap()) .unwrap(), stack: stack_cell.clone(), })); let token_granter = Arc::new(Mutex::new(AuthenticationTokenGranter { config: config.clone(), instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(), })); let user_backend: Arc> = Arc::new(Mutex::new(UserAuth::new( db_pool.clone(), &Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(), token_granter.clone(), settings.clone(), ))); info!("Connected"); let database_backend = DatabaseBackend::new( Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(), user_backend.clone(), repository_backend.clone(), db_pool.clone(), stack_cell.clone(), ); let mut runtime = GiteratedStack::default(); let database_backend = database_backend.into_substack(); runtime.merge_builder(database_backend); let cache_backend = CacheSubstack::default(); runtime.merge_builder(cache_backend.into_substack()); let runtime = Arc::new(runtime); stack_cell .set(runtime.clone()) .expect("failed to store global daemon stack"); let operation_state = { StackOperationState { our_instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap()) .unwrap(), runtime: runtime.clone(), instance: None, user: None, } }; let pool = LocalPoolHandle::new(5); loop { let stream = accept_stream(&mut listener).await; info!("Connected"); let (stream, address) = match stream { Ok(stream) => stream, Err(err) => { error!("Failed to accept connection. {:?}", err); continue; } }; info!("Accepted connection from {}", address); let connection = accept_websocket_connection(stream).await; let connection = match connection { Ok(connection) => connection, Err(err) => { error!( "Failed to initiate Websocket connection from {}. {:?}", address, err ); continue; } }; info!("Websocket connection established with {}", address); let connections_cloned = connections.clone(); let repository_backend = repository_backend.clone(); let user_backend = user_backend.clone(); let token_granter = token_granter.clone(); let settings = settings.clone(); let instance_connections = instance_connections.clone(); let config = config.clone(); let runtime = runtime.clone(); let operation_state = operation_state.clone(); pool.spawn_pinned(move || { connection_wrapper( connection, connections_cloned, repository_backend, user_backend, token_granter, settings, address, Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(), instance_connections, config, runtime, operation_state, ) }); let connection = RawConnection { task: tokio::spawn(async move { () }), }; connections.lock().await.connections.push(connection); } } async fn accept_stream(listener: &mut TcpListener) -> Result<(TcpStream, SocketAddr), Error> { let stream = listener.accept().await?; Ok(stream) } async fn accept_websocket_connection( stream: S, ) -> Result, Error> { let connection = accept_async(stream).await?; Ok(connection) }