JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

`giterated_cache` initial implementation

# Giterated Stack - Added the ability for dynamic substack handlers to exist for operations relevant to caching. - Added type metadata to the dynamic types. # Giterated Cache - Created - Implemented caching and fetching from cache. Hell fucking yes!!!! It works so good. Are you snooping in the commit logs because you're curious about the history of giterated? Cool that it got so big... tell me I say hi :)

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨86afeef

⁨giterated-daemon/src/main.rs⁩ - ⁨6640⁩ bytes
Raw
1 use anyhow::Error;
2 use connection::{Connections, RawConnection};
3 use giterated_cache::CacheSubstack;
4 use giterated_daemon::{
5 authentication::AuthenticationTokenGranter,
6 backend::{
7 git::GitBackend, settings::DatabaseSettings, user::UserAuth, RepositoryBackend, UserBackend,
8 },
9 connection::{self, wrapper::connection_wrapper},
10 database_backend::DatabaseBackend,
11 federation::connections::InstanceConnections,
12 };
13
14 use giterated_models::instance::Instance;
15
16 use giterated_stack::{GiteratedStack, StackOperationState};
17 use sqlx::{postgres::PgConnectOptions, ConnectOptions, PgPool};
18 use std::{net::SocketAddr, str::FromStr, sync::Arc};
19 use tokio::{
20 fs::File,
21 io::{AsyncRead, AsyncReadExt, AsyncWrite},
22 net::{TcpListener, TcpStream},
23 sync::{Mutex, OnceCell},
24 };
25 use tokio_tungstenite::{accept_async, WebSocketStream};
26 use tokio_util::task::LocalPoolHandle;
27 use toml::Table;
28
29 #[macro_use]
30 extern crate tracing;
31
32 #[tokio::main]
33 async fn main() -> Result<(), Error> {
34 tracing_subscriber::fmt::init();
35 let config: Table = {
36 let mut file = File::open("Giterated.toml").await?;
37 let mut text = String::new();
38 file.read_to_string(&mut text).await?;
39 text.parse()?
40 };
41 let mut listener = TcpListener::bind(config["giterated"]["bind"].as_str().unwrap()).await?;
42 let connections: Arc<Mutex<Connections>> = Arc::default();
43 let instance_connections: Arc<Mutex<InstanceConnections>> = Arc::default();
44 let db_conn_options = PgConnectOptions::new()
45 .host(config["postgres"]["host"].as_str().unwrap())
46 .port(config["postgres"]["port"].as_integer().unwrap() as u16)
47 .database(config["postgres"]["database"].as_str().unwrap())
48 .username(config["postgres"]["user"].as_str().unwrap())
49 .password(config["postgres"]["password"].as_str().unwrap())
50 .log_statements(log::LevelFilter::Off);
51 let db_pool = PgPool::connect_with(db_conn_options).await?;
52
53 debug!("Running database migrations...");
54 sqlx::migrate!().run(&db_pool).await?;
55 info!("Connected");
56
57 let stack_cell = Arc::new(OnceCell::default());
58
59 let settings = Arc::new(Mutex::new(DatabaseSettings {
60 pg_pool: db_pool.clone(),
61 stack: stack_cell.clone(),
62 }));
63
64 let repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>> =
65 Arc::new(Mutex::new(GitBackend {
66 pg_pool: db_pool.clone(),
67 repository_folder: String::from(
68 config["giterated"]["backend"]["git"]["root"]
69 .as_str()
70 .unwrap(),
71 ),
72 instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap())
73 .unwrap(),
74 stack: stack_cell.clone(),
75 }));
76
77 let token_granter = Arc::new(Mutex::new(AuthenticationTokenGranter {
78 config: config.clone(),
79 instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
80 }));
81
82 let user_backend: Arc<Mutex<dyn UserBackend + Send>> = Arc::new(Mutex::new(UserAuth::new(
83 db_pool.clone(),
84 &Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
85 token_granter.clone(),
86 settings.clone(),
87 )));
88
89 info!("Connected");
90
91 let database_backend = DatabaseBackend::new(
92 Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
93 user_backend.clone(),
94 repository_backend.clone(),
95 db_pool.clone(),
96 stack_cell.clone(),
97 );
98
99 let mut runtime = GiteratedStack::default();
100
101 let database_backend = database_backend.into_substack();
102 runtime.merge_builder(database_backend);
103
104 let cache_backend = CacheSubstack::default();
105 runtime.merge_builder(cache_backend.into_substack());
106
107 let runtime = Arc::new(runtime);
108
109 stack_cell
110 .set(runtime.clone())
111 .expect("failed to store global daemon stack");
112
113 let operation_state = {
114 StackOperationState {
115 our_instance: Instance::from_str(config["giterated"]["instance"].as_str().unwrap())
116 .unwrap(),
117 runtime: runtime.clone(),
118 instance: None,
119 user: None,
120 }
121 };
122
123 let pool = LocalPoolHandle::new(5);
124
125 loop {
126 let stream = accept_stream(&mut listener).await;
127 info!("Connected");
128
129 let (stream, address) = match stream {
130 Ok(stream) => stream,
131 Err(err) => {
132 error!("Failed to accept connection. {:?}", err);
133 continue;
134 }
135 };
136
137 info!("Accepted connection from {}", address);
138
139 let connection = accept_websocket_connection(stream).await;
140
141 let connection = match connection {
142 Ok(connection) => connection,
143 Err(err) => {
144 error!(
145 "Failed to initiate Websocket connection from {}. {:?}",
146 address, err
147 );
148 continue;
149 }
150 };
151
152 info!("Websocket connection established with {}", address);
153 let connections_cloned = connections.clone();
154 let repository_backend = repository_backend.clone();
155 let user_backend = user_backend.clone();
156 let token_granter = token_granter.clone();
157 let settings = settings.clone();
158 let instance_connections = instance_connections.clone();
159 let config = config.clone();
160 let runtime = runtime.clone();
161 let operation_state = operation_state.clone();
162
163 pool.spawn_pinned(move || {
164 connection_wrapper(
165 connection,
166 connections_cloned,
167 repository_backend,
168 user_backend,
169 token_granter,
170 settings,
171 address,
172 Instance::from_str(config["giterated"]["instance"].as_str().unwrap()).unwrap(),
173 instance_connections,
174 config,
175 runtime,
176 operation_state,
177 )
178 });
179
180 let connection = RawConnection {
181 task: tokio::spawn(async move { () }),
182 };
183
184 connections.lock().await.connections.push(connection);
185 }
186 }
187
188 async fn accept_stream(listener: &mut TcpListener) -> Result<(TcpStream, SocketAddr), Error> {
189 let stream = listener.accept().await?;
190
191 Ok(stream)
192 }
193
194 async fn accept_websocket_connection<S: AsyncRead + AsyncWrite + Unpin>(
195 stream: S,
196 ) -> Result<WebSocketStream<S>, Error> {
197 let connection = accept_async(stream).await?;
198
199 Ok(connection)
200 }
201