JavaScript is disabled, refresh for a better experience. ambee/giterated

ambee/giterated

Git repository hosting, collaboration, and discovery for the Fediverse.

More pre-vtable changes

Amber - ⁨1⁩ year ago

parent: tbd commit: ⁨9cfa135

⁨plugins/giterated-protocol/src/handlers.rs⁩ - ⁨7625⁩ bytes
Raw
1 use std::{fmt::Display, net::SocketAddr, str::FromStr};
2
3 use anyhow::Error;
4 use futures_util::{SinkExt, StreamExt};
5 use giterated_models::{
6 error::{NetworkOperationError, OperationError},
7 instance::Instance,
8 object::GiteratedObject,
9 operation::GiteratedOperation,
10 };
11 use giterated_plugin::{
12 new_stack::handle::RuntimeHandle, AnyFailure, AnyObject, AnyOperation, AnySuccess,
13 };
14 use serde::{Deserialize, Serialize};
15 use tokio::net::TcpStream;
16 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
17
18 use crate::{Authenticated, GiteratedMessage, ProtocolState, RemoteError};
19
20 pub async fn handle_network_operation<OS: Send + Sync + Clone + 'static>(
21 _state: ProtocolState,
22 object: NetworkedObject,
23 operation: NetworkedOperation,
24 runtime: RuntimeHandle,
25 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
26 trace!("Handle network operation {}", operation.name);
27
28 runtime
29 .handle_serialized(&object.0, &operation.name, &operation.payload)
30 .await
31 }
32
33 #[derive(Clone, Debug, Serialize, Deserialize)]
34 pub struct NetworkedObject(pub String);
35
36 impl FromStr for NetworkedObject {
37 type Err = ();
38
39 fn from_str(s: &str) -> Result<Self, Self::Err> {
40 Ok(NetworkedObject(s.to_string()))
41 }
42 }
43
44 impl Display for NetworkedObject {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 f.write_str(&self.0)
47 }
48 }
49
50 impl GiteratedObject for NetworkedObject {
51 fn object_name() -> &'static str {
52 "networked_object"
53 }
54
55 fn from_object_str(_object_str: &str) -> Result<Self, Error> {
56 todo!()
57 }
58
59 fn home_uri(&self) -> String {
60 todo!()
61 }
62 }
63
64 #[derive(Clone, Debug, Serialize, Deserialize)]
65 pub struct NetworkedOperation {
66 pub name: String,
67 pub payload: Vec<u8>,
68 }
69
70 impl NetworkedOperation {
71 pub fn new(name: String, payload: Vec<u8>) -> Self {
72 Self { name, payload }
73 }
74 }
75
76 impl GiteratedOperation<NetworkedObject> for NetworkedOperation {
77 type Success = Vec<u8>;
78
79 type Failure = Vec<u8>;
80
81 fn operation_name() -> &'static str {
82 "networked_operation"
83 }
84 }
85
86 /// Handler which will attempt to resolve any operation that doesn't resolve locally
87 /// against a remote instance.
88 pub async fn try_handle_with_remote(
89 state: ProtocolState,
90 object: AnyObject,
91 operation: AnyOperation,
92 _runtime: RuntimeHandle,
93 ) -> Result<AnySuccess, OperationError<AnyFailure>> {
94 // if object.is::<NetworkedObject>() {
95 // return Err(OperationError::Unhandled);
96 // }
97 // trace!(
98 // "Try handling object operation {}::{} with remote",
99 // object.kind(),
100 // operation.kind().operation_name
101 // );
102 // TODO:
103 // Ideally we support pass-through on object types that aren't used locally.
104 // For now, we aren't worrying about that.
105 let object_meta = object.vtable();
106
107 let operation_meta = operation.vtable();
108
109 // trace!(
110 // "Serializing with {}::{}",
111 // operation.kind().object_name,
112 // operation.kind().operation_name
113 // );
114
115 let object_home_uri = unsafe { (object_meta.home_uri)(&object) };
116
117 if let Some(home_uri) = state.home_uri {
118 if &home_uri == object_home_uri.as_ref() {
119 // This isn't a remote request, requests aren't supposed to hit this layer
120 // if they're not remote.
121 // warn!("Try handling object operation {}::{}, resolved object home uri as local home uri. This is a bug.", object.kind(),
122 // operation.kind().operation_name);
123
124 return Err(OperationError::Unhandled);
125 }
126 }
127
128 // trace!(
129 // "Handling object operation {}::{} sending payload",
130 // object.kind(),
131 // operation.kind().operation_name
132 // );
133
134 // let object = NetworkedObject(unsafe { (object_meta.to_str)(object).as_ref().to_string() });
135
136 let object = todo!();
137
138 let payload = unsafe { (operation_meta.serialize)(&operation) }.unwrap();
139 let payload = Vec::from(payload.as_ref());
140
141 let operation = NetworkedOperation::new(operation_meta.kind().to_string(), payload);
142
143 // let authenticated = Authenticated::new(object, operation);
144
145 let message = GiteratedMessage {
146 object,
147 operation: NetworkedOperation::operation_name().to_string(),
148 payload: operation,
149 };
150
151 let authenticated = Authenticated::new(message);
152
153 let mut socket: WebSocketStream<MaybeTlsStream<TcpStream>> = connect_to(
154 &Instance::from_str(&object_home_uri).unwrap(),
155 &Some(("127.0.0.1:1111").parse().unwrap()),
156 )
157 .await
158 .unwrap();
159
160 // TODO AUTH
161
162 let result: Result<Vec<u8>, OperationError<Vec<u8>>> =
163 send_expect(&mut socket, authenticated).await;
164
165 match result {
166 Ok(success) => {
167 let success = unsafe { (operation_meta.deserialize_success)(&success) }.unwrap();
168
169 Ok(success)
170 }
171 Err(err) => Err(match err {
172 OperationError::Operation(failure) => {
173 let failure = unsafe { (operation_meta.deserialize_failure)(&failure) }.unwrap();
174
175 OperationError::Operation(failure)
176 }
177 OperationError::Internal(internal) => OperationError::Internal(internal),
178 OperationError::Unhandled => OperationError::Unhandled,
179 }),
180 }
181 }
182
183 type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>;
184
185 async fn connect_to(
186 instance: &Instance,
187
188 socket_addr: &Option<SocketAddr>,
189 ) -> Result<Socket, Error> {
190 if let Some(addr) = socket_addr {
191 info!(
192 "Connecting to {}",
193 format!("ws://{}/.giterated/daemon/", addr)
194 );
195
196 let (websocket, _response) =
197 connect_async(&format!("ws://{}/.giterated/daemon/", addr)).await?;
198
199 info!("Connection established with {}", addr);
200
201 Ok(websocket)
202 } else {
203 info!(
204 "Connecting to {}",
205 format!("wss://{}/.giterated/daemon/", instance.0)
206 );
207
208 let (websocket, _response) =
209 connect_async(&format!("wss://{}/.giterated/daemon/", instance.0)).await?;
210
211 info!("Connection established with {}", instance.0);
212
213 Ok(websocket)
214 }
215 }
216
217 async fn send_expect<O: GiteratedObject, D: GiteratedOperation<O>>(
218 socket: &mut Socket,
219 message: Authenticated<O, D>,
220 ) -> Result<Vec<u8>, OperationError<Vec<u8>>> {
221 let payload = bincode::serialize(&message.into_payload()).unwrap();
222
223 socket.send(Message::Binary(payload)).await.unwrap();
224
225 while let Some(message) = socket.next().await {
226 let payload = match message.unwrap() {
227 Message::Binary(payload) => payload,
228
229 _ => {
230 continue;
231 }
232 };
233
234 let raw_result =
235 bincode::deserialize::<Result<Vec<u8>, NetworkOperationError<Vec<u8>>>>(&payload)
236 .map_err(|e| OperationError::Internal(Error::from(e)))?;
237
238 trace!(
239 "Received response for networked operation {}::{}.",
240 O::object_name(),
241 D::operation_name()
242 );
243
244 return match raw_result {
245 Ok(success) => Ok(success),
246 Err(err) => Err(match err {
247 NetworkOperationError::Operation(operation_error) => {
248 OperationError::Operation(operation_error)
249 }
250 NetworkOperationError::Internal => OperationError::Internal(RemoteError.into()),
251 NetworkOperationError::Unhandled => OperationError::Unhandled,
252 }),
253 };
254 }
255
256 panic!()
257 }
258