JavaScript is disabled, refresh for a better experience. ambee/giterated-api

ambee/giterated-api

Git repository hosting, collaboration, and discovery for the Fediverse.

Initial Commit

Amber - ⁨2⁩ years ago

commit: ⁨780cc59

⁨src/lib.rs⁩ - ⁨6421⁩ bytes
Raw
1 use std::{error::Error, net::SocketAddr, str::FromStr};
2
3 use futures_util::{SinkExt, StreamExt};
4 use giterated_daemon::{
5 command::{
6 repository::{
7 CreateRepositoryCommand, RepositoryInfoRequest, RepositoryMessage,
8 RepositoryMessageKind, RepositoryRequest, RepositoryResponse,
9 },
10 MessageKind,
11 },
12 handshake::{HandshakeFinalize, HandshakeMessage, InitiateHandshake},
13 model::{
14 instance::Instance,
15 repository::{Repository, RepositoryView},
16 },
17 };
18 use serde::Serialize;
19 use tokio::{
20 io::{AsyncRead, AsyncWrite},
21 net::{TcpSocket, TcpStream},
22 };
23 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
24
25 type Socket = WebSocketStream<MaybeTlsStream<TcpStream>>;
26
27 #[macro_use]
28 extern crate tracing;
29
30 pub struct GiteratedApi;
31
32 impl GiteratedApi {
33 pub async fn create_repository(
34 target: Instance,
35 request: CreateRepositoryCommand,
36 ) -> Result<bool, Box<dyn Error>> {
37 let mut socket = Self::connect_to(&target.url).await?;
38 let target = Repository {
39 name: request.name.clone(),
40 instance: target,
41 };
42
43 Self::send_message(
44 &MessageKind::Repository(RepositoryMessage {
45 target,
46 command: RepositoryMessageKind::Request(RepositoryRequest::CreateRepository(
47 request,
48 )),
49 }),
50 &mut socket,
51 )
52 .await?;
53
54 while let Ok(payload) = Self::next_payload(&mut socket).await {
55 if let Ok(MessageKind::Repository(RepositoryMessage {
56 command:
57 RepositoryMessageKind::Response(RepositoryResponse::CreateRepository(_response)),
58 ..
59 })) = serde_json::from_slice(&payload)
60 {
61 return Ok(true);
62 }
63 }
64
65 unreachable!()
66 }
67
68 pub async fn repository_info(repository: Repository) -> Result<RepositoryView, Box<dyn Error>> {
69 let mut socket = Self::connect_to(&repository.instance.url).await?;
70
71 Self::send_message(
72 &MessageKind::Repository(RepositoryMessage {
73 target: repository.clone(),
74 command: RepositoryMessageKind::Request(RepositoryRequest::RepositoryInfo(
75 RepositoryInfoRequest,
76 )),
77 }),
78 &mut socket,
79 )
80 .await?;
81
82 while let Ok(payload) = Self::next_payload(&mut socket).await {
83 if let Ok(MessageKind::Repository(RepositoryMessage {
84 command:
85 RepositoryMessageKind::Response(RepositoryResponse::RepositoryInfo(response)),
86 ..
87 })) = serde_json::from_slice(&payload)
88 {
89 return Ok(response);
90 }
91 }
92
93 unreachable!()
94 }
95
96 async fn connect_to(url: impl ToString) -> Result<Socket, Box<dyn Error>> {
97 let url = url.to_string();
98 let (mut websocket, _response) = connect_async(&url).await?;
99 Self::handle_handshake(&mut websocket).await?;
100
101 Ok(websocket)
102 }
103
104 async fn handle_handshake(socket: &mut Socket) -> Result<(), Box<dyn Error>> {
105 // Send handshake initiation
106
107 Self::send_message(
108 &MessageKind::Handshake(HandshakeMessage::Initiate(InitiateHandshake {
109 identity: Instance {
110 url: String::from("foo.com"),
111 },
112 version: String::from("0.1.0"),
113 })),
114 socket,
115 )
116 .await?;
117
118 while let Some(message) = socket.next().await {
119 let message = match message {
120 Ok(message) => message,
121 Err(err) => {
122 error!("Error reading message: {:?}", err);
123 continue;
124 }
125 };
126
127 let payload = match message {
128 Message::Text(text) => text.into_bytes(),
129 Message::Binary(bytes) => bytes,
130 Message::Ping(_) => continue,
131 Message::Pong(_) => continue,
132 Message::Close(_) => {
133 panic!()
134 }
135 _ => unreachable!(),
136 };
137
138 let message = match serde_json::from_slice::<MessageKind>(&payload) {
139 Ok(message) => message,
140 Err(err) => {
141 error!("Error deserializing message: {:?}", err);
142 continue;
143 }
144 };
145
146 if let MessageKind::Handshake(handshake) = message {
147 match handshake {
148 HandshakeMessage::Initiate(_) => unimplemented!(),
149 HandshakeMessage::Response(_) => {
150 // Send HandshakeMessage::Finalize
151 Self::send_message(
152 &MessageKind::Handshake(HandshakeMessage::Finalize(
153 HandshakeFinalize { success: true },
154 )),
155 socket,
156 )
157 .await?;
158 }
159 HandshakeMessage::Finalize(finalize) => {
160 if finalize.success {
161 return Ok(());
162 } else {
163 panic!()
164 }
165 }
166 }
167 }
168 }
169
170 Ok(())
171 }
172
173 async fn send_message<T: Serialize>(
174 message: &T,
175 socket: &mut Socket,
176 ) -> Result<(), Box<dyn Error>> {
177 socket
178 .send(Message::Binary(serde_json::to_vec(&message).unwrap()))
179 .await?;
180 Ok(())
181 }
182
183 async fn next_payload(socket: &mut Socket) -> Result<Vec<u8>, Box<dyn Error>> {
184 while let Some(message) = socket.next().await {
185 let message = message?;
186
187 match message {
188 Message::Text(text) => return Ok(text.into_bytes()),
189 Message::Binary(bytes) => return Ok(bytes),
190 Message::Ping(_) => continue,
191 Message::Pong(_) => continue,
192 Message::Close(_) => {
193 panic!()
194 }
195 _ => unreachable!(),
196 }
197 }
198
199 unreachable!()
200 }
201 }
202