Git repository hosting, collaboration, and discovery for the Fediverse.
begin new protocol refactor
parent: tbd commit: 2091700
1 | pub mod request; |
2 | |
3 | pub mod model { |
4 | pub use giterated_models::model::*; |
5 | } |
6 | |
7 | pub mod messages { |
8 | pub use giterated_models::messages::*; |
9 | } |
10 | |
11 | use std::net::SocketAddr; |
12 | use std::str::FromStr; |
13 | use std::sync::Arc; |
14 | use std::{convert::Infallible, ops::Deref}; |
15 | |
16 | use deadpool::managed::{BuildError, Manager, Object, Pool, RecycleError, RecycleResult}; |
17 | |
18 | use futures_util::{SinkExt, StreamExt}; |
19 | |
20 | use giterated_models::{ |
21 | messages::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake}, |
22 | model::instance::Instance, |
23 | }; |
24 | use semver::Version; |
25 | use serde::Serialize; |
26 | use tokio::net::TcpStream; |
27 | use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; |
28 | |
29 | type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>; |
30 | |
31 | #[macro_use] |
32 | extern crate tracing; |
33 | |
34 | pub struct GiteratedApiBuilder { |
35 | our_instance: Instance, |
36 | our_private_key: Option<String>, |
37 | our_public_key: Option<String>, |
38 | target_instance: Option<Instance>, |
39 | } |
40 | |
41 | pub trait AsInstance { |
42 | type Error: std::error::Error + Send + Sync + 'static; |
43 | |
44 | fn into_instance(self) -> Result<Instance, Self::Error>; |
45 | } |
46 | |
47 | impl AsInstance for &str { |
48 | type Error = <Instance as FromStr>::Err; |
49 | |
50 | fn into_instance(self) -> Result<Instance, Self::Error> { |
51 | Instance::from_str(self) |
52 | } |
53 | } |
54 | |
55 | impl AsInstance for Instance { |
56 | type Error = Infallible; |
57 | |
58 | fn into_instance(self) -> Result<Instance, Self::Error> { |
59 | Ok(self) |
60 | } |
61 | } |
62 | |
63 | impl GiteratedApiBuilder { |
64 | pub fn from_local(instance: impl AsInstance) -> Result<Self, anyhow::Error> { |
65 | Ok(Self { |
66 | our_instance: instance.into_instance()?, |
67 | our_private_key: None, |
68 | our_public_key: None, |
69 | target_instance: None, |
70 | }) |
71 | } |
72 | |
73 | pub fn from_local_for_other( |
74 | instance: impl AsInstance, |
75 | other: impl AsInstance, |
76 | ) -> Result<Self, anyhow::Error> { |
77 | Ok(Self { |
78 | our_instance: instance.into_instance()?, |
79 | our_private_key: None, |
80 | our_public_key: None, |
81 | target_instance: Some(other.into_instance()?), |
82 | }) |
83 | } |
84 | |
85 | pub fn private_key(&mut self, key: impl ToString) -> &mut Self { |
86 | self.our_private_key = Some(key.to_string()); |
87 | |
88 | self |
89 | } |
90 | |
91 | pub fn public_key(&mut self, key: impl ToString) -> &mut Self { |
92 | self.our_public_key = Some(key.to_string()); |
93 | |
94 | self |
95 | } |
96 | |
97 | pub async fn build(&mut self) -> Result<GiteratedApi, anyhow::Error> { |
98 | Ok(GiteratedApi { |
99 | configuration: Arc::new(GiteratedApiConfiguration { |
100 | our_private_key: self.our_private_key.take().unwrap(), |
101 | our_public_key: self.our_public_key.take().unwrap(), |
102 | target_instance: self.target_instance.take(), |
103 | // todo |
104 | target_public_key: None, |
105 | }), |
106 | }) |
107 | } |
108 | } |
109 | |
110 | pub struct GiteratedConnectionPool { |
111 | target_instance: Instance, |
112 | socket_addr: Option<SocketAddr>, |
113 | } |
114 | |
115 | #[async_trait::async_trait] |
116 | impl Manager for GiteratedConnectionPool { |
117 | type Type = Socket; |
118 | type Error = anyhow::Error; |
119 | |
120 | async fn create(&self) -> Result<Socket, Self::Error> { |
121 | info!("Creating new Daemon connection"); |
122 | let mut connection = |
123 | GiteratedApi::connect_to(&self.target_instance, &self.socket_addr).await?; |
124 | |
125 | // Handshake first! |
126 | GiteratedApi::handle_handshake(&mut connection, &self.target_instance).await?; |
127 | |
128 | Ok(connection) |
129 | } |
130 | |
131 | async fn recycle(&self, socket: &mut Socket) -> RecycleResult<Self::Error> { |
132 | match socket.send(Message::Ping(vec![])).await { |
133 | Ok(_) => Ok(()), |
134 | Err(err) => { |
135 | info!("Socket died!"); |
136 | Err(RecycleError::Backend(err.into())) |
137 | } |
138 | } |
139 | } |
140 | } |
141 | |
142 | pub struct GiteratedApiConfiguration { |
143 | pub our_private_key: String, |
144 | pub our_public_key: String, |
145 | pub target_instance: Option<Instance>, |
146 | pub target_public_key: Option<String>, |
147 | } |
148 | |
149 | #[derive(Clone)] |
150 | pub struct DaemonConnectionPool(Pool<GiteratedConnectionPool>); |
151 | |
152 | impl DaemonConnectionPool { |
153 | pub fn instance(&self) -> &Instance { |
154 | &self.0.manager().target_instance |
155 | } |
156 | } |
157 | |
158 | impl Deref for DaemonConnectionPool { |
159 | type Target = Pool<GiteratedConnectionPool>; |
160 | |
161 | fn deref(&self) -> &Self::Target { |
162 | &self.0 |
163 | } |
164 | } |
165 | |
166 | impl DaemonConnectionPool { |
167 | pub fn connect( |
168 | instance: impl ToOwned<Owned = Instance>, |
169 | ) -> Result<Self, BuildError<anyhow::Error>> { |
170 | let instance = instance.to_owned(); |
171 | Ok(Self( |
172 | Pool::builder(GiteratedConnectionPool { |
173 | socket_addr: None, |
174 | target_instance: instance.to_owned(), |
175 | }) |
176 | .build()?, |
177 | )) |
178 | } |
179 | |
180 | pub fn connect_other( |
181 | instance_identity: impl ToOwned<Owned = Instance>, |
182 | connection_addr: SocketAddr, |
183 | ) -> Result<Self, BuildError<anyhow::Error>> { |
184 | Ok(Self( |
185 | Pool::builder(GiteratedConnectionPool { |
186 | target_instance: instance_identity.to_owned(), |
187 | socket_addr: Some(connection_addr), |
188 | }) |
189 | .build()?, |
190 | )) |
191 | } |
192 | } |
193 | |
194 | // Keep this private |
195 | // seriousyl. |
196 | #[derive(Clone)] |
197 | struct GiteratedApi { |
198 | configuration: Arc<GiteratedApiConfiguration>, |
199 | } |
200 | |
201 | impl GiteratedApi { |
202 | // pub async fn public_key(&self) -> String { |
203 | // if let Some(public_key) = &self.configuration.target_public_key { |
204 | // public_key.clone() |
205 | // } else { |
206 | // assert!(self.configuration.target_instance.is_none()); |
207 | |
208 | // self.configuration.our_public_key.clone() |
209 | // } |
210 | // } |
211 | |
212 | // /// Register on an [`Instance`]. |
213 | // /// |
214 | // /// # Authorization |
215 | // /// - Must be made by the same instance its being sent to |
216 | // pub async fn register( |
217 | // &self, |
218 | // username: String, |
219 | // email: Option<String>, |
220 | // password: String, |
221 | // pool: &DaemonConnectionPool, |
222 | // ) -> Result<RegisterAccountResponse, anyhow::Error> { |
223 | // let mut connection = pool.0.get().await.unwrap(); |
224 | |
225 | // let message = InstanceAuthenticated::new( |
226 | // RegisterAccountRequest { |
227 | // username, |
228 | // email, |
229 | // password, |
230 | // }, |
231 | // pool.0.manager().target_instance.clone(), |
232 | // self.configuration.our_private_key.clone(), |
233 | // ) |
234 | // .unwrap(); |
235 | |
236 | // Self::send_message( |
237 | // &MessageKind::Authentication(AuthenticationMessage::Request( |
238 | // AuthenticationRequest::RegisterAccount(message), |
239 | // )), |
240 | // &mut connection, |
241 | // ) |
242 | // .await?; |
243 | |
244 | // while let Ok(payload) = self.next_payload(&mut connection).await { |
245 | // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( |
246 | // AuthenticationResponse::RegisterAccount(response), |
247 | // ))) = serde_json::from_slice(&payload) |
248 | // { |
249 | // return Ok(response); |
250 | // } |
251 | // } |
252 | |
253 | // connection.close(None).await?; |
254 | |
255 | // Err(SocketClosedError.into()) |
256 | // } |
257 | |
258 | // /// Create repository on the target instance. |
259 | // pub async fn create_repository( |
260 | // &self, |
261 | // user_token: String, |
262 | // name: String, |
263 | // description: Option<String>, |
264 | // visibility: RepositoryVisibility, |
265 | // default_branch: String, |
266 | // owner: User, |
267 | // pool: &DaemonConnectionPool, |
268 | // ) -> Result<bool, anyhow::Error> { |
269 | // let mut connection = pool.0.get().await.unwrap(); |
270 | |
271 | // let target_respository = Repository { |
272 | // owner: owner.clone(), |
273 | // name: name.clone(), |
274 | // instance: self |
275 | // .configuration |
276 | // .target_instance |
277 | // .as_ref() |
278 | // .unwrap_or(&pool.0.manager().target_instance) |
279 | // .clone(), |
280 | // }; |
281 | |
282 | // let request = CreateRepositoryRequest { |
283 | // name, |
284 | // description, |
285 | // visibility, |
286 | // default_branch, |
287 | // owner, |
288 | // }; |
289 | |
290 | // let message = UnvalidatedUserAuthenticated::new( |
291 | // request, |
292 | // user_token, |
293 | // self.configuration.our_private_key.clone(), |
294 | // ) |
295 | // .unwrap(); |
296 | |
297 | // Self::send_message( |
298 | // &MessageKind::Repository(RepositoryMessage { |
299 | // target: target_respository, |
300 | // command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository( |
301 | // message, |
302 | // )), |
303 | // }), |
304 | // &mut connection, |
305 | // ) |
306 | // .await?; |
307 | |
308 | // while let Ok(payload) = self.next_payload(&mut connection).await { |
309 | // if let Ok(MessageKind::Repository(RepositoryMessage { |
310 | // command: |
311 | // RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)), |
312 | // .. |
313 | // })) = serde_json::from_slice(&payload) |
314 | // { |
315 | // return Ok(true); |
316 | // } |
317 | // } |
318 | |
319 | // connection.close(None).await?; |
320 | |
321 | // Err(SocketClosedError.into()) |
322 | // } |
323 | |
324 | // pub async fn repository_info( |
325 | // &self, |
326 | // token: &str, |
327 | // request: &RepositoryInfoRequest, |
328 | // pool: &DaemonConnectionPool, |
329 | // ) -> Result<RepositoryView, Error> { |
330 | // let mut connection = pool.0.get().await.unwrap(); |
331 | |
332 | // let message = UnvalidatedUserAuthenticated::new( |
333 | // request.clone(), |
334 | // token.to_string(), |
335 | // self.configuration.our_private_key.clone(), |
336 | // ) |
337 | // .unwrap(); |
338 | |
339 | // Self::send_message( |
340 | // &MessageKind::Repository(RepositoryMessage { |
341 | // target: request.repository.clone(), |
342 | // command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(message)), |
343 | // }), |
344 | // &mut connection, |
345 | // ) |
346 | // .await?; |
347 | |
348 | // loop { |
349 | // // while let Ok(payload) = Self::next_payload(&mut socket).await { |
350 | // let payload = match self.next_payload(&mut connection).await { |
351 | // Ok(payload) => payload, |
352 | // Err(err) => { |
353 | // error!("Error while fetching next payload: {:?}", err); |
354 | // continue; |
355 | // } |
356 | // }; |
357 | |
358 | // if let Ok(MessageKind::Repository(RepositoryMessage { |
359 | // command: |
360 | // RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)), |
361 | // .. |
362 | // })) = serde_json::from_slice(&payload) |
363 | // { |
364 | // return Ok(response); |
365 | // } |
366 | // } |
367 | // } |
368 | |
369 | // /// Requests an authentication token for the given login. |
370 | // /// |
371 | // /// # Authorization |
372 | // /// This request can only be sent to the same instance from which |
373 | // /// it is issued. |
374 | // pub async fn authentication_token( |
375 | // &self, |
376 | // secret_key: String, |
377 | // username: String, |
378 | // password: String, |
379 | // pool: &DaemonConnectionPool, |
380 | // ) -> Result<String, Error> { |
381 | // let mut connection = pool.0.get().await.unwrap(); |
382 | |
383 | // let request = InstanceAuthenticated::new( |
384 | // AuthenticationTokenRequest { |
385 | // username, |
386 | // password, |
387 | // }, |
388 | // pool.0.manager().target_instance.clone(), |
389 | // include_str!("example_keys/giterated.key").to_string(), |
390 | // ) |
391 | // .unwrap(); |
392 | |
393 | // Self::send_message( |
394 | // &MessageKind::Authentication(AuthenticationMessage::Request( |
395 | // AuthenticationRequest::AuthenticationToken(request), |
396 | // )), |
397 | // &mut connection, |
398 | // ) |
399 | // .await?; |
400 | |
401 | // loop { |
402 | // // while let Ok(payload) = Self::next_payload(&mut socket).await { |
403 | // let payload = match self.next_payload(&mut connection).await { |
404 | // Ok(payload) => payload, |
405 | // Err(err) => { |
406 | // error!("Error while fetching next payload: {:?}", err); |
407 | // continue; |
408 | // } |
409 | // }; |
410 | |
411 | // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( |
412 | // AuthenticationResponse::AuthenticationToken(response), |
413 | // ))) = serde_json::from_slice(&payload) |
414 | // { |
415 | // return Ok(response.token); |
416 | // } |
417 | // } |
418 | // } |
419 | |
420 | // /// Requests a new token for the given login. |
421 | // /// |
422 | // /// # Authorization |
423 | // /// This request can only be sent to the same instance from which |
424 | // /// it is issued. |
425 | // pub async fn extend_token( |
426 | // &self, |
427 | // secret_key: String, |
428 | // token: String, |
429 | // pool: &DaemonConnectionPool, |
430 | // ) -> Result<Option<String>, Error> { |
431 | // let mut connection = pool.0.get().await.unwrap(); |
432 | |
433 | // let request = InstanceAuthenticated::new( |
434 | // TokenExtensionRequest { token }, |
435 | // pool.0.manager().target_instance.clone(), |
436 | // self.configuration.our_private_key.clone(), |
437 | // ) |
438 | // .unwrap(); |
439 | |
440 | // Self::send_message( |
441 | // &MessageKind::Authentication(AuthenticationMessage::Request( |
442 | // AuthenticationRequest::TokenExtension(request), |
443 | // )), |
444 | // &mut connection, |
445 | // ) |
446 | // .await?; |
447 | |
448 | // while let Ok(payload) = self.next_payload(&mut connection).await { |
449 | // if let Ok(MessageKind::Authentication(AuthenticationMessage::Response( |
450 | // AuthenticationResponse::TokenExtension(response), |
451 | // ))) = serde_json::from_slice(&payload) |
452 | // { |
453 | // return Ok(response.new_token); |
454 | // } |
455 | // } |
456 | |
457 | // todo!() |
458 | // } |
459 | |
460 | // pub async fn user_display_name( |
461 | // &self, |
462 | // user: impl ToOwned<Owned = User>, |
463 | // pool: &DaemonConnectionPool, |
464 | // ) -> Result<Option<String>, Error> { |
465 | // let mut connection = pool.0.get().await.unwrap(); |
466 | |
467 | // let request = UserDisplayNameRequest { |
468 | // user: user.to_owned(), |
469 | // }; |
470 | |
471 | // Self::send_message( |
472 | // &MessageKind::User(UserMessage { |
473 | // instance: self |
474 | // .configuration |
475 | // .target_instance |
476 | // .as_ref() |
477 | // .unwrap_or(&pool.0.manager().target_instance) |
478 | // .clone(), |
479 | // message: UserMessageKind::Request(UserMessageRequest::DisplayName(request)), |
480 | // }), |
481 | // &mut connection, |
482 | // ) |
483 | // .await?; |
484 | |
485 | // while let Ok(payload) = self.next_payload(&mut connection).await { |
486 | // if let Ok(MessageKind::User(UserMessage { |
487 | // message: UserMessageKind::Response(UserMessageResponse::DisplayName(response)), |
488 | // .. |
489 | // })) = serde_json::from_slice(&payload) |
490 | // { |
491 | // return Ok(response.display_name); |
492 | // } |
493 | // } |
494 | |
495 | // connection.close(None).await?; |
496 | |
497 | // Err(SocketClosedError.into()) |
498 | // } |
499 | |
500 | // pub async fn user_display_image( |
501 | // &self, |
502 | // user: impl ToOwned<Owned = User>, |
503 | // pool: &DaemonConnectionPool, |
504 | // ) -> Result<Option<String>, Error> { |
505 | // let mut connection = pool.0.get().await.unwrap(); |
506 | |
507 | // let request = UserDisplayImageRequest { |
508 | // user: user.to_owned(), |
509 | // }; |
510 | |
511 | // Self::send_message( |
512 | // &MessageKind::User(UserMessage { |
513 | // instance: self |
514 | // .configuration |
515 | // .target_instance |
516 | // .as_ref() |
517 | // .unwrap_or(&pool.0.manager().target_instance) |
518 | // .clone(), |
519 | // message: UserMessageKind::Request(UserMessageRequest::DisplayImage(request)), |
520 | // }), |
521 | // &mut connection, |
522 | // ) |
523 | // .await?; |
524 | |
525 | // while let Ok(payload) = self.next_payload(&mut connection).await { |
526 | // if let Ok(MessageKind::User(UserMessage { |
527 | // message: UserMessageKind::Response(UserMessageResponse::DisplayImage(response)), |
528 | // .. |
529 | // })) = serde_json::from_slice(&payload) |
530 | // { |
531 | // return Ok(response.image_url); |
532 | // } |
533 | // } |
534 | |
535 | // connection.close(None).await?; |
536 | |
537 | // Err(SocketClosedError.into()) |
538 | // } |
539 | |
540 | // pub async fn user_bio( |
541 | // &self, |
542 | // user: impl ToOwned<Owned = User>, |
543 | // pool: &DaemonConnectionPool, |
544 | // ) -> Result<Option<String>, Error> { |
545 | // let mut connection = pool.0.get().await.unwrap(); |
546 | |
547 | // let request = UserBioRequest { |
548 | // user: user.to_owned(), |
549 | // }; |
550 | |
551 | // Self::send_message( |
552 | // &MessageKind::User(UserMessage { |
553 | // instance: self |
554 | // .configuration |
555 | // .target_instance |
556 | // .as_ref() |
557 | // .unwrap_or(&pool.0.manager().target_instance) |
558 | // .clone(), |
559 | // message: UserMessageKind::Request(UserMessageRequest::Bio(request)), |
560 | // }), |
561 | // &mut connection, |
562 | // ) |
563 | // .await?; |
564 | |
565 | // while let Ok(payload) = self.next_payload(&mut connection).await { |
566 | // if let Ok(MessageKind::User(UserMessage { |
567 | // message: UserMessageKind::Response(UserMessageResponse::Bio(response)), |
568 | // .. |
569 | // })) = serde_json::from_slice(&payload) |
570 | // { |
571 | // return Ok(response.bio); |
572 | // } |
573 | // } |
574 | |
575 | // connection.close(None).await?; |
576 | |
577 | // Err(SocketClosedError.into()) |
578 | // } |
579 | |
580 | // pub async fn user_repositories( |
581 | // &self, |
582 | // user: impl ToOwned<Owned = User>, |
583 | // pool: &DaemonConnectionPool, |
584 | // ) -> Result<Vec<RepositorySummary>, Error> { |
585 | // let mut connection = pool.0.get().await.unwrap(); |
586 | |
587 | // let request = UserRepositoriesRequest { |
588 | // user: user.to_owned(), |
589 | // }; |
590 | |
591 | // Self::send_message( |
592 | // &MessageKind::User(UserMessage { |
593 | // instance: self |
594 | // .configuration |
595 | // .target_instance |
596 | // .as_ref() |
597 | // .unwrap_or(&pool.0.manager().target_instance) |
598 | // .clone(), |
599 | // message: UserMessageKind::Request(UserMessageRequest::Repositories(request)), |
600 | // }), |
601 | // &mut connection, |
602 | // ) |
603 | // .await?; |
604 | |
605 | // while let Ok(payload) = self.next_payload(&mut connection).await { |
606 | // if let Ok(MessageKind::User(UserMessage { |
607 | // message: UserMessageKind::Response(UserMessageResponse::Repositories(response)), |
608 | // .. |
609 | // })) = serde_json::from_slice(&payload) |
610 | // { |
611 | // return Ok(response.repositories); |
612 | // } |
613 | // } |
614 | |
615 | // connection.close(None).await?; |
616 | |
617 | // Err(SocketClosedError.into()) |
618 | // } |
619 | |
620 | async fn connect_to( |
621 | instance: &Instance, |
622 | socket_addr: &Option<SocketAddr>, |
623 | ) -> Result<Socket, anyhow::Error> { |
624 | if let Some(addr) = socket_addr { |
625 | info!( |
626 | "Connecting to {}", |
627 | format!("ws://{}/.giterated/daemon/", addr) |
628 | ); |
629 | let (websocket, _response) = |
630 | connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?; |
631 | info!("Connection established with {}", addr); |
632 | |
633 | Ok(websocket) |
634 | } else { |
635 | info!( |
636 | "Connecting to {}", |
637 | format!("wss://{}/.giterated/daemon/", instance.url) |
638 | ); |
639 | let (websocket, _response) = |
640 | connect_async(&format!("wss://{}/.giterated/daemon/", instance.url)).await?; |
641 | info!("Connection established with {}", instance.url); |
642 | |
643 | Ok(websocket) |
644 | } |
645 | } |
646 | |
647 | async fn handle_handshake( |
648 | socket: &mut Socket, |
649 | _instance: &Instance, |
650 | ) -> Result<(), anyhow::Error> { |
651 | // Send handshake initiation |
652 | Self::send_message( |
653 | &InitiateHandshake { |
654 | version: Version::from_str("0.0.0").unwrap(), |
655 | }, |
656 | socket, |
657 | ) |
658 | .await?; |
659 | |
660 | while let Some(message) = socket.next().await { |
661 | let message = match message { |
662 | Ok(message) => message, |
663 | Err(err) => { |
664 | error!("Error reading message: {:?}", err); |
665 | continue; |
666 | } |
667 | }; |
668 | |
669 | let payload = match message { |
670 | Message::Text(text) => text.into_bytes(), |
671 | Message::Binary(bytes) => bytes, |
672 | Message::Ping(_) => continue, |
673 | Message::Pong(_) => continue, |
674 | Message::Close(_) => { |
675 | socket.close(None).await?; |
676 | |
677 | return Err(SocketClosedError.into()); |
678 | } |
679 | _ => unreachable!(), |
680 | }; |
681 | |
682 | info!("Read payload: {}", std::str::from_utf8(&payload).unwrap()); |
683 | |
684 | // We try deserializing the finalize response first as it is smaller and just as common |
685 | if let Ok(finalize) = serde_json::from_slice::<HandshakeFinalize>(&payload) { |
686 | if finalize.success { |
687 | info!("Handshake success!"); |
688 | |
689 | return Ok(()); |
690 | } else { |
691 | panic!() |
692 | } |
693 | } else { |
694 | match serde_json::from_slice::<HandshakeResponse>(&payload) { |
695 | Ok(response) => { |
696 | // let message = if !validate_version(&response.version) { |
697 | // error!( |
698 | // "Version compatibility failure! Our Version: {}, Their Version: {}", |
699 | // version(), |
700 | // response.version |
701 | // ); |
702 | |
703 | // HandshakeFinalize { success: false } |
704 | // } else { |
705 | // info!("Connected with a compatible version"); |
706 | |
707 | // HandshakeFinalize { success: true } |
708 | // }; |
709 | |
710 | warn!("Version compatibility has been no-op'd"); |
711 | |
712 | let message = HandshakeFinalize { success: true }; |
713 | // Send [`HandshakeFinalize`] to indicate if we're compatible or not |
714 | Self::send_message(&message, socket).await?; |
715 | } |
716 | Err(err) => { |
717 | error!("Error deserializing message: {:?}", err); |
718 | continue; |
719 | } |
720 | } |
721 | } |
722 | } |
723 | |
724 | Ok(()) |
725 | } |
726 | |
727 | async fn send_message<T: Serialize>( |
728 | message: &T, |
729 | socket: &mut Socket, |
730 | ) -> Result<(), anyhow::Error> { |
731 | socket |
732 | .send(Message::Binary(serde_json::to_vec(&message).unwrap())) |
733 | .await?; |
734 | |
735 | Ok(()) |
736 | } |
737 | |
738 | // async fn next_payload(&self, socket: &mut Socket) -> Result<Vec<u8>, Error> { |
739 | // while let Some(message) = socket.next().await { |
740 | // let message = message?; |
741 | |
742 | // match message { |
743 | // Message::Text(text) => return Ok(text.into_bytes()), |
744 | // Message::Binary(bytes) => return Ok(bytes), |
745 | // Message::Ping(_) => continue, |
746 | // Message::Pong(_) => continue, |
747 | // Message::Close(_) => { |
748 | // socket.close(None).await?; |
749 | |
750 | // return Err(SocketClosedError.into()); |
751 | // } |
752 | // _ => unreachable!(), |
753 | // } |
754 | // } |
755 | |
756 | // socket.close(None).await?; |
757 | |
758 | // return Err(SocketClosedError.into()); |
759 | // } |
760 | } |
761 | |
762 | #[derive(Debug, thiserror::Error)] |
763 | #[error("closed socket")] |
764 | struct SocketClosedError; |