JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

`[feature/plugins]` Some plugin work?

Amber - ⁨1⁩ year ago

parent: tbd commit: ⁨10a447b

⁨plugins/giterated-protocol/src/handlers.rs⁩ - ⁨7633⁩ bytes
Raw
1 use std::{fmt::Display, net::SocketAddr, str::FromStr};
2
3 use anyhow::Error;
4 use futures_util::{SinkExt, StreamExt};
5 use giterated_models::{
6 error::{NetworkOperationError, OperationError},
7 instance::Instance,
8 object::GiteratedObject,
9 operation::GiteratedOperation,
10 };
11 use giterated_plugin::{
12 new_stack::{handle::RuntimeHandle, OperationState},
13 AnyFailure, AnyObject, AnyOperation, AnySuccess,
14 };
15 use serde::{Deserialize, Serialize};
16 use tokio::net::TcpStream;
17 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
18
19 use crate::{Authenticated, GiteratedMessage, ProtocolState, RemoteError};
20
21 pub async fn handle_network_operation<OS: Send + Sync + Clone + 'static>(
22 _state: ProtocolState,
23 object: NetworkedObject,
24 operation: NetworkedOperation,
25 runtime: RuntimeHandle,
26 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
27 trace!("Handle network operation {}", operation.name);
28
29 runtime
30 .handle_serialized(&object.0, &operation.name, &operation.payload)
31 .await
32 }
33
34 #[derive(Clone, Debug, Serialize, Deserialize)]
35 pub struct NetworkedObject(pub String);
36
37 impl FromStr for NetworkedObject {
38 type Err = ();
39
40 fn from_str(s: &str) -> Result<Self, Self::Err> {
41 Ok(NetworkedObject(s.to_string()))
42 }
43 }
44
45 impl Display for NetworkedObject {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.write_str(&self.0)
48 }
49 }
50
51 impl GiteratedObject for NetworkedObject {
52 fn object_name() -> &'static str {
53 "networked_object"
54 }
55
56 fn from_object_str(_object_str: &str) -> Result<Self, Error> {
57 todo!()
58 }
59
60 fn home_uri(&self) -> String {
61 todo!()
62 }
63 }
64
65 #[derive(Clone, Debug, Serialize, Deserialize)]
66 pub struct NetworkedOperation {
67 pub name: String,
68 pub payload: Vec<u8>,
69 }
70
71 impl NetworkedOperation {
72 pub fn new(name: String, payload: Vec<u8>) -> Self {
73 Self { name, payload }
74 }
75 }
76
77 impl GiteratedOperation<NetworkedObject> for NetworkedOperation {
78 type Success = Vec<u8>;
79
80 type Failure = Vec<u8>;
81
82 fn operation_name() -> &'static str {
83 "networked_operation"
84 }
85 }
86
87 /// Handler which will attempt to resolve any operation that doesn't resolve locally
88 /// against a remote instance.
89 pub async fn try_handle_with_remote(
90 state: ProtocolState,
91 object: AnyObject,
92 operation: AnyOperation,
93 _runtime: RuntimeHandle,
94 ) -> Result<AnySuccess, OperationError<AnyFailure>> {
95 // if object.is::<NetworkedObject>() {
96 // return Err(OperationError::Unhandled);
97 // }
98 // trace!(
99 // "Try handling object operation {}::{} with remote",
100 // object.kind(),
101 // operation.kind().operation_name
102 // );
103 // TODO:
104 // Ideally we support pass-through on object types that aren't used locally.
105 // For now, we aren't worrying about that.
106 let object_meta = object.vtable().clone();
107
108 let operation_meta = operation.vtable().clone();
109
110 // trace!(
111 // "Serializing with {}::{}",
112 // operation.kind().object_name,
113 // operation.kind().operation_name
114 // );
115
116 let object_home_uri = unsafe { (object_meta.home_uri)(&object) };
117
118 if let Some(home_uri) = state.home_uri {
119 if &home_uri == object_home_uri.as_ref() {
120 // This isn't a remote request, requests aren't supposed to hit this layer
121 // if they're not remote.
122 // warn!("Try handling object operation {}::{}, resolved object home uri as local home uri. This is a bug.", object.kind(),
123 // operation.kind().operation_name);
124
125 return Err(OperationError::Unhandled);
126 }
127 }
128
129 // trace!(
130 // "Handling object operation {}::{} sending payload",
131 // object.kind(),
132 // operation.kind().operation_name
133 // );
134
135 let object = NetworkedObject(unsafe { (object_meta.to_str)(object).as_ref().to_string() });
136
137 let payload = unsafe { (operation_meta.serialize)(&operation) }.unwrap();
138 let payload = Vec::from(payload.as_ref());
139
140 let operation = NetworkedOperation::new(operation_meta.kind().to_string(), payload);
141
142 // let authenticated = Authenticated::new(object, operation);
143
144 let message = GiteratedMessage {
145 object,
146 operation: NetworkedOperation::operation_name().to_string(),
147 payload: operation,
148 };
149
150 let authenticated = Authenticated::new(message);
151
152 let mut socket: WebSocketStream<MaybeTlsStream<TcpStream>> = connect_to(
153 &Instance::from_str(&object_home_uri).unwrap(),
154 &Some(("127.0.0.1:1111").parse().unwrap()),
155 )
156 .await
157 .unwrap();
158
159 // TODO AUTH
160
161 let result: Result<Vec<u8>, OperationError<Vec<u8>>> =
162 send_expect(&mut socket, authenticated).await;
163
164 match result {
165 Ok(success) => {
166 let success = unsafe { (operation_meta.deserialize_success)(&success) }.unwrap();
167
168 Ok(success)
169 }
170 Err(err) => Err(match err {
171 OperationError::Operation(failure) => {
172 let failure = unsafe { (operation_meta.deserialize_failure)(&failure) }.unwrap();
173
174 OperationError::Operation(failure)
175 }
176 OperationError::Internal(internal) => OperationError::Internal(internal),
177 OperationError::Unhandled => OperationError::Unhandled,
178 }),
179 }
180 }
181
182 type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>;
183
184 async fn connect_to(
185 instance: &Instance,
186
187 socket_addr: &Option<SocketAddr>,
188 ) -> Result<Socket, Error> {
189 if let Some(addr) = socket_addr {
190 info!(
191 "Connecting to {}",
192 format!("ws://{}/.giterated/daemon/", addr)
193 );
194
195 let (websocket, _response) =
196 connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?;
197
198 info!("Connection established with {}", addr);
199
200 Ok(websocket)
201 } else {
202 info!(
203 "Connecting to {}",
204 format!("wss://{}/.giterated/daemon/", instance.0)
205 );
206
207 let (websocket, _response) =
208 connect_async(&format!("wss://{}/.giterated/daemon/", instance.0)).await?;
209
210 info!("Connection established with {}", instance.0);
211
212 Ok(websocket)
213 }
214 }
215
216 async fn send_expect<O: GiteratedObject, D: GiteratedOperation<O>>(
217 socket: &mut Socket,
218 message: Authenticated<O, D>,
219 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
220 let payload = bincode::serialize(&message.into_payload()).unwrap();
221
222 socket.send(Message::Binary(payload)).await.unwrap();
223
224 while let Some(message) = socket.next().await {
225 let payload = match message.unwrap() {
226 Message::Binary(payload) => payload,
227
228 _ => {
229 continue;
230 }
231 };
232
233 let raw_result =
234 bincode::deserialize::<Result<Vec<u8>, NetworkOperationError<Vec<u8>>>>(&payload)
235 .map_err(|e| OperationError::Internal(Error::from(e)))?;
236
237 trace!(
238 "Received response for networked operation {}::{}.",
239 O::object_name(),
240 D::operation_name()
241 );
242
243 return match raw_result {
244 Ok(success) => Ok(success),
245 Err(err) => Err(match err {
246 NetworkOperationError::Operation(operation_error) => {
247 OperationError::Operation(operation_error)
248 }
249 NetworkOperationError::Internal => OperationError::Internal(RemoteError.into()),
250 NetworkOperationError::Unhandled => OperationError::Unhandled,
251 }),
252 };
253 }
254
255 panic!()
256 }
257