Begin new protocol refactor
parent: tbd commit: 26651b1
1 | use |
2 | , | SocketAddr
3 | , |
4 | ; |
5 | |
6 | use Error; |
7 | use ; |
8 | use Instance; |
9 | |
10 | use Serialize; |
11 | |
12 | use ; |
13 | use ; |
14 | use Table; |
15 | |
16 | use crate:: |
17 | , | AuthenticationTokenGranter
18 | , |
19 | , | InstanceConnections
20 | , | PublicKeyCache
21 | ; |
22 | |
23 | use Connections; |
24 | |
25 | pub async |
26 | socket: , |
27 | connections: , |
28 | repository_backend: , |
29 | user_backend: , |
30 | auth_granter: , |
31 | settings_backend: , |
32 | addr: SocketAddr, |
33 | instance: impl , |
34 | instance_connections: , |
35 | config: Table, |
36 | |
37 | let _connection_state = ConnectionState |
38 | socket: new, |
39 | connections, |
40 | repository_backend, |
41 | user_backend, |
42 | auth_granter, |
43 | settings_backend, |
44 | addr, |
45 | instance: instance.to_owned, |
46 | handshaked: new, |
47 | key_cache: default, |
48 | instance_connections: instance_connections.clone, |
49 | config, |
50 | ; |
51 | |
52 | let _handshaked = false; |
53 | |
54 | // loop { |
55 | // let mut socket = connection_state.socket.lock().await; |
56 | // let message = socket.next().await; |
57 | // drop(socket); |
58 | |
59 | // match message { |
60 | // Some(Ok(message)) => { |
61 | // let payload = match message { |
62 | // Message::Binary(payload) => payload, |
63 | // Message::Ping(_) => { |
64 | // let mut socket = connection_state.socket.lock().await; |
65 | // let _ = socket.send(Message::Pong(vec![])).await; |
66 | // drop(socket); |
67 | // continue; |
68 | // } |
69 | // Message::Close(_) => return, |
70 | // _ => continue, |
71 | // }; |
72 | // info!("one payload"); |
73 | |
74 | // let message = NetworkMessage(payload.clone()); |
75 | |
76 | // if !handshaked { |
77 | // info!("im foo baring"); |
78 | // if handshake_handle(&message, &connection_state).await.is_ok() { |
79 | // if connection_state.handshaked.load(Ordering::SeqCst) { |
80 | // handshaked = true; |
81 | // } |
82 | // } |
83 | // } else { |
84 | // let raw = serde_json::from_slice::<AuthenticatedPayload>(&payload).unwrap(); |
85 | |
86 | // if let Some(target_instance) = &raw.target_instance { |
87 | // if connection_state.instance != *target_instance { |
88 | // // Forward request |
89 | // info!("Forwarding message to {}", target_instance.url); |
90 | // let mut instance_connections = instance_connections.lock().await; |
91 | // let pool = instance_connections.get_or_open(&target_instance).unwrap(); |
92 | // let pool_clone = pool.clone(); |
93 | // drop(pool); |
94 | |
95 | // let result = wrap_forwarded(&pool_clone, raw).await; |
96 | |
97 | // let mut socket = connection_state.socket.lock().await; |
98 | // let _ = socket.send(result).await; |
99 | |
100 | // continue; |
101 | // } |
102 | // } |
103 | |
104 | // let message_type = &raw.message_type; |
105 | |
106 | // info!("Handling message with type: {}", message_type); |
107 | |
108 | // match authentication_handle(message_type, &message, &connection_state).await { |
109 | // Err(e) => { |
110 | // let _ = connection_state |
111 | // .send_raw(ConnectionError(e.to_string())) |
112 | // .await; |
113 | // } |
114 | // Ok(true) => continue, |
115 | // Ok(false) => {} |
116 | // } |
117 | |
118 | // match repository_handle(message_type, &message, &connection_state).await { |
119 | // Err(e) => { |
120 | // let _ = connection_state |
121 | // .send_raw(ConnectionError(e.to_string())) |
122 | // .await; |
123 | // } |
124 | // Ok(true) => continue, |
125 | // Ok(false) => {} |
126 | // } |
127 | |
128 | // match user_handle(message_type, &message, &connection_state).await { |
129 | // Err(e) => { |
130 | // let _ = connection_state |
131 | // .send_raw(ConnectionError(e.to_string())) |
132 | // .await; |
133 | // } |
134 | // Ok(true) => continue, |
135 | // Ok(false) => {} |
136 | // } |
137 | |
138 | // match authentication_handle(message_type, &message, &connection_state).await { |
139 | // Err(e) => { |
140 | // let _ = connection_state |
141 | // .send_raw(ConnectionError(e.to_string())) |
142 | // .await; |
143 | // } |
144 | // Ok(true) => continue, |
145 | // Ok(false) => {} |
146 | // } |
147 | |
148 | // error!( |
149 | // "Message completely unhandled: {}", |
150 | // std::str::from_utf8(&payload).unwrap() |
151 | // ); |
152 | // } |
153 | // } |
154 | // Some(Err(e)) => { |
155 | // error!("Closing connection for {:?} for {}", e, addr); |
156 | // return; |
157 | // } |
158 | // _ => { |
159 | // info!("Unhandled"); |
160 | // continue; |
161 | // } |
162 | // } |
163 | // } |
164 | |
165 | |
166 | |
167 | |
168 | socket: , |
169 | pub connections: , |
170 | pub repository_backend: , |
171 | pub user_backend: , |
172 | pub auth_granter: , |
173 | pub settings_backend: , |
174 | pub addr: SocketAddr, |
175 | pub instance: Instance, |
176 | pub handshaked: , |
177 | pub key_cache: , |
178 | pub instance_connections: , |
179 | pub config: Table, |
180 | |
181 | |
182 | |
183 | pub async |
184 | let payload = to_string?; |
185 | info!; |
186 | self.socket |
187 | .lock |
188 | .await |
189 | .send |
190 | .await?; |
191 | |
192 | Ok |
193 | |
194 | |
195 | pub async |
196 | let payload = to_string?; |
197 | info!; |
198 | self.socket |
199 | .lock |
200 | .await |
201 | .send |
202 | .await?; |
203 | |
204 | Ok |
205 | |
206 | |
207 | pub async |
208 | let mut keys = self.key_cache.lock .await; |
209 | keys.get .await |
210 | |
211 | |
212 |