JavaScript is disabled, refresh for a better experience. ambee/giterated-api

ambee/giterated-api

Git repository hosting, collaboration, and discovery for the Fediverse.

begin new protocol refactor

Amber - ⁨2⁩ years ago

parent: tbd commit: ⁨2091700

⁨src/handshake.rs⁩ - ⁨3661⁩ bytes
Raw
1 use std::str::FromStr;
2
3 use futures_util::StreamExt;
4 use giterated_models::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake};
5 use semver::Version;
6 use serde::Serialize;
7 use tokio_tungstenite::tungstenite::Message;
8
9 use crate::Socket;
10
11 pub struct GiteratedConnectionHandshaker(Socket);
12
13 impl GiteratedConnectionHandshaker {
14 pub fn new(socket: Socket) -> Self {
15 Self(socket)
16 }
17
18 pub async fn handshake(self) -> Result<Socket, anyhow::Error> {
19 let mut socket = self.0;
20
21 // Send handshake initiation
22 Self::send_raw_message(
23 &mut socket,
24 &InitiateHandshake {
25 version: Version::from_str("0.0.0").unwrap(),
26 },
27 )
28 .await?;
29
30 while let Some(message) = socket.next().await {
31 let message = match message {
32 Ok(message) => message,
33 Err(err) => {
34 error!("Error reading message: {:?}", err);
35 continue;
36 }
37 };
38
39 let payload = match message {
40 Message::Text(text) => text.into_bytes(),
41 Message::Binary(bytes) => bytes,
42 Message::Ping(_) => continue,
43 Message::Pong(_) => continue,
44 Message::Close(_) => {
45 socket.close(None).await?;
46
47 return Err(SocketClosedError.into());
48 }
49 _ => unreachable!(),
50 };
51
52 info!("Read payload: {}", std::str::from_utf8(&payload).unwrap());
53
54 // We try deserializing the finalize response first as it is smaller and just as common
55 if let Ok(finalize) = serde_json::from_slice::<HandshakeFinalize>(&payload) {
56 if finalize.success {
57 info!("Handshake success!");
58
59 return Ok(socket);
60 } else {
61 socket.close(None).await?;
62 return Err(SocketClosedError.into());
63 }
64 } else {
65 match serde_json::from_slice::<HandshakeResponse>(&payload) {
66 Ok(_response) => {
67 // let message = if !validate_version(&response.version) {
68 // error!(
69 // "Version compatibility failure! Our Version: {}, Their Version: {}",
70 // version(),
71 // response.version
72 // );
73
74 // HandshakeFinalize { success: false }
75 // } else {
76 // info!("Connected with a compatible version");
77
78 // HandshakeFinalize { success: true }
79 // };
80
81 warn!("Version compatibility has been no-op'd");
82
83 let message = HandshakeFinalize { success: true };
84 // Send [`HandshakeFinalize`] to indicate if we're compatible or not
85 Self::send_raw_message(&mut socket, &message).await?;
86 }
87 Err(err) => {
88 error!("Error deserializing message: {:?}", err);
89 continue;
90 }
91 }
92 }
93 }
94
95 Ok(socket)
96 }
97
98 async fn send_raw_message<T: Serialize>(
99 _socket: &Socket,
100 _message: &T,
101 ) -> Result<(), tokio_tungstenite::tungstenite::Error> {
102 todo!()
103 }
104 }
105
106 #[derive(Debug, thiserror::Error)]
107 #[error("closed socket")]
108 struct SocketClosedError;
109