JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Begin new protocol refactor

Amber - 2 years ago

parent: tbd commit: 26651b1

⁨giterated-daemon/src/connection/wrapper.rs⁩ - ⁨7794⁩ bytes
Raw
1 use std::{
2 net::SocketAddr,
3 sync::{atomic::AtomicBool, Arc},
4 };
5
6 use anyhow::Error;
7 use futures_util::{SinkExt, StreamExt};
8 use giterated_models::model::instance::Instance;
9
10 use serde::Serialize;
11
12 use tokio::{net::TcpStream, sync::Mutex};
13 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
14 use toml::Table;
15
16 use crate::{
17 authentication::AuthenticationTokenGranter,
18 backend::{RepositoryBackend, SettingsBackend, UserBackend},
19 federation::connections::InstanceConnections,
20 keys::PublicKeyCache,
21 };
22
23 use super::Connections;
24
25 pub async fn connection_wrapper(
26 socket: WebSocketStream<TcpStream>,
27 connections: Arc<Mutex<Connections>>,
28 repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
29 user_backend: Arc<Mutex<dyn UserBackend + Send>>,
30 auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
31 settings_backend: Arc<Mutex<dyn SettingsBackend>>,
32 addr: SocketAddr,
33 instance: impl ToOwned<Owned = Instance>,
34 instance_connections: Arc<Mutex<InstanceConnections>>,
35 config: Table,
36 ) {
37 let _connection_state = ConnectionState {
38 socket: Arc::new(Mutex::new(socket)),
39 connections,
40 repository_backend,
41 user_backend,
42 auth_granter,
43 settings_backend,
44 addr,
45 instance: instance.to_owned(),
46 handshaked: Arc::new(AtomicBool::new(false)),
47 key_cache: Arc::default(),
48 instance_connections: instance_connections.clone(),
49 config,
50 };
51
52 let _handshaked = false;
53
54 // loop {
55 // let mut socket = connection_state.socket.lock().await;
56 // let message = socket.next().await;
57 // drop(socket);
58
59 // match message {
60 // Some(Ok(message)) => {
61 // let payload = match message {
62 // Message::Binary(payload) => payload,
63 // Message::Ping(_) => {
64 // let mut socket = connection_state.socket.lock().await;
65 // let _ = socket.send(Message::Pong(vec![])).await;
66 // drop(socket);
67 // continue;
68 // }
69 // Message::Close(_) => return,
70 // _ => continue,
71 // };
72 // info!("one payload");
73
74 // let message = NetworkMessage(payload.clone());
75
76 // if !handshaked {
77 // info!("im foo baring");
78 // if handshake_handle(&message, &connection_state).await.is_ok() {
79 // if connection_state.handshaked.load(Ordering::SeqCst) {
80 // handshaked = true;
81 // }
82 // }
83 // } else {
84 // let raw = serde_json::from_slice::<AuthenticatedPayload>(&payload).unwrap();
85
86 // if let Some(target_instance) = &raw.target_instance {
87 // if connection_state.instance != *target_instance {
88 // // Forward request
89 // info!("Forwarding message to {}", target_instance.url);
90 // let mut instance_connections = instance_connections.lock().await;
91 // let pool = instance_connections.get_or_open(&target_instance).unwrap();
92 // let pool_clone = pool.clone();
93 // drop(pool);
94
95 // let result = wrap_forwarded(&pool_clone, raw).await;
96
97 // let mut socket = connection_state.socket.lock().await;
98 // let _ = socket.send(result).await;
99
100 // continue;
101 // }
102 // }
103
104 // let message_type = &raw.message_type;
105
106 // info!("Handling message with type: {}", message_type);
107
108 // match authentication_handle(message_type, &message, &connection_state).await {
109 // Err(e) => {
110 // let _ = connection_state
111 // .send_raw(ConnectionError(e.to_string()))
112 // .await;
113 // }
114 // Ok(true) => continue,
115 // Ok(false) => {}
116 // }
117
118 // match repository_handle(message_type, &message, &connection_state).await {
119 // Err(e) => {
120 // let _ = connection_state
121 // .send_raw(ConnectionError(e.to_string()))
122 // .await;
123 // }
124 // Ok(true) => continue,
125 // Ok(false) => {}
126 // }
127
128 // match user_handle(message_type, &message, &connection_state).await {
129 // Err(e) => {
130 // let _ = connection_state
131 // .send_raw(ConnectionError(e.to_string()))
132 // .await;
133 // }
134 // Ok(true) => continue,
135 // Ok(false) => {}
136 // }
137
138 // match authentication_handle(message_type, &message, &connection_state).await {
139 // Err(e) => {
140 // let _ = connection_state
141 // .send_raw(ConnectionError(e.to_string()))
142 // .await;
143 // }
144 // Ok(true) => continue,
145 // Ok(false) => {}
146 // }
147
148 // error!(
149 // "Message completely unhandled: {}",
150 // std::str::from_utf8(&payload).unwrap()
151 // );
152 // }
153 // }
154 // Some(Err(e)) => {
155 // error!("Closing connection for {:?} for {}", e, addr);
156 // return;
157 // }
158 // _ => {
159 // info!("Unhandled");
160 // continue;
161 // }
162 // }
163 // }
164 }
165
166 #[derive(Clone)]
167 pub struct ConnectionState {
168 socket: Arc<Mutex<WebSocketStream<TcpStream>>>,
169 pub connections: Arc<Mutex<Connections>>,
170 pub repository_backend: Arc<Mutex<dyn RepositoryBackend + Send>>,
171 pub user_backend: Arc<Mutex<dyn UserBackend + Send>>,
172 pub auth_granter: Arc<Mutex<AuthenticationTokenGranter>>,
173 pub settings_backend: Arc<Mutex<dyn SettingsBackend>>,
174 pub addr: SocketAddr,
175 pub instance: Instance,
176 pub handshaked: Arc<AtomicBool>,
177 pub key_cache: Arc<Mutex<PublicKeyCache>>,
178 pub instance_connections: Arc<Mutex<InstanceConnections>>,
179 pub config: Table,
180 }
181
182 impl ConnectionState {
183 pub async fn send<T: Serialize>(&self, message: T) -> Result<(), Error> {
184 let payload = serde_json::to_string(&message)?;
185 info!("Sending payload: {}", &payload);
186 self.socket
187 .lock()
188 .await
189 .send(Message::Binary(payload.into_bytes()))
190 .await?;
191
192 Ok(())
193 }
194
195 pub async fn send_raw<T: Serialize>(&self, message: T) -> Result<(), Error> {
196 let payload = serde_json::to_string(&message)?;
197 info!("Sending payload: {}", &payload);
198 self.socket
199 .lock()
200 .await
201 .send(Message::Binary(payload.into_bytes()))
202 .await?;
203
204 Ok(())
205 }
206
207 pub async fn public_key(&self, instance: &Instance) -> Result<String, Error> {
208 let mut keys = self.key_cache.lock().await;
209 keys.get(instance).await
210 }
211 }
212