JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Fixed imports!

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨ef0e853

⁨giterated-daemon/src/connection/wrapper.rs⁩ - ⁨9206⁩ bytes
Raw
1 use std::{
2 net::SocketAddr,
3 sync::{atomic::AtomicBool, Arc},
4 };
5
6 use anyhow::Error;
7 use futures_util::{SinkExt, StreamExt};
8
9 use giterated_models::instance::Instance;
10
11 use giterated_models::object_backend::ObjectBackend;
12
13 use giterated_models::{
14 authenticated::AuthenticatedPayload, message::GiteratedMessage, object::AnyObject,
15 operation::AnyOperation,
16 };
17 use serde::Serialize;
18
19 use tokio::{net::TcpStream, sync::Mutex};
20 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
21 use toml::Table;
22
23 use crate::{
24 authentication::AuthenticationTokenGranter,
25 backend::{MetadataBackend, RepositoryBackend, UserBackend},
26 database_backend::Foobackend,
27 federation::connections::InstanceConnections,
28 keys::PublicKeyCache,
29 };
30
31 use super::Connections;
32
33 pub async fn connection_wrapper(
34 socket: WebSocketStream<TcpStream>,
35 connections: Arc<Mutex<Connections>>,
36 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
37 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
38 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
39 settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
40 addr: SocketAddr,
41 instance: impl ToOwned<Owned = Instance>,
42 instance_connections: Arc<Mutex<InstanceConnections>>,
43 config: Table,
44 ) {
45 let connection_state = ConnectionState {
46 socket: Arc::new(Mutex::new(socket)),
47 connections,
48 repository_backend,
49 user_backend,
50 auth_granter,
51 settings_backend,
52 addr,
53 instance: instance.to_owned(),
54 handshaked: Arc::new(AtomicBool::new(false)),
55 key_cache: Arc::default(),
56 instance_connections: instance_connections.clone(),
57 config,
58 };
59
60 let _handshaked = false;
61
62 loop {
63 let mut socket = connection_state.socket.lock().await;
64 let message = socket.next().await;
65 drop(socket);
66
67 match message {
68 Some(Ok(message)) => {
69 let payload = match message {
70 Message::Binary(payload) => payload,
71 Message::Ping(_) => {
72 let mut socket = connection_state.socket.lock().await;
73 let _ = socket.send(Message::Pong(vec![])).await;
74 drop(socket);
75 continue;
76 }
77 Message::Close(_) => return,
78 _ => continue,
79 };
80
81 let message: AuthenticatedPayload = bincode::deserialize(&payload).unwrap();
82
83 let message: GiteratedMessage<AnyObject, AnyOperation> = message.into_message();
84
85 let backend = Foobackend {};
86
87 backend
88 .object_operation(message.object, message.payload)
89 .await
90 .unwrap();
91 }
92 _ => {
93 return;
94 }
95 }
96 }
97
98 // loop {
99 // let mut socket = connection_state.socket.lock().await;
100 // let message = socket.next().await;
101 // drop(socket);
102
103 // match message {
104 // Some(Ok(message)) => {
105 // let payload = match message {
106 // Message::Binary(payload) => payload,
107 // Message::Ping(_) => {
108 // let mut socket = connection_state.socket.lock().await;
109 // let _ = socket.send(Message::Pong(vec![])).await;
110 // drop(socket);
111 // continue;
112 // }
113 // Message::Close(_) => return,
114 // _ => continue,
115 // };
116 // info!("one payload");
117
118 // let message = NetworkMessage(payload.clone());
119
120 // if !handshaked {
121 // info!("im foo baring");
122 // if handshake_handle(&message, &connection_state).await.is_ok() {
123 // if connection_state.handshaked.load(Ordering::SeqCst) {
124 // handshaked = true;
125 // }
126 // }
127 // } else {
128 // let raw = serde_json::from_slice::<AuthenticatedPayload>(&payload).unwrap();
129
130 // if let Some(target_instance) = &raw.target_instance {
131 // if connection_state.instance != *target_instance {
132 // // Forward request
133 // info!("Forwarding message to {}", target_instance.url);
134 // let mut instance_connections = instance_connections.lock().await;
135 // let pool = instance_connections.get_or_open(&target_instance).unwrap();
136 // let pool_clone = pool.clone();
137 // drop(pool);
138
139 // let result = wrap_forwarded(&pool_clone, raw).await;
140
141 // let mut socket = connection_state.socket.lock().await;
142 // let _ = socket.send(result).await;
143
144 // continue;
145 // }
146 // }
147
148 // let message_type = &raw.message_type;
149
150 // info!("Handling message with type: {}", message_type);
151
152 // match authentication_handle(message_type, &message, &connection_state).await {
153 // Err(e) => {
154 // let _ = connection_state
155 // .send_raw(ConnectionError(e.to_string()))
156 // .await;
157 // }
158 // Ok(true) => continue,
159 // Ok(false) => {}
160 // }
161
162 // match repository_handle(message_type, &message, &connection_state).await {
163 // Err(e) => {
164 // let _ = connection_state
165 // .send_raw(ConnectionError(e.to_string()))
166 // .await;
167 // }
168 // Ok(true) => continue,
169 // Ok(false) => {}
170 // }
171
172 // match user_handle(message_type, &message, &connection_state).await {
173 // Err(e) => {
174 // let _ = connection_state
175 // .send_raw(ConnectionError(e.to_string()))
176 // .await;
177 // }
178 // Ok(true) => continue,
179 // Ok(false) => {}
180 // }
181
182 // match authentication_handle(message_type, &message, &connection_state).await {
183 // Err(e) => {
184 // let _ = connection_state
185 // .send_raw(ConnectionError(e.to_string()))
186 // .await;
187 // }
188 // Ok(true) => continue,
189 // Ok(false) => {}
190 // }
191
192 // error!(
193 // "Message completely unhandled: {}",
194 // std::str::from_utf8(&payload).unwrap()
195 // );
196 // }
197 // }
198 // Some(Err(e)) => {
199 // error!("Closing connection for {:?} for {}", e, addr);
200 // return;
201 // }
202 // _ => {
203 // info!("Unhandled");
204 // continue;
205 // }
206 // }
207 // }
208 }
209
210 #[derive(Clone)]
211 pub struct ConnectionState {
212 socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
213 pub connections: Arc<Mutex<Connections>>,
214 pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
215 pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
216 pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
217 pub settings_backend: Arc<Mutex<dyn MetadataBackend + Send>>,
218 pub addr: SocketAddr,
219 pub instance: Instance,
220 pub handshaked: Arc<AtomicBool>,
221 pub key_cache: Arc<Mutex<PublicKeyCache>>,
222 pub instance_connections: Arc<Mutex<InstanceConnections>>,
223 pub config: Table,
224 }
225
226 impl ConnectionState {
227 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
228 let payload = serde_json::to_string(&message)?;
229 info!("Sending payload: {}", &payload);
230 self.socket
231 .lock()
232 .await
233 .send(Message::Binary(payload.into_bytes()))
234 .await?;
235
236 Ok(())
237 }
238
239 pub async fn send_raw<T: Serialize>(&self, message: T) -> Result<(), Error> {
240 let payload = serde_json::to_string(&message)?;
241 info!("Sending payload: {}", &payload);
242 self.socket
243 .lock()
244 .await
245 .send(Message::Binary(payload.into_bytes()))
246 .await?;
247
248 Ok(())
249 }
250
251 pub async fn public_key(&self, instance: &Instance) -> Result<String, Error> {
252 let mut keys = self.key_cache.lock().await;
253 keys.get(instance).await
254 }
255 }
256