JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

Clean dependencies

Amber - 1 year ago

parent: tbd commit: 9403d3f

⁨plugins/giterated-protocol/src/handlers.rs⁩ - ⁨7570⁩ bytes
Raw
1 use std::{fmt::Display, net::SocketAddr, str::FromStr};
2
3 use anyhow::Error;
4 use futures_util::{SinkExt, StreamExt};
5 use giterated_models::{
6 error::{NetworkOperationError, OperationError},
7 instance::Instance,
8 object::GiteratedObject,
9 operation::GiteratedOperation,
10 };
11 use giterated_plugin::abi::vtable::runtime::RuntimeHandle;
12 use serde::{Deserialize, Serialize};
13 use tokio::net::TcpStream;
14 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
15
16 use crate::{Authenticated, GiteratedMessage, ProtocolState, RemoteError};
17
18 pub async fn handle_network_operation<OS: Send + Sync + Clone + 'static>(
19 _state: ProtocolState,
20 object: NetworkedObject,
21 operation: NetworkedOperation,
22 runtime: RuntimeHandle,
23 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
24 trace!("Handle network operation {}", operation.name);
25
26 runtime
27 .handle_serialized(&object.0, &operation.name, &operation.payload)
28 .await
29 }
30
31 #[derive(Clone, Debug, Serialize, Deserialize)]
32 pub struct NetworkedObject(pub String);
33
34 impl FromStr for NetworkedObject {
35 type Err = ();
36
37 fn from_str(s: &str) -> Result<Self, Self::Err> {
38 Ok(NetworkedObject(s.to_string()))
39 }
40 }
41
42 impl Display for NetworkedObject {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 f.write_str(&self.0)
45 }
46 }
47
48 impl GiteratedObject for NetworkedObject {
49 fn object_name() -> &'static str {
50 "networked_object"
51 }
52
53 fn from_object_str(_object_str: &str) -> Result<Self, Error> {
54 todo!()
55 }
56
57 fn home_uri(&self) -> String {
58 todo!()
59 }
60 }
61
62 #[derive(Clone, Debug, Serialize, Deserialize)]
63 pub struct NetworkedOperation {
64 pub name: String,
65 pub payload: Vec<u8>,
66 }
67
68 impl NetworkedOperation {
69 pub fn new(name: String, payload: Vec<u8>) -> Self {
70 Self { name, payload }
71 }
72 }
73
74 impl GiteratedOperation<NetworkedObject> for NetworkedOperation {
75 type Success = Vec<u8>;
76
77 type Failure = Vec<u8>;
78
79 fn operation_name() -> &'static str {
80 "networked_operation"
81 }
82 }
83
84 /// Handler which will attempt to resolve any operation that doesn't resolve locally
85 /// against a remote instance.
86 pub async fn try_handle_with_remote(
87 state: ProtocolState,
88 object: AnyObject,
89 operation: AnyOperation,
90 _runtime: RuntimeHandle,
91 ) -> Result<AnySuccess, OperationError<AnyFailure>> {
92 // if object.is::<NetworkedObject>() {
93 // return Err(OperationError::Unhandled);
94 // }
95 // trace!(
96 // "Try handling object operation {}::{} with remote",
97 // object.kind(),
98 // operation.kind().operation_name
99 // );
100 // TODO:
101 // Ideally we support pass-through on object types that aren't used locally.
102 // For now, we aren't worrying about that.
103 let object_meta = object.vtable();
104
105 let operation_meta = operation.vtable();
106
107 // trace!(
108 // "Serializing with {}::{}",
109 // operation.kind().object_name,
110 // operation.kind().operation_name
111 // );
112
113 let object_home_uri = unsafe { (object_meta.home_uri)(&object) };
114
115 if let Some(home_uri) = state.home_uri {
116 if &home_uri == object_home_uri.as_ref() {
117 // This isn't a remote request, requests aren't supposed to hit this layer
118 // if they're not remote.
119 // warn!("Try handling object operation {}::{}, resolved object home uri as local home uri. This is a bug.", object.kind(),
120 // operation.kind().operation_name);
121
122 return Err(OperationError::Unhandled);
123 }
124 }
125
126 // trace!(
127 // "Handling object operation {}::{} sending payload",
128 // object.kind(),
129 // operation.kind().operation_name
130 // );
131
132 // let object = NetworkedObject(unsafe { (object_meta.to_str)(object).as_ref().to_string() });
133
134 let object = todo!();
135
136 let payload = unsafe { (operation_meta.serialize)(&operation) }.unwrap();
137 let payload = Vec::from(payload.as_ref());
138
139 let operation = NetworkedOperation::new(operation_meta.kind().to_string(), payload);
140
141 // let authenticated = Authenticated::new(object, operation);
142
143 let message = GiteratedMessage {
144 object,
145 operation: NetworkedOperation::operation_name().to_string(),
146 payload: operation,
147 };
148
149 let authenticated = Authenticated::new(message);
150
151 let mut socket: WebSocketStream<MaybeTlsStream<TcpStream>> = connect_to(
152 &Instance::from_str(&object_home_uri).unwrap(),
153 &Some(("127.0.0.1:1111").parse().unwrap()),
154 )
155 .await
156 .unwrap();
157
158 // TODO AUTH
159
160 let result: Result<Vec<u8>, OperationError<Vec<u8>>> =
161 send_expect(&mut socket, authenticated).await;
162
163 match result {
164 Ok(success) => {
165 let success = unsafe { (operation_meta.deserialize_success)(&success) }.unwrap();
166
167 Ok(success)
168 }
169 Err(err) => Err(match err {
170 OperationError::Operation(failure) => {
171 let failure = unsafe { (operation_meta.deserialize_failure)(&failure) }.unwrap();
172
173 OperationError::Operation(failure)
174 }
175 OperationError::Internal(internal) => OperationError::Internal(internal),
176 OperationError::Unhandled => OperationError::Unhandled,
177 }),
178 }
179 }
180
181 type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>;
182
183 async fn connect_to(
184 instance: &Instance,
185
186 socket_addr: &Option<SocketAddr>,
187 ) -> Result<Socket, Error> {
188 if let Some(addr) = socket_addr {
189 info!(
190 "Connecting to {}",
191 format!("ws://{}/.giterated/daemon/", addr)
192 );
193
194 let (websocket, _response) =
195 connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?;
196
197 info!("Connection established with {}", addr);
198
199 Ok(websocket)
200 } else {
201 info!(
202 "Connecting to {}",
203 format!("wss://{}/.giterated/daemon/", instance.0)
204 );
205
206 let (websocket, _response) =
207 connect_async(&format!("wss://{}/.giterated/daemon/", instance.0)).await?;
208
209 info!("Connection established with {}", instance.0);
210
211 Ok(websocket)
212 }
213 }
214
215 async fn send_expect<O: GiteratedObject, D: GiteratedOperation<O>>(
216 socket: &mut Socket,
217 message: Authenticated<O, D>,
218 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
219 let payload = bincode::serialize(&message.into_payload()).unwrap();
220
221 socket.send(Message::Binary(payload)).await.unwrap();
222
223 while let Some(message) = socket.next().await {
224 let payload = match message.unwrap() {
225 Message::Binary(payload) => payload,
226
227 _ => {
228 continue;
229 }
230 };
231
232 let raw_result =
233 bincode::deserialize::<Result<Vec<u8>, NetworkOperationError<Vec<u8>>>>(&payload)
234 .map_err(|e| OperationError::Internal(Error::from(e)))?;
235
236 trace!(
237 "Received response for networked operation {}::{}.",
238 O::object_name(),
239 D::operation_name()
240 );
241
242 return match raw_result {
243 Ok(success) => Ok(success),
244 Err(err) => Err(match err {
245 NetworkOperationError::Operation(operation_error) => {
246 OperationError::Operation(operation_error)
247 }
248 NetworkOperationError::Internal => OperationError::Internal(RemoteError.into()),
249 NetworkOperationError::Unhandled => OperationError::Unhandled,
250 }),
251 };
252 }
253
254 panic!()
255 }
256