JavaScript is disabled; refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Major connection refactor base

Type: Refactor

Amber - ⁨2⁩ years ago

parent: tbd commit: 8dcc111

⁨src/connection/handshake.rs⁩ - ⁨3604⁩ bytes
Raw
1 use std::{str::FromStr, sync::atomic::Ordering};
2
3 use anyhow::Error;
4 use semver::Version;
5 use thiserror::Error;
6
7 use crate::model::authenticated::MessageHandler;
8 use crate::{
9 connection::ConnectionError,
10 handshake::{HandshakeFinalize, HandshakeResponse, InitiateHandshake},
11 model::authenticated::{AuthenticatedInstance, Message, NetworkMessage, State},
12 validate_version,
13 };
14
15 use super::{wrapper::ConnectionState, HandlerUnhandled};
16
17 pub async fn handshake_handle(
18 message: &NetworkMessage,
19 state: &ConnectionState,
20 ) -> Result<(), Error> {
21 if initiate_handshake
22 .handle_message(&message, state)
23 .await
24 .is_ok()
25 {
26 Ok(())
27 } else if handshake_response
28 .handle_message(&message, state)
29 .await
30 .is_ok()
31 {
32 Ok(())
33 } else if handshake_finalize
34 .handle_message(&message, state)
35 .await
36 .is_ok()
37 {
38 Ok(())
39 } else {
40 Err(Error::from(HandlerUnhandled))
41 }
42 }
43
44 async fn initiate_handshake(
45 Message(initiation): Message<InitiateHandshake>,
46 State(connection_state): State<ConnectionState>,
47 _instance: AuthenticatedInstance,
48 ) -> Result<(), HandshakeError> {
49 if !validate_version(&initiation.version) {
50 error!(
51 "Version compatibility failure! Our Version: {}, Their Version: {}",
52 Version::from_str(&std::env::var("CARGO_PKG_VERSION").unwrap()).unwrap(),
53 initiation.version
54 );
55
56 connection_state
57 .send(HandshakeFinalize { success: false })
58 .await
59 .map_err(|e| HandshakeError::SendError(e))?;
60
61 Ok(())
62 } else {
63 connection_state
64 .send(HandshakeFinalize { success: true })
65 .await
66 .map_err(|e| HandshakeError::SendError(e))?;
67
68 Ok(())
69 }
70 }
71
72 async fn handshake_response(
73 Message(response): Message<HandshakeResponse>,
74 State(connection_state): State<ConnectionState>,
75 _instance: AuthenticatedInstance,
76 ) -> Result<(), HandshakeError> {
77 if !validate_version(&response.version) {
78 error!(
79 "Version compatibility failure! Our Version: {}, Their Version: {}",
80 Version::from_str(&std::env::var("CARGO_PKG_VERSION").unwrap()).unwrap(),
81 response.version
82 );
83
84 connection_state
85 .send(HandshakeFinalize { success: false })
86 .await
87 .map_err(|e| HandshakeError::SendError(e))?;
88
89 Ok(())
90 } else {
91 connection_state
92 .send(HandshakeFinalize { success: true })
93 .await
94 .map_err(|e| HandshakeError::SendError(e))?;
95
96 Ok(())
97 }
98 }
99
100 async fn handshake_finalize(
101 Message(finalize): Message<HandshakeFinalize>,
102 State(connection_state): State<ConnectionState>,
103 _instance: AuthenticatedInstance,
104 ) -> Result<(), HandshakeError> {
105 if !finalize.success {
106 error!("Error during handshake, aborting connection");
107 return Err(Error::from(ConnectionError::Shutdown).into());
108 } else {
109 connection_state.handshaked.store(true, Ordering::SeqCst);
110
111 connection_state
112 .send(HandshakeFinalize { success: true })
113 .await
114 .map_err(|e| HandshakeError::SendError(e))?;
115
116 Ok(())
117 }
118 }
119
120 #[derive(Debug, thiserror::Error)]
121 pub enum HandshakeError {
122 #[error("version mismatch during handshake, ours: {0}, theirs: {1}")]
123 VersionMismatch(Version, Version),
124 #[error("while sending message: {0}")]
125 SendError(Error),
126 #[error("{0}")]
127 Other(#[from] Error),
128 }
129