1use crate::async_utils::DroppingOnce;
2use crate::traktor_api::model::{
3 AppMessage, ConnectionResponse, InitializeRequest, ServerMessage, UpdateRequest,
4};
5use crate::traktor_api::{StateUpdate, ID};
6use bytes::Bytes;
7use iced::futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
8use iced::futures::channel::oneshot;
9use iced::futures::{stream, TryFutureExt};
10use iced::futures::{SinkExt, Stream, StreamExt};
11use libmdns::{Responder, Service};
12use serde::de::DeserializeOwned;
13use std::collections::HashMap;
14use std::convert::Infallible;
15use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18use tokio::sync::Mutex;
19use uuid::Uuid;
20use warp::http::StatusCode;
21use warp::Filter;
22
/// Maximum number of state updates buffered while the current session has not
/// been initialized yet. When the queue grows beyond this length the server
/// starts a new session (see `TraktorServer::handle_update`).
const MAX_QUEUE_LENGTH: usize = 20;
24
/// Shared state of the Traktor web server: session bookkeeping, the buffered
/// updates of a not-yet-initialized session, and cover image tracking.
struct TraktorServer {
    /// Channel on which every `ServerMessage` produced by the handlers is sent.
    output: UnboundedSender<ServerMessage>,

    /// Debug-logging flag reported to clients in the connection response;
    /// set by `reconnect`.
    debug_logging: bool,
    /// Identifier of the current session. Empty until the first `reconnect`;
    /// the routes reject every request while it is empty.
    session_id: String,

    /// Whether an init request has been accepted for the current session.
    is_initialized: bool,
    /// Updates received before initialization; flushed by `handle_init`.
    queue: Vec<StateUpdate>,

    /// File path of the content of each deck, in the order A, B, C, D.
    deck_files: (String, String, String, String),
    /// Paths for which a cover image has been received and is still required.
    loaded_images: Vec<String>,
    /// Paths that are sent to newly connected cover sockets; an entry is
    /// removed once its cover arrives or the path is no longer required.
    pending_images: Vec<String>,

