JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Refactor handshake messages

Amber - 2 years ago

parent: tbd commit: 6c50283

⁨src/connection/handshake.rs⁩ - ⁨3559⁩ bytes
Raw
1 use std::{str::FromStr, sync::atomic::Ordering};
2
3 use anyhow::Error;
4 use semver::Version;
5
6 use crate::{
7 connection::ConnectionError,
8 messages::handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake},
9 model::authenticated::{AuthenticatedInstance, Message, MessageHandler, NetworkMessage, State},
10 validate_version,
11 };
12
13 use super::{wrapper::ConnectionState, HandlerUnhandled};
14
15 pub async fn handshake_handle(
16 message: &NetworkMessage,
17 state: &ConnectionState,
18 ) -> Result<(), Error> {
19 if initiate_handshake
20 .handle_message(&message, state)
21 .await
22 .is_ok()
23 {
24 Ok(())
25 } else if handshake_response
26 .handle_message(&message, state)
27 .await
28 .is_ok()
29 {
30 Ok(())
31 } else if handshake_finalize
32 .handle_message(&message, state)
33 .await
34 .is_ok()
35 {
36 Ok(())
37 } else {
38 Err(Error::from(HandlerUnhandled))
39 }
40 }
41
42 async fn initiate_handshake(
43 Message(initiation): Message<InitiateHandshake>,
44 State(connection_state): State<ConnectionState>,
45 _instance: AuthenticatedInstance,
46 ) -> Result<(), HandshakeError> {
47 if !validate_version(&initiation.version) {
48 error!(
49 "Version compatibility failure! Our Version: {}, Their Version: {}",
50 Version::from_str(&std::env::var("CARGO_PKG_VERSION").unwrap()).unwrap(),
51 initiation.version
52 );
53
54 connection_state
55 .send(HandshakeFinalize { success: false })
56 .await
57 .map_err(|e| HandshakeError::SendError(e))?;
58
59 Ok(())
60 } else {
61 connection_state
62 .send(HandshakeFinalize { success: true })
63 .await
64 .map_err(|e| HandshakeError::SendError(e))?;
65
66 Ok(())
67 }
68 }
69
70 async fn handshake_response(
71 Message(response): Message<HandshakeResponse>,
72 State(connection_state): State<ConnectionState>,
73 _instance: AuthenticatedInstance,
74 ) -> Result<(), HandshakeError> {
75 if !validate_version(&response.version) {
76 error!(
77 "Version compatibility failure! Our Version: {}, Their Version: {}",
78 Version::from_str(&std::env::var("CARGO_PKG_VERSION").unwrap()).unwrap(),
79 response.version
80 );
81
82 connection_state
83 .send(HandshakeFinalize { success: false })
84 .await
85 .map_err(|e| HandshakeError::SendError(e))?;
86
87 Ok(())
88 } else {
89 connection_state
90 .send(HandshakeFinalize { success: true })
91 .await
92 .map_err(|e| HandshakeError::SendError(e))?;
93
94 Ok(())
95 }
96 }
97
98 async fn handshake_finalize(
99 Message(finalize): Message<HandshakeFinalize>,
100 State(connection_state): State<ConnectionState>,
101 _instance: AuthenticatedInstance,
102 ) -> Result<(), HandshakeError> {
103 if !finalize.success {
104 error!("Error during handshake, aborting connection");
105 return Err(Error::from(ConnectionError::Shutdown).into());
106 } else {
107 connection_state.handshaked.store(true, Ordering::SeqCst);
108
109 connection_state
110 .send(HandshakeFinalize { success: true })
111 .await
112 .map_err(|e| HandshakeError::SendError(e))?;
113
114 Ok(())
115 }
116 }
117
118 #[derive(Debug, thiserror::Error)]
119 pub enum HandshakeError {
120 #[error("version mismatch during handshake, ours: {0}, theirs: {1}")]
121 VersionMismatch(Version, Version),
122 #[error("while sending message: {0}")]
123 SendError(Error),
124 #[error("{0}")]
125 Other(#[from] Error),
126 }
127