Fixes
parent: tbd commit: 73a5af5
1 | use |
2 | , | SocketAddr
3 | , |
4 | ; |
5 | |
6 | use Error; |
7 | use ; |
8 | |
9 | use Instance; |
10 | |
11 | use ObjectBackend; |
12 | |
13 | use |
14 | , GiteratedMessage, AnyObject, | AuthenticatedPayload
15 | , | AnyOperation
16 | ; |
17 | use Serialize; |
18 | |
19 | use ; |
20 | use ; |
21 | use Table; |
22 | |
23 | use crate:: |
24 | , | AuthenticationTokenGranter
25 | , |
26 | , | DatabaseBackend
27 | , | InstanceConnections
28 | , | PublicKeyCache
29 | ; |
30 | |
31 | use Connections; |
32 | |
33 | pub async |
34 | socket: , |
35 | connections: , |
36 | repository_backend: , |
37 | user_backend: , |
38 | auth_granter: , |
39 | settings_backend: , |
40 | addr: SocketAddr, |
41 | instance: impl , |
42 | instance_connections: , |
43 | config: Table, |
44 | backend: DatabaseBackend, |
45 | |
46 | let connection_state = ConnectionState |
47 | socket: new, |
48 | connections, |
49 | repository_backend, |
50 | user_backend, |
51 | auth_granter, |
52 | settings_backend, |
53 | addr, |
54 | instance: instance.to_owned, |
55 | handshaked: new, |
56 | key_cache: default, |
57 | instance_connections: instance_connections.clone, |
58 | config, |
59 | ; |
60 | |
61 | let _handshaked = false; |
62 | |
63 | loop |
64 | let mut socket = connection_state.socket.lock .await; |
65 | let message = socket.next .await; |
66 | drop; |
67 | |
68 | match message |
69 | Some => |
70 | let payload = match message |
71 | => payload, | Binary
72 | => | Ping
73 | let mut socket = connection_state.socket.lock .await; |
74 | let _ = socket.send .await; |
75 | drop; |
76 | continue; |
77 | |
78 | => return, | Close
79 | _ => continue, |
80 | ; |
81 | |
82 | let message: AuthenticatedPayload = deserialize .unwrap; |
83 | |
84 | let message: = message.into_message; |
85 | |
86 | backend |
87 | .object_operation |
88 | .await |
89 | .unwrap; |
90 | |
91 | _ => |
92 | return; |
93 | |
94 | |
95 | |
96 | |
97 | // loop { |
98 | // let mut socket = connection_state.socket.lock().await; |
99 | // let message = socket.next().await; |
100 | // drop(socket); |
101 | |
102 | // match message { |
103 | // Some(Ok(message)) => { |
104 | // let payload = match message { |
105 | // Message::Binary(payload) => payload, |
106 | // Message::Ping(_) => { |
107 | // let mut socket = connection_state.socket.lock().await; |
108 | // let _ = socket.send(Message::Pong(vec![])).await; |
109 | // drop(socket); |
110 | // continue; |
111 | // } |
112 | // Message::Close(_) => return, |
113 | // _ => continue, |
114 | // }; |
115 | // info!("one payload"); |
116 | |
117 | // let message = NetworkMessage(payload.clone()); |
118 | |
119 | // if !handshaked { |
120 | // info!("im foo baring"); |
121 | // if handshake_handle(&message, &connection_state).await.is_ok() { |
122 | // if connection_state.handshaked.load(Ordering::SeqCst) { |
123 | // handshaked = true; |
124 | // } |
125 | // } |
126 | // } else { |
127 | // let raw = serde_json::from_slice::<AuthenticatedPayload>(&payload).unwrap(); |
128 | |
129 | // if let Some(target_instance) = &raw.target_instance { |
130 | // if connection_state.instance != *target_instance { |
131 | // // Forward request |
132 | // info!("Forwarding message to {}", target_instance.url); |
133 | // let mut instance_connections = instance_connections.lock().await; |
134 | // let pool = instance_connections.get_or_open(&target_instance).unwrap(); |
135 | // let pool_clone = pool.clone(); |
136 | // drop(pool); |
137 | |
138 | // let result = wrap_forwarded(&pool_clone, raw).await; |
139 | |
140 | // let mut socket = connection_state.socket.lock().await; |
141 | // let _ = socket.send(result).await; |
142 | |
143 | // continue; |
144 | // } |
145 | // } |
146 | |
147 | // let message_type = &raw.message_type; |
148 | |
149 | // info!("Handling message with type: {}", message_type); |
150 | |
151 | // match authentication_handle(message_type, &message, &connection_state).await { |
152 | // Err(e) => { |
153 | // let _ = connection_state |
154 | // .send_raw(ConnectionError(e.to_string())) |
155 | // .await; |
156 | // } |
157 | // Ok(true) => continue, |
158 | // Ok(false) => {} |
159 | // } |
160 | |
161 | // match repository_handle(message_type, &message, &connection_state).await { |
162 | // Err(e) => { |
163 | // let _ = connection_state |
164 | // .send_raw(ConnectionError(e.to_string())) |
165 | // .await; |
166 | // } |
167 | // Ok(true) => continue, |
168 | // Ok(false) => {} |
169 | // } |
170 | |
171 | // match user_handle(message_type, &message, &connection_state).await { |
172 | // Err(e) => { |
173 | // let _ = connection_state |
174 | // .send_raw(ConnectionError(e.to_string())) |
175 | // .await; |
176 | // } |
177 | // Ok(true) => continue, |
178 | // Ok(false) => {} |
179 | // } |
180 | |
181 | // match authentication_handle(message_type, &message, &connection_state).await { |
182 | // Err(e) => { |
183 | // let _ = connection_state |
184 | // .send_raw(ConnectionError(e.to_string())) |
185 | // .await; |
186 | // } |
187 | // Ok(true) => continue, |
188 | // Ok(false) => {} |
189 | // } |
190 | |
191 | // error!( |
192 | // "Message completely unhandled: {}", |
193 | // std::str::from_utf8(&payload).unwrap() |
194 | // ); |
195 | // } |
196 | // } |
197 | // Some(Err(e)) => { |
198 | // error!("Closing connection for {:?} for {}", e, addr); |
199 | // return; |
200 | // } |
201 | // _ => { |
202 | // info!("Unhandled"); |
203 | // continue; |
204 | // } |
205 | // } |
206 | // } |
207 | |
208 | |
209 | |
210 | |
211 | socket: , |
212 | pub connections: , |
213 | pub repository_backend: , |
214 | pub user_backend: , |
215 | pub auth_granter: , |
216 | pub settings_backend: , |
217 | pub addr: SocketAddr, |
218 | pub instance: Instance, |
219 | pub handshaked: , |
220 | pub key_cache: , |
221 | pub instance_connections: , |
222 | pub config: Table, |
223 | |
224 | |
225 | |
226 | pub async |
227 | let payload = to_string?; |
228 | info!; |
229 | self.socket |
230 | .lock |
231 | .await |
232 | .send |
233 | .await?; |
234 | |
235 | Ok |
236 | |
237 | |
238 | pub async |
239 | let payload = to_string?; |
240 | info!; |
241 | self.socket |
242 | .lock |
243 | .await |
244 | .send |
245 | .await?; |
246 | |
247 | Ok |
248 | |
249 | |
250 | pub async |
251 | let mut keys = self.key_cache.lock .await; |
252 | keys.get .await |
253 | |
254 | |
255 |