    /// Counter used to pick the key under which the next cover socket is stored.
    cover_socket_id: usize,
    /// Senders of the currently registered cover websockets, keyed by socket id.
    cover_sockets: HashMap<usize, UnboundedSender<warp::ws::Message>>,
}
41
42impl TraktorServer {
43 pub fn new(output: UnboundedSender<ServerMessage>) -> Self {
44 TraktorServer {
45 output,
46
47 debug_logging: false,
48 session_id: "".to_owned(),
49
50 is_initialized: false,
51 queue: Vec::new(),
52
53 deck_files: Default::default(),
54 loaded_images: Vec::new(),
55 pending_images: Vec::new(),
56
57 cover_socket_id: 0,
58 cover_sockets: HashMap::new(),
59 }
60 }
61
62 async fn send_message(&mut self, message: ServerMessage) {
63 let _ = self.output.send(message).await;
64 }
65
66 async fn send_messages(&mut self, messages: impl IntoIterator<Item=ServerMessage>) {
67 let _ = self
68 .output
69 .send_all(&mut stream::iter(messages).map(Ok))
70 .await;
71 }
72
73 pub async fn send_ready(&mut self, app_input_sender: UnboundedSender<AppMessage>) {
74 self.send_message(ServerMessage::Ready(app_input_sender))
75 .await
76 }
77
78 pub fn reconnect(&mut self, debug_logging: bool) {
79 self.session_id = Uuid::new_v4().to_string();
80 self.debug_logging = debug_logging;
81
82 self.is_initialized = false;
83 self.queue.clear();
84 }
85
86 fn get_required_images(&self) -> Vec<String> {
87 let mut required_images: Vec<String> = vec![
88 &self.deck_files.0,
89 &self.deck_files.1,
90 &self.deck_files.2,
91 &self.deck_files.3,
92 ]
93 .into_iter()
94 .filter(|&f| !f.is_empty())
95 .map(|f| f.to_owned())
96 .collect();
97 required_images.dedup();
98
99 required_images
100 }
101
102 async fn on_update_deck_files(&mut self) {
103 let required_images = self.get_required_images();
104
105 self.loaded_images.retain(|i| required_images.contains(i));
106 self.pending_images.retain(|i| required_images.contains(i));
107
108 let new_images = required_images
109 .iter()
110 .filter(|&i| !self.loaded_images.contains(i) && !self.pending_images.contains(i));
111
112 for img in new_images {
113 for socket in self.cover_sockets.values_mut() {
114 _ = socket.send(warp::ws::Message::text(img)).await;
115 }
116 }
117 }
118
119 async fn handle_connect(&mut self) -> warp::reply::Json {
120 warp::reply::json(&ConnectionResponse {
121 session_id: self.session_id.to_owned(),
122 debug_logging: self.debug_logging,
123 })
124 }
125
126 async fn handle_init(&mut self, request: InitializeRequest) -> impl warp::Reply + use<> {
127 if request.session_id == self.session_id {
128 let time_offset_ms = SystemTime::now()
129 .duration_since(UNIX_EPOCH)
130 .map(|d| (request.timestamp as i64) - (d.as_millis() as i64))
131 .unwrap_or(0);
132
133 self.deck_files.0 = request.state.decks.0.content.file_path.clone();
134 self.deck_files.1 = request.state.decks.1.content.file_path.clone();
135 self.deck_files.2 = request.state.decks.2.content.file_path.clone();
136 self.deck_files.3 = request.state.decks.3.content.file_path.clone();
137 self.on_update_deck_files().await;
138
139 self.send_message(ServerMessage::Connect {
140 time_offset_ms,
141 initial_state: Box::new(request.state),
142 })
143 .await;
144
145 let mut messages = self
146 .queue
147 .drain(..)
148 .map(ServerMessage::Update)
149 .collect::<Vec<_>>();
150 self.send_messages(messages.drain(..)).await;
151
152 self.is_initialized = true;
153 }
154
155 self.session_id.to_owned()
156 }
157
158 async fn handle_update(
159 &mut self,
160 session_id: String,
161 update: StateUpdate,
162 ) -> impl warp::Reply + use < > {
163 if session_id == self.session_id {
164 match &update {
165 StateUpdate::DeckContent(ID::A, content) => {
166 self.deck_files.0 = content.file_path.clone()
167 }
168 StateUpdate::DeckContent(ID::B, content) => {
169 self.deck_files.1 = content.file_path.clone()
170 }
171 StateUpdate::DeckContent(ID::C, content) => {
172 self.deck_files.2 = content.file_path.clone()
173 }
174 StateUpdate::DeckContent(ID::D, content) => {
175 self.deck_files.3 = content.file_path.clone()
176 }
177 _ => {}
178 }
179 self.on_update_deck_files().await;
180
181 if self.is_initialized {
182 self.send_message(ServerMessage::Update(update)).await;
183 } else {
184 self.queue.push(update);
185
186 if self.queue.len() > MAX_QUEUE_LENGTH {
187 self.reconnect(self.debug_logging);
188 }
189 }
190 }
191
192 self.session_id.to_owned()
193 }
194
195 async fn handle_cover(&mut self, path: String, data: Bytes) -> StatusCode {
196 if data.is_empty() {
197 return StatusCode::BAD_REQUEST;
198 }
199
200 if !self.get_required_images().contains(&path) {
201 return StatusCode::OK;
202 }
203
204 println!("cover received for \"{}\"", path);
205
206 self.pending_images.retain(|i| i != &path);
207 if !self.loaded_images.contains(&path) {
208 self.loaded_images.push(path.clone());
209 }
210
211 self.send_message(ServerMessage::CoverImage { path, data })
212 .await;
213 StatusCode::ACCEPTED
214 }
215
216 async fn handle_socket_connect(&mut self, mut tx: UnboundedSender<warp::ws::Message>) -> usize {
217 while self.cover_sockets.contains_key(&self.cover_socket_id) {
218 self.cover_socket_id += 1;
219 }
220
221 for img in &self.pending_images {
222 _ = tx.send(warp::ws::Message::text(img)).await;
223 }
224
225 self.cover_sockets.insert(self.cover_socket_id, tx);
226 self.cover_socket_id
227 }
228
229 fn handle_socket_disconnect(&mut self, id: usize) {
230 self.cover_sockets.remove(&id);
231 }
232
233 async fn handle_log(&mut self, msg: String) -> impl warp::Reply + use < > {
234 self.send_message(ServerMessage::Log(msg)).await;
235 StatusCode::CREATED
236 }
237}
238
239impl TraktorServer {
240 pub fn routes(
241 state: Arc<Mutex<Self>>,
242 ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone + use < > {
243 Self::is_started(state.clone())
244 .and(
245 Self::route_connect(state.clone())
246 .or(Self::route_init(state.clone()))
247 .or(Self::route_update(state.clone()))
248 .or(Self::route_log(state.clone()))
249 .or(Self::route_cover(state.clone())),
250 )
251 .map(|_, reply| reply)
252 }
253
254 fn with_state(
255 state: Arc<Mutex<Self>>,
256 ) -> impl Filter<Extract=(Arc<Mutex<Self>>,), Error=Infallible> + Clone {
257 warp::any().map(move || state.clone())
258 }
259
260 fn is_started(
261 state: Arc<Mutex<Self>>,
262 ) -> impl Filter<Extract=((),), Error=warp::Rejection> + Clone {
263 warp::any()
264 .and(Self::with_state(state))
265 .and_then(async |state: Arc<Mutex<Self>>| {
266 let state = state.lock().await;
267
268 if state.session_id.is_empty() {
269 Err(warp::reject::not_found())
270 } else {
271 Ok(())
272 }
273 })
274 }
275
276 fn json_body<T: DeserializeOwned + Send>()
277 -> impl Filter<Extract=(T,), Error=warp::Rejection> + Clone {
278 warp::body::json()
279 }
280
281 fn route_connect(
282 state: Arc<Mutex<Self>>,
283 ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
284 warp::path!("connect")
285 .and(Self::with_state(state))
286 .then(async |state: Arc<Mutex<Self>>| state.lock().await.handle_connect().await)
287 }
288
289 fn route_init(
290 state: Arc<Mutex<Self>>,
291 ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
292 warp::path!("init")
293 .and(warp::post())
294 .and(Self::with_state(state))
295 .and(Self::json_body())
296 .then(
297 async |state: Arc<Mutex<Self>>, request: InitializeRequest| {
298 state.lock().await.handle_init(request).await
299 },
300 )
301 }
302
303 fn route_update(
304 state: Arc<Mutex<Self>>,
305 ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
306 warp::path("update")
307 .and(warp::post())
308 .and(Self::with_state(state))
309 .and(Self::route_update_sub_routes())
310 .then(async |state: Arc<Mutex<Self>>, (session_id, update)| {
311 state.lock().await.handle_update(session_id, update).await
312 })
313 }
314
315 fn route_update_sub_routes()
316 -> impl Filter<Extract=((String, StateUpdate),), Error=warp::Rejection> + Clone {
317 warp::path!("mixer")
318 .and(Self::json_body())
319 .then(async move |req: UpdateRequest<_>| {
320 (req.session_id, StateUpdate::Mixer(req.state))
321 })
322 .or(warp::path!("channel0").and(Self::json_body()).then(
323 async move |req: UpdateRequest<_>| {
324 (req.session_id, StateUpdate::Channel(ID::A, req.state))
325 },
326 ))
327 .unify()
328 .or(warp::path!("channel1").and(Self::json_body()).then(
329 async move |req: UpdateRequest<_>| {
330 (req.session_id, StateUpdate::Channel(ID::B, req.state))
331 },
332 ))
333 .unify()
334 .or(warp::path!("channel2").and(Self::json_body()).then(
335 async move |req: UpdateRequest<_>| {
336 (req.session_id, StateUpdate::Channel(ID::C, req.state))
337 },
338 ))
339 .unify()
340 .or(warp::path!("channel3").and(Self::json_body()).then(
341 async move |req: UpdateRequest<_>| {
342 (req.session_id, StateUpdate::Channel(ID::D, req.state))
343 },
344 ))
345 .unify()
346 .or(warp::path!("deck0content").and(Self::json_body()).then(
347 async move |req: UpdateRequest<_>| {
348 (req.session_id, StateUpdate::DeckContent(ID::A, req.state))
349 },
350 ))
351 .unify()
352 .or(warp::path!("deck1content").and(Self::json_body()).then(
353 async move |req: UpdateRequest<_>| {
354 (req.session_id, StateUpdate::DeckContent(ID::B, req.state))
355 },
356 ))
357 .unify()
358 .or(warp::path!("deck2content").and(Self::json_body()).then(
359 async move |req: UpdateRequest<_>| {
360 (req.session_id, StateUpdate::DeckContent(ID::C, req.state))
361 },
362 ))
363 .unify()
364 .or(warp::path!("deck3content").and(Self::json_body()).then(
365 async move |req: UpdateRequest<_>| {
366 (req.session_id, StateUpdate::DeckContent(ID::D, req.state))
367 },
368 ))
369 .unify()
370 .or(warp::path!("deck0playstate").and(Self::json_body()).then(
371 async move |req: UpdateRequest<_>| {
372 (req.session_id, StateUpdate::DeckPlayState(ID::A, req.state))
373 },
374 ))
375 .unify()
376 .or(warp::path!("deck1playstate").and(Self::json_body()).then(
377 async move |req: UpdateRequest<_>| {
378 (req.session_id, StateUpdate::DeckPlayState(ID::B, req.state))
379 },
380 ))
381 .unify()
382 .or(warp::path!("deck2playstate").and(Self::json_body()).then(
383 async move |req: UpdateRequest<_>| {
384 (req.session_id, StateUpdate::DeckPlayState(ID::C, req.state))
385 },
386 ))
387 .unify()
388 .or(warp::path!("deck3playstate").and(Self::json_body()).then(
389 async move |req: UpdateRequest<_>| {
390 (req.session_id, StateUpdate::DeckPlayState(ID::D, req.state))
391 },
392 ))
393 .unify()
394 }
395
396 fn route_cover(
397 state: Arc<Mutex<Self>>,
398 ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
399 warp::path!("cover").and(
400 Self::route_cover_upload(state.clone()).or(Self::route_cover_socket(state.clone())),
401 )
402 }
403
404 fn route_cover_upload(
405 state: Arc<Mutex<Self>>,
406 ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
407 warp::post()
408 .and(Self::with_state(state))
409 .and(warp::body::bytes())
410 .and(warp::query::<HashMap<String, String>>())
411 .then(
412 async |state: Arc<Mutex<Self>>, body: Bytes, query: HashMap<String, String>| {
413 match query.get("path") {
414 Some(path) => state.lock().await.handle_cover(path.to_owned(), body).await,
415 None => StatusCode::BAD_REQUEST,
416 }
417 },
418 )
419 }
420
421 fn route_cover_socket(
422 state: Arc<Mutex<Self>>,
423 ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
424 warp::ws()
425 .and(Self::with_state(state))
426 .map(|ws: warp::ws::Ws, state: Arc<Mutex<Self>>| {
427 ws.on_upgrade(move |socket| async move {
428 let (mut ws_tx, mut ws_rx) = socket.split();
429 let (tx, mut rx) = iced::futures::channel::mpsc::unbounded();
430
431 tokio::task::spawn(async move {
432 while let Some(message) = rx.next().await {
433 ws_tx
434 .send(message)
435 .unwrap_or_else(|e| {
436 println!("websocket send error: {}", e);
437 })
438 .await;
439 }
440 });
441
442 let socket_id = state.lock().await.handle_socket_connect(tx).await;
443 println!("websocket connected");
444
445 while let Some(result) = ws_rx.next().await {
446 match result {
447 Ok(_) => {}
448 Err(e) => {
449 println!("websocket error: {}", e);
450 break;
451 }
452 };
453 }
454
455 state.lock().await.handle_socket_disconnect(socket_id);
456 println!("websocket disconnected");
457 })
458 })
459 }
460
461 fn route_log(
462 state: Arc<Mutex<Self>>,
463 ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
464 warp::path!("log")
465 .and(warp::post())
466 .and(Self::with_state(state))
467 .and(warp::body::bytes())
468 .then(async |state: Arc<Mutex<Self>>, body: Bytes| {
469 state
470 .lock()
471 .await
472 .handle_log(String::from_utf8_lossy(&body).into_owned())
473 .await
474 })
475 }
476}
477
478async fn server_main(
479 addr: SocketAddr,
480 output: UnboundedSender<ServerMessage>,
481 mut input: UnboundedReceiver<AppMessage>,
482 input_send: UnboundedSender<AppMessage>,
483 cancelled: oneshot::Receiver<()>,
484) {
485 let state = Arc::new(Mutex::new(TraktorServer::new(output)));
486 let routes = TraktorServer::routes(state.clone());
487
488 println!("starting traktor server on {}", addr);
489 let service = advertise_server(addr);
490
491 let Ok(listener) = tokio::net::TcpListener::bind(addr).await else {
492 println!("could not start traktor server on {}", addr);
493
494 drop(service);
495 return;
496 };
497 let server = warp::serve(routes).incoming(listener).graceful(async {
498 cancelled.await.ok();
499 });
500
501 tokio::task::spawn(server.run());
502
503 state.lock().await.send_ready(input_send).await;
504 loop {
505 match input.select_next_some().await {
506 AppMessage::Reconnect { debug_logging } => state.lock().await.reconnect(debug_logging),
507 }
508 }
509}
510
511fn advertise_server(addr: SocketAddr) -> Service {
512 let addr_vec = if !addr.ip().is_unspecified() {
513 [addr.ip()].to_vec()
514 } else {
515 Vec::new()
516 };
517 let responder = Responder::new_with_ip_list(addr_vec).expect("could not create responder");
518 let svc = responder.register(
519 "_http._tcp",
520 "traktor-di-webserver",
521 addr.port(),
522 &["path=/"],
523 );
524 println!("advertising traktor server on {}", addr);
525 svc
526}
527
528pub fn run_server(addr: SocketAddr) -> impl Stream<Item=ServerMessage> {
529 let (output, output_receive) = iced::futures::channel::mpsc::unbounded();
530 let (input_send, input) = iced::futures::channel::mpsc::unbounded();
531 let (cancel, cancelled) = oneshot::channel();
532
533 let runner = DroppingOnce::new(
534 server_main(addr, output, input, input_send, cancelled),
535 move || {
536 println!("stopping traktor server on {}", addr);
537 let _ = cancel.send(());
538 },
539 )
540 .filter_map(|_| async { None });
541
542 stream::select(output_receive, runner)
543}