JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Progress on refactor

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨c9f076f

⁨giterated-daemon/src/connection/wrapper.rs⁩ - ⁨9124⁩ bytes
Raw
1 use std::{
2 net::SocketAddr,
3 sync::{atomic::AtomicBool, Arc},
4 };
5
6 use anyhow::Error;
7 use futures_util::{SinkExt, StreamExt};
8 use giterated_models::{
9 model::{authenticated::AuthenticatedPayload, instance::Instance},
10 operation::{AnyObject, AnyOperation, GiteratedMessage, ObjectBackend},
11 };
12
13 use serde::Serialize;
14
15 use tokio::{net::TcpStream, sync::Mutex};
16 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
17 use toml::Table;
18
19 use crate::{
20 authentication::AuthenticationTokenGranter,
21 backend::{RepositoryBackend, SettingsBackend, UserBackend},
22 database_backend::Foobackend,
23 federation::connections::InstanceConnections,
24 keys::PublicKeyCache,
25 };
26
27 use super::Connections;
28
29 pub async fn connection_wrapper(
30 socket: WebSocketStream<TcpStream>,
31 connections: Arc<Mutex<Connections>>,
32 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
33 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
34 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
35 settings_backend: Arc<Mutex<dyn SettingsBackend>>,
36 addr: SocketAddr,
37 instance: impl ToOwned<Owned = Instance>,
38 instance_connections: Arc<Mutex<InstanceConnections>>,
39 config: Table,
40 ) {
41 let connection_state = ConnectionState {
42 socket: Arc::new(Mutex::new(socket)),
43 connections,
44 repository_backend,
45 user_backend,
46 auth_granter,
47 settings_backend,
48 addr,
49 instance: instance.to_owned(),
50 handshaked: Arc::new(AtomicBool::new(false)),
51 key_cache: Arc::default(),
52 instance_connections: instance_connections.clone(),
53 config,
54 };
55
56 let _handshaked = false;
57
58 loop {
59 let mut socket = connection_state.socket.lock().await;
60 let message = socket.next().await;
61 drop(socket);
62
63 match message {
64 Some(Ok(message)) => {
65 let payload = match message {
66 Message::Binary(payload) => payload,
67 Message::Ping(_) => {
68 let mut socket = connection_state.socket.lock().await;
69 let _ = socket.send(Message::Pong(vec![])).await;
70 drop(socket);
71 continue;
72 }
73 Message::Close(_) => return,
74 _ => continue,
75 };
76
77 let message: AuthenticatedPayload = bincode::deserialize(&payload).unwrap();
78
79 let message: GiteratedMessage<AnyObject, AnyOperation> = message.into_message();
80
81 let backend = Foobackend {};
82
83 backend
84 .object_operation(message.object, message.payload)
85 .await
86 .unwrap();
87 }
88 _ => {
89 return;
90 }
91 }
92 }
93
94 // loop {
95 // let mut socket = connection_state.socket.lock().await;
96 // let message = socket.next().await;
97 // drop(socket);
98
99 // match message {
100 // Some(Ok(message)) => {
101 // let payload = match message {
102 // Message::Binary(payload) => payload,
103 // Message::Ping(_) => {
104 // let mut socket = connection_state.socket.lock().await;
105 // let _ = socket.send(Message::Pong(vec![])).await;
106 // drop(socket);
107 // continue;
108 // }
109 // Message::Close(_) => return,
110 // _ => continue,
111 // };
112 // info!("one payload");
113
114 // let message = NetworkMessage(payload.clone());
115
116 // if !handshaked {
117 // info!("im foo baring");
118 // if handshake_handle(&message, &connection_state).await.is_ok() {
119 // if connection_state.handshaked.load(Ordering::SeqCst) {
120 // handshaked = true;
121 // }
122 // }
123 // } else {
124 // let raw = serde_json::from_slice::<AuthenticatedPayload>(&payload).unwrap();
125
126 // if let Some(target_instance) = &raw.target_instance {
127 // if connection_state.instance != *target_instance {
128 // // Forward request
129 // info!("Forwarding message to {}", target_instance.url);
130 // let mut instance_connections = instance_connections.lock().await;
131 // let pool = instance_connections.get_or_open(&target_instance).unwrap();
132 // let pool_clone = pool.clone();
133 // drop(pool);
134
135 // let result = wrap_forwarded(&pool_clone, raw).await;
136
137 // let mut socket = connection_state.socket.lock().await;
138 // let _ = socket.send(result).await;
139
140 // continue;
141 // }
142 // }
143
144 // let message_type = &raw.message_type;
145
146 // info!("Handling message with type: {}", message_type);
147
148 // match authentication_handle(message_type, &message, &connection_state).await {
149 // Err(e) => {
150 // let _ = connection_state
151 // .send_raw(ConnectionError(e.to_string()))
152 // .await;
153 // }
154 // Ok(true) => continue,
155 // Ok(false) => {}
156 // }
157
158 // match repository_handle(message_type, &message, &connection_state).await {
159 // Err(e) => {
160 // let _ = connection_state
161 // .send_raw(ConnectionError(e.to_string()))
162 // .await;
163 // }
164 // Ok(true) => continue,
165 // Ok(false) => {}
166 // }
167
168 // match user_handle(message_type, &message, &connection_state).await {
169 // Err(e) => {
170 // let _ = connection_state
171 // .send_raw(ConnectionError(e.to_string()))
172 // .await;
173 // }
174 // Ok(true) => continue,
175 // Ok(false) => {}
176 // }
177
178 // match authentication_handle(message_type, &message, &connection_state).await {
179 // Err(e) => {
180 // let _ = connection_state
181 // .send_raw(ConnectionError(e.to_string()))
182 // .await;
183 // }
184 // Ok(true) => continue,
185 // Ok(false) => {}
186 // }
187
188 // error!(
189 // "Message completely unhandled: {}",
190 // std::str::from_utf8(&payload).unwrap()
191 // );
192 // }
193 // }
194 // Some(Err(e)) => {
195 // error!("Closing connection for {:?} for {}", e, addr);
196 // return;
197 // }
198 // _ => {
199 // info!("Unhandled");
200 // continue;
201 // }
202 // }
203 // }
204 }
205
206 #[derive(Clone)]
207 pub struct ConnectionState {
208 socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
209 pub connections: Arc<Mutex<Connections>>,
210 pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
211 pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
212 pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
213 pub settings_backend: Arc<Mutex<dyn SettingsBackend>>,
214 pub addr: SocketAddr,
215 pub instance: Instance,
216 pub handshaked: Arc<AtomicBool>,
217 pub key_cache: Arc<Mutex<PublicKeyCache>>,
218 pub instance_connections: Arc<Mutex<InstanceConnections>>,
219 pub config: Table,
220 }
221
222 impl ConnectionState {
223 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
224 let payload = serde_json::to_string(&message)?;
225 info!("Sending payload: {}", &payload);
226 self.socket
227 .lock()
228 .await
229 .send(Message::Binary(payload.into_bytes()))
230 .await?;
231
232 Ok(())
233 }
234
235 pub async fn send_raw<T: Serialize>(&self, message: T) -> Result<(), Error> {
236 let payload = serde_json::to_string(&message)?;
237 info!("Sending payload: {}", &payload);
238 self.socket
239 .lock()
240 .await
241 .send(Message::Binary(payload.into_bytes()))
242 .await?;
243
244 Ok(())
245 }
246
247 pub async fn public_key(&self, instance: &Instance) -> Result<String, Error> {
248 let mut keys = self.key_cache.lock().await;
249 keys.get(instance).await
250 }
251 }
252