1use crate::async_utils::DroppingOnce;
2use crate::traktor_api::model::{
3 AppMessage, ConnectionResponse, InitializeRequest, ServerMessage, UpdateRequest,
4};
5use crate::traktor_api::{ID, StateUpdate};
6use bytes::Bytes;
7use iced::futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
8use iced::futures::channel::oneshot;
9use iced::futures::{SinkExt, Stream, StreamExt};
10use iced::futures::{TryFutureExt, stream};
11use serde::de::DeserializeOwned;
12use std::collections::HashMap;
13use std::convert::Infallible;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use std::time::{SystemTime, UNIX_EPOCH};
17use tokio::sync::Mutex;
18use uuid::Uuid;
19use warp::Filter;
20use warp::http::StatusCode;
21
22const MAX_QUEUE_LENGTH: usize = 20;
23
24struct TraktorServer {
25 output: UnboundedSender<ServerMessage>,
26
27 debug_logging: bool,
28 session_id: String,
29
30 is_initialized: bool,
31 queue: Vec<StateUpdate>,
32
33 deck_files: [String; 4],
34 loaded_images: Vec<String>,
35 pending_images: Vec<String>,
36
37 cover_socket_id: usize,
38 cover_sockets: HashMap<usize, UnboundedSender<warp::ws::Message>>,
39}
40
41impl TraktorServer {
42 pub fn new(output: UnboundedSender<ServerMessage>) -> Self {
43 TraktorServer {
44 output,
45
46 debug_logging: false,
47 session_id: "".to_owned(),
48
49 is_initialized: false,
50 queue: Vec::new(),
51
52 deck_files: Default::default(),
53 loaded_images: Vec::new(),
54 pending_images: Vec::new(),
55
56 cover_socket_id: 0,
57 cover_sockets: HashMap::new(),
58 }
59 }
60
61 async fn send_message(&mut self, message: ServerMessage) {
62 let _ = self.output.send(message).await;
63 }
64
65 async fn send_messages(&mut self, messages: impl IntoIterator<Item = ServerMessage>) {
66 let _ = self
67 .output
68 .send_all(&mut stream::iter(messages).map(Ok))
69 .await;
70 }
71
72 pub async fn send_ready(&mut self, app_input_sender: UnboundedSender<AppMessage>) {
73 self.send_message(ServerMessage::Ready(app_input_sender))
74 .await
75 }
76
77 pub fn reconnect(&mut self, debug_logging: bool) {
78 self.session_id = Uuid::new_v4().to_string();
79 self.debug_logging = debug_logging;
80
81 self.is_initialized = false;
82 self.queue.clear();
83 }
84
85 fn get_required_images(&self) -> Vec<String> {
86 let mut required_images: Vec<String> = self
87 .deck_files
88 .iter()
89 .filter(|&f| !f.is_empty())
90 .map(|f| f.to_owned())
91 .collect();
92 required_images.dedup();
93
94 required_images
95 }
96
97 async fn on_update_deck_files(&mut self) {
98 let required_images = self.get_required_images();
99
100 self.loaded_images.retain(|i| required_images.contains(i));
101 self.pending_images.retain(|i| required_images.contains(i));
102
103 let new_images = required_images
104 .iter()
105 .filter(|&i| !self.loaded_images.contains(i) && !self.pending_images.contains(i));
106
107 for img in new_images {
108 for socket in self.cover_sockets.values_mut() {
109 _ = socket.send(warp::ws::Message::text(img)).await;
110 }
111 }
112 }
113
114 async fn handle_connect(&mut self) -> warp::reply::Json {
115 warp::reply::json(&ConnectionResponse {
116 session_id: self.session_id.to_owned(),
117 debug_logging: self.debug_logging,
118 })
119 }
120
121 async fn handle_init(&mut self, request: InitializeRequest) -> impl warp::Reply + use<> {
122 if request.session_id == self.session_id {
123 let time_offset_ms = SystemTime::now()
124 .duration_since(UNIX_EPOCH)
125 .map(|d| (request.timestamp as i64) - (d.as_millis() as i64))
126 .unwrap_or(0);
127
128 for i in 0..4 {
129 self.deck_files[i] = request.state.decks[i].content.file_path.clone();
130 }
131 self.on_update_deck_files().await;
132
133 self.send_message(ServerMessage::Connect {
134 time_offset_ms,
135 initial_state: Box::new(request.state),
136 })
137 .await;
138
139 let mut messages = self
140 .queue
141 .drain(..)
142 .map(ServerMessage::Update)
143 .collect::<Vec<_>>();
144 self.send_messages(messages.drain(..)).await;
145
146 self.is_initialized = true;
147 }
148
149 self.session_id.to_owned()
150 }
151
152 async fn handle_update(
153 &mut self,
154 session_id: String,
155 update: StateUpdate,
156 ) -> impl warp::Reply + use<> {
157 if session_id == self.session_id {
158 if let StateUpdate::DeckContent(id, content) = &update {
159 self.deck_files[*id as usize] = content.file_path.clone()
160 }
161 self.on_update_deck_files().await;
162
163 if self.is_initialized {
164 self.send_message(ServerMessage::Update(update)).await;
165 } else {
166 self.queue.push(update);
167
168 if self.queue.len() > MAX_QUEUE_LENGTH {
169 self.reconnect(self.debug_logging);
170 }
171 }
172 }
173
174 self.session_id.to_owned()
175 }
176
177 async fn handle_cover(&mut self, path: String, data: Bytes) -> StatusCode {
178 if data.is_empty() {
179 return StatusCode::BAD_REQUEST;
180 }
181
182 if !self.get_required_images().contains(&path) {
183 return StatusCode::OK;
184 }
185
186 println!("cover received for \"{}\"", path);
187
188 self.pending_images.retain(|i| i != &path);
189 if !self.loaded_images.contains(&path) {
190 self.loaded_images.push(path.clone());
191 }
192
193 self.send_message(ServerMessage::CoverImage { path, data })
194 .await;
195 StatusCode::ACCEPTED
196 }
197
198 async fn handle_socket_connect(
199 &mut self,
200 mut tx: UnboundedSender<warp::ws::Message>,
201 addr: Option<SocketAddr>,
202 ) -> usize {
203 while self.cover_sockets.contains_key(&self.cover_socket_id) {
204 self.cover_socket_id += 1;
205 }
206
207 for img in &self.pending_images {
208 _ = tx.send(warp::ws::Message::text(img)).await;
209 }
210
211 self.cover_sockets.insert(self.cover_socket_id, tx);
212 self.send_message(ServerMessage::ClientChanged(addr)).await;
213 self.cover_socket_id
214 }
215
216 async fn handle_socket_disconnect(&mut self, id: usize) {
217 self.cover_sockets.remove(&id);
218 self.send_message(ServerMessage::ClientChanged(None)).await;
219 }
220
221 async fn handle_log(&mut self, msg: String) -> impl warp::Reply + use<> {
222 self.send_message(ServerMessage::Log(msg)).await;
223 StatusCode::CREATED
224 }
225}
226
227impl TraktorServer {
228 pub fn routes(
229 state: Arc<Mutex<Self>>,
230 ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone + use<> {
231 Self::is_started(state.clone())
232 .and(
233 Self::route_connect(state.clone())
234 .or(Self::route_init(state.clone()))
235 .or(Self::route_update(state.clone()))
236 .or(Self::route_log(state.clone()))
237 .or(Self::route_cover(state.clone())),
238 )
239 .map(|_, reply| reply)
240 }
241
242 fn with_state(
243 state: Arc<Mutex<Self>>,
244 ) -> impl Filter<Extract = (Arc<Mutex<Self>>,), Error = Infallible> + Clone {
245 warp::any().map(move || state.clone())
246 }
247
248 fn is_started(
249 state: Arc<Mutex<Self>>,
250 ) -> impl Filter<Extract = ((),), Error = warp::Rejection> + Clone {
251 warp::any()
252 .and(Self::with_state(state))
253 .and_then(async |state: Arc<Mutex<Self>>| {
254 let state = state.lock().await;
255
256 if state.session_id.is_empty() {
257 Err(warp::reject::not_found())
258 } else {
259 Ok(())
260 }
261 })
262 }
263
264 fn json_body<T: DeserializeOwned + Send>()
265 -> impl Filter<Extract = (T,), Error = warp::Rejection> + Clone {
266 warp::body::json()
267 }
268
269 fn route_connect(
270 state: Arc<Mutex<Self>>,
271 ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
272 warp::path!("connect")
273 .and(Self::with_state(state))
274 .then(async |state: Arc<Mutex<Self>>| state.lock().await.handle_connect().await)
275 }
276
277 fn route_init(
278 state: Arc<Mutex<Self>>,
279 ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
280 warp::path!("init")
281 .and(warp::post())
282 .and(Self::with_state(state))
283 .and(Self::json_body())
284 .then(
285 async |state: Arc<Mutex<Self>>, request: InitializeRequest| {
286 state.lock().await.handle_init(request).await
287 },
288 )
289 }
290
291 fn route_update(
292 state: Arc<Mutex<Self>>,
293 ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
294 warp::path("update")
295 .and(warp::post())
296 .and(Self::with_state(state))
297 .and(Self::route_update_sub_routes())
298 .then(async |state: Arc<Mutex<Self>>, (session_id, update)| {
299 state.lock().await.handle_update(session_id, update).await
300 })
301 }
302
303 fn route_update_sub_routes()
304 -> impl Filter<Extract = ((String, StateUpdate),), Error = warp::Rejection> + Clone {
305 warp::path!("mixer")
306 .and(Self::json_body())
307 .then(async move |req: UpdateRequest<_>| {
308 (req.session_id, StateUpdate::Mixer(req.state))
309 })
310 .or(warp::path!("channel0").and(Self::json_body()).then(
311 async move |req: UpdateRequest<_>| {
312 (req.session_id, StateUpdate::Channel(ID::A, req.state))
313 },
314 ))
315 .unify()
316 .or(warp::path!("channel1").and(Self::json_body()).then(
317 async move |req: UpdateRequest<_>| {
318 (req.session_id, StateUpdate::Channel(ID::B, req.state))
319 },
320 ))
321 .unify()
322 .or(warp::path!("channel2").and(Self::json_body()).then(
323 async move |req: UpdateRequest<_>| {
324 (req.session_id, StateUpdate::Channel(ID::C, req.state))
325 },
326 ))
327 .unify()
328 .or(warp::path!("channel3").and(Self::json_body()).then(
329 async move |req: UpdateRequest<_>| {
330 (req.session_id, StateUpdate::Channel(ID::D, req.state))
331 },
332 ))
333 .unify()
334 .or(warp::path!("deck0content").and(Self::json_body()).then(
335 async move |req: UpdateRequest<_>| {
336 (req.session_id, StateUpdate::DeckContent(ID::A, req.state))
337 },
338 ))
339 .unify()
340 .or(warp::path!("deck1content").and(Self::json_body()).then(
341 async move |req: UpdateRequest<_>| {
342 (req.session_id, StateUpdate::DeckContent(ID::B, req.state))
343 },
344 ))
345 .unify()
346 .or(warp::path!("deck2content").and(Self::json_body()).then(
347 async move |req: UpdateRequest<_>| {
348 (req.session_id, StateUpdate::DeckContent(ID::C, req.state))
349 },
350 ))
351 .unify()
352 .or(warp::path!("deck3content").and(Self::json_body()).then(
353 async move |req: UpdateRequest<_>| {
354 (req.session_id, StateUpdate::DeckContent(ID::D, req.state))
355 },
356 ))
357 .unify()
358 .or(warp::path!("deck0playstate").and(Self::json_body()).then(
359 async move |req: UpdateRequest<_>| {
360 (req.session_id, StateUpdate::DeckPlayState(ID::A, req.state))
361 },
362 ))
363 .unify()
364 .or(warp::path!("deck1playstate").and(Self::json_body()).then(
365 async move |req: UpdateRequest<_>| {
366 (req.session_id, StateUpdate::DeckPlayState(ID::B, req.state))
367 },
368 ))
369 .unify()
370 .or(warp::path!("deck2playstate").and(Self::json_body()).then(
371 async move |req: UpdateRequest<_>| {
372 (req.session_id, StateUpdate::DeckPlayState(ID::C, req.state))
373 },
374 ))
375 .unify()
376 .or(warp::path!("deck3playstate").and(Self::json_body()).then(
377 async move |req: UpdateRequest<_>| {
378 (req.session_id, StateUpdate::DeckPlayState(ID::D, req.state))
379 },
380 ))
381 .unify()
382 }
383
384 fn route_cover(
385 state: Arc<Mutex<Self>>,
386 ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
387 warp::path!("cover").and(
388 Self::route_cover_upload(state.clone()).or(Self::route_cover_socket(state.clone())),
389 )
390 }
391
392 fn route_cover_upload(
393 state: Arc<Mutex<Self>>,
394 ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
395 warp::post()
396 .and(Self::with_state(state))
397 .and(warp::body::bytes())
398 .and(warp::query::<HashMap<String, String>>())
399 .then(
400 async |state: Arc<Mutex<Self>>, body: Bytes, query: HashMap<String, String>| {
401 match query.get("path") {
402 Some(path) => state.lock().await.handle_cover(path.to_owned(), body).await,
403 None => StatusCode::BAD_REQUEST,
404 }
405 },
406 )
407 }
408
409 fn route_cover_socket(
410 state: Arc<Mutex<Self>>,
411 ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
412 warp::ws()
413 .and(Self::with_state(state))
414 .and(warp::addr::remote())
415 .map(
416 |ws: warp::ws::Ws, state: Arc<Mutex<Self>>, addr: Option<SocketAddr>| {
417 ws.on_upgrade(move |socket| async move {
418 let (mut ws_tx, mut ws_rx) = socket.split();
419 let (tx, mut rx) = iced::futures::channel::mpsc::unbounded();
420
421 tokio::task::spawn(async move {
422 while let Some(message) = rx.next().await {
423 ws_tx
424 .send(message)
425 .unwrap_or_else(|e| {
426 println!("websocket send error: {}", e);
427 })
428 .await;
429 }
430 });
431
432 let socket_id = state.lock().await.handle_socket_connect(tx, addr).await;
433 println!("websocket connected");
434
435 while let Some(result) = ws_rx.next().await {
436 match result {
437 Ok(_) => {}
438 Err(e) => {
439 println!("websocket error: {}", e);
440 break;
441 }
442 };
443 }
444
445 state.lock().await.handle_socket_disconnect(socket_id).await;
446 println!("websocket disconnected");
447 })
448 },
449 )
450 }
451
452 fn route_log(
453 state: Arc<Mutex<Self>>,
454 ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
455 warp::path!("log")
456 .and(warp::post())
457 .and(Self::with_state(state))
458 .and(warp::body::bytes())
459 .then(async |state: Arc<Mutex<Self>>, body: Bytes| {
460 state
461 .lock()
462 .await
463 .handle_log(String::from_utf8_lossy(&body).into_owned())
464 .await
465 })
466 }
467}
468
469async fn server_main(
470 addr: SocketAddr,
471 output: UnboundedSender<ServerMessage>,
472 mut input: UnboundedReceiver<AppMessage>,
473 input_send: UnboundedSender<AppMessage>,
474 cancelled: oneshot::Receiver<()>,
475) {
476 let state = Arc::new(Mutex::new(TraktorServer::new(output)));
477 let routes = TraktorServer::routes(state.clone());
478
479 println!("starting traktor server on {}", addr);
480
481 let Ok(listener) = tokio::net::TcpListener::bind(addr).await else {
482 println!("could not start traktor server on {}", addr);
483 return;
484 };
485
486 let state_clone = state.clone();
487 let server = warp::serve(routes).incoming(listener).graceful(async move {
488 cancelled.await.ok();
489 for socket in state_clone.lock().await.cover_sockets.values_mut() {
490 let _ = socket.send(warp::ws::Message::close()).await;
491 }
492 });
493
494 tokio::task::spawn(server.run());
495
496 #[cfg(feature = "mdns")]
497 let service;
498 #[cfg(feature = "mdns")]
499 {
500 service = advertise_server(addr);
501 if service.is_err() {
502 drop(service)
503 }
504 }
505
506 state.lock().await.send_ready(input_send).await;
507 loop {
508 match input.select_next_some().await {
509 AppMessage::Reconnect { debug_logging } => state.lock().await.reconnect(debug_logging),
510 }
511 }
512}
513
514#[cfg(feature = "mdns")]
515fn advertise_server(addr: SocketAddr) -> Result<libmdns::Service, std::io::Error> {
516 let addr_vec = if !addr.ip().is_unspecified() {
517 [addr.ip()].to_vec()
518 } else {
519 Vec::new()
520 };
521 let responder = libmdns::Responder::new_with_ip_list(addr_vec)?;
522 let svc = responder.register(
523 "_http._tcp",
524 "traktor-di-webserver",
525 addr.port(),
526 &["path=/"],
527 );
528 println!("advertising traktor server on {}", addr);
529 Ok(svc)
530}
531
532pub fn run_server(addr: SocketAddr) -> impl Stream<Item = ServerMessage> {
533 let (output, output_receive) = iced::futures::channel::mpsc::unbounded();
534 let (input_send, input) = iced::futures::channel::mpsc::unbounded();
535 let (cancel, cancelled) = oneshot::channel();
536
537 let runner = DroppingOnce::new(
538 server_main(addr, output, input, input_send, cancelled),
539 move || {
540 println!("stopping traktor server on {}", addr);
541 let _ = cancel.send(());
542 },
543 )
544 .filter_map(|_| async { None });
545
546 stream::select(output_receive, runner)
547}
548
549#[cfg(test)]
553mod tests {
554 use super::*;
555 use iced::futures::channel::mpsc as iced_mpsc;
556 use serde_json::json;
557 use tokio::time::{Duration, timeout};
558
559 fn new_state() -> (Arc<Mutex<TraktorServer>>, UnboundedReceiver<ServerMessage>) {
563 let (tx, rx) = iced_mpsc::unbounded();
564 (Arc::new(Mutex::new(TraktorServer::new(tx))), rx)
565 }
566
567 fn full_state_json(file_path: &str) -> serde_json::Value {
570 json!({
571 "mixer": {
572 "xFader": 0.5, "masterVolume": 1.0,
573 "cueVolume": 0.5, "cueMix": 0.5, "micVolume": 0.0
574 },
575 "channel0": {"cue": false, "volume": 1.0, "xFaderLeft": true, "xFaderRight": false},
576 "channel1": {"cue": false, "volume": 0.5, "xFaderLeft": false, "xFaderRight": true},
577 "channel2": {"cue": false, "volume": 0.0, "xFaderLeft": false, "xFaderRight": false},
578 "channel3": {"cue": false, "volume": 0.0, "xFaderLeft": false, "xFaderRight": false},
579 "deck0content": {
580 "isLoaded": true, "number": 1, "title": "Test Track",
581 "artist": "Test Artist", "album": "", "genre": "Waltz",
582 "comment": "", "comment2": "", "label": "", "key": "",
583 "filePath": file_path, "trackLength": 180.0, "bpm": 90.0
584 },
585 "deck1content": {
586 "isLoaded": false, "number": 0, "title": "", "artist": "",
587 "album": "", "genre": "", "comment": "", "comment2": "", "label": "",
588 "key": "", "filePath": "", "trackLength": 0.0, "bpm": 0.0
589 },
590 "deck2content": {
591 "isLoaded": false, "number": 0, "title": "", "artist": "",
592 "album": "", "genre": "", "comment": "", "comment2": "", "label": "",
593 "key": "", "filePath": "", "trackLength": 0.0, "bpm": 0.0
594 },
595 "deck3content": {
596 "isLoaded": false, "number": 0, "title": "", "artist": "",
597 "album": "", "genre": "", "comment": "", "comment2": "", "label": "",
598 "key": "", "filePath": "", "trackLength": 0.0, "bpm": 0.0
599 },
600 "deck0playstate": {"timestamp": 0, "position": 0.0, "speed": 1.0},
601 "deck1playstate": {"timestamp": 0, "position": 0.0, "speed": 0.0},
602 "deck2playstate": {"timestamp": 0, "position": 0.0, "speed": 0.0},
603 "deck3playstate": {"timestamp": 0, "position": 0.0, "speed": 0.0}
604 })
605 }
606
607 fn init_body(session_id: &str, file_path: &str) -> serde_json::Value {
608 json!({
609 "sessionId": session_id,
610 "timestamp": 1_700_000_000_000_u64,
611 "state": full_state_json(file_path)
612 })
613 }
614
615 fn mixer_update_body(session_id: &str) -> serde_json::Value {
616 json!({
617 "sessionId": session_id,
618 "state": {
619 "xFader": 0.3, "masterVolume": 0.8,
620 "cueVolume": 0.5, "cueMix": 0.0, "micVolume": 0.0
621 }
622 })
623 }
624
625 async fn recv_msg(rx: &mut UnboundedReceiver<ServerMessage>) -> ServerMessage {
627 timeout(Duration::from_millis(500), rx.next())
628 .await
629 .expect("timeout waiting for ServerMessage")
630 .expect("channel closed unexpectedly")
631 }
632
633 async fn assert_no_msg(rx: &mut UnboundedReceiver<ServerMessage>) {
635 let result = timeout(Duration::from_millis(500), rx.next()).await;
636 assert!(
637 result.is_err(),
638 "expected no message but one arrived - {:?}",
639 result.unwrap()
640 );
641 }
642
643 #[tokio::test]
652 async fn connect_before_reconnect_returns_404() {
653 let (state, _rx) = new_state();
654 let filter = TraktorServer::routes(state);
655
656 let res1 = warp::test::request().path("/connect").reply(&filter).await;
657 let res2 = warp::test::request().path("/update").reply(&filter).await;
658 let res3 = warp::test::request().path("/log").reply(&filter).await;
659 let res4 = warp::test::request().path("/cover").reply(&filter).await;
660
661 assert_eq!(res1.status(), 404);
662 assert_eq!(res2.status(), 404);
663 assert_eq!(res3.status(), 404);
664 assert_eq!(res4.status(), 404);
665 }
666
667 #[tokio::test]
669 async fn connect_after_reconnect_returns_session_id_and_debug_flag() {
670 let (state, _rx) = new_state();
671 state.lock().await.reconnect(false);
672 let session_id = state.lock().await.session_id.clone();
673
674 let filter = TraktorServer::routes(state);
675 let res = warp::test::request().path("/connect").reply(&filter).await;
676
677 assert_eq!(res.status(), 200);
678 let body: serde_json::Value = serde_json::from_slice(res.body()).unwrap();
679 assert_eq!(body["sessionId"], session_id);
680 assert_eq!(body["debugLogging"], false);
681 }
682
683 #[tokio::test]
685 async fn connect_returns_debug_logging_true_when_set() {
686 let (state, _rx) = new_state();
687 state.lock().await.reconnect(true);
688
689 let filter = TraktorServer::routes(state);
690 let res = warp::test::request().path("/connect").reply(&filter).await;
691
692 let body: serde_json::Value = serde_json::from_slice(res.body()).unwrap();
693 assert_eq!(body["debugLogging"], true);
694 }
695
696 #[tokio::test]
698 async fn reconnect_generates_new_session_id_each_time() {
699 let (state, _rx) = new_state();
700
701 state.lock().await.reconnect(false);
702 let filter = TraktorServer::routes(state.clone());
703 let res1 = warp::test::request().path("/connect").reply(&filter).await;
704 let body1: serde_json::Value = serde_json::from_slice(res1.body()).unwrap();
705
706 state.lock().await.reconnect(false);
707 let res2 = warp::test::request().path("/connect").reply(&filter).await;
708 let body2: serde_json::Value = serde_json::from_slice(res2.body()).unwrap();
709
710 assert_ne!(body1["sessionId"], body2["sessionId"]);
711 }
712
713 #[tokio::test]
718 async fn init_correct_session_emits_connect_and_returns_session_id() {
719 let (state, mut rx) = new_state();
720 state.lock().await.reconnect(false);
721 let session_id = state.lock().await.session_id.clone();
722
723 let filter = TraktorServer::routes(state);
724 let body = init_body(&session_id, "/music/track.mp3");
725
726 let res = warp::test::request()
727 .method("POST")
728 .path("/init")
729 .json(&body)
730 .reply(&filter)
731 .await;
732
733 assert_eq!(res.status(), 200);
734 assert_eq!(res.body().as_ref(), session_id.as_bytes());
735
736 let msg = recv_msg(&mut rx).await;
737 assert!(matches!(msg, ServerMessage::Connect { .. }));
738 }
739
740 #[tokio::test]
742 async fn init_wrong_session_id_is_ignored() {
743 let (state, mut rx) = new_state();
744 state.lock().await.reconnect(false);
745
746 let filter = TraktorServer::routes(state);
747 let body = init_body("00000000-0000-0000-0000-000000000000", "/music/track.mp3");
748
749 let res = warp::test::request()
750 .method("POST")
751 .path("/init")
752 .json(&body)
753 .reply(&filter)
754 .await;
755
756 assert_eq!(res.status(), 200);
758 assert_no_msg(&mut rx).await;
759 }
760
761 #[tokio::test]
764 async fn queued_updates_are_flushed_on_init() {
765 let (state, mut rx) = new_state();
766 state.lock().await.reconnect(false);
767 let session_id = state.lock().await.session_id.clone();
768
769 let filter = TraktorServer::routes(state);
770
771 for _ in 0..2 {
773 warp::test::request()
774 .method("POST")
775 .path("/update/mixer")
776 .json(&mixer_update_body(&session_id))
777 .reply(&filter)
778 .await;
779 }
780
781 warp::test::request()
783 .method("POST")
784 .path("/init")
785 .json(&init_body(&session_id, ""))
786 .reply(&filter)
787 .await;
788
789 let connect = recv_msg(&mut rx).await;
790 assert!(matches!(connect, ServerMessage::Connect { .. }));
791
792 let upd1 = recv_msg(&mut rx).await;
793 assert!(matches!(upd1, ServerMessage::Update(_)));
794
795 let upd2 = recv_msg(&mut rx).await;
796 assert!(matches!(upd2, ServerMessage::Update(_)));
797 }
798
799 #[tokio::test]
804 async fn update_mixer_correct_session_emits_update() {
805 let (state, mut rx) = new_state();
806 state.lock().await.reconnect(false);
807 let session_id = state.lock().await.session_id.clone();
808
809 let filter = TraktorServer::routes(state);
810
811 warp::test::request()
813 .method("POST")
814 .path("/init")
815 .json(&init_body(&session_id, ""))
816 .reply(&filter)
817 .await;
818 let _ = recv_msg(&mut rx).await; let res = warp::test::request()
822 .method("POST")
823 .path("/update/mixer")
824 .json(&mixer_update_body(&session_id))
825 .reply(&filter)
826 .await;
827
828 assert_eq!(res.status(), 200);
829 let msg = recv_msg(&mut rx).await;
830 assert!(matches!(msg, ServerMessage::Update(StateUpdate::Mixer(_))));
831 }
832
833 #[tokio::test]
835 async fn update_wrong_session_id_is_dropped() {
836 let (state, mut rx) = new_state();
837 state.lock().await.reconnect(false);
838 let session_id = state.lock().await.session_id.clone();
839 let filter = TraktorServer::routes(state);
840
841 warp::test::request()
843 .method("POST")
844 .path("/init")
845 .json(&init_body(&session_id, ""))
846 .reply(&filter)
847 .await;
848 let _ = recv_msg(&mut rx).await; warp::test::request()
851 .method("POST")
852 .path("/update/mixer")
853 .json(&mixer_update_body("wrong-session-id"))
854 .reply(&filter)
855 .await;
856
857 assert_no_msg(&mut rx).await;
858 }
859
860 #[tokio::test]
862 async fn update_before_init_goes_to_queue_not_forwarded() {
863 let (state, mut rx) = new_state();
864 state.lock().await.reconnect(false);
865 let session_id = state.lock().await.session_id.clone();
866 let filter = TraktorServer::routes(state.clone());
867
868 warp::test::request()
869 .method("POST")
870 .path("/update/mixer")
871 .json(&mixer_update_body(&session_id))
872 .reply(&filter)
873 .await;
874
875 assert_no_msg(&mut rx).await;
876 assert_eq!(state.lock().await.queue.len(), 1);
877 }
878
879 #[tokio::test]
882 async fn queue_overflow_triggers_reconnect() {
883 let (state, _rx) = new_state();
884 state.lock().await.reconnect(false);
885 let session_before = state.lock().await.session_id.clone();
886
887 let filter = TraktorServer::routes(state.clone());
888
889 for _ in 0..=MAX_QUEUE_LENGTH {
891 warp::test::request()
892 .method("POST")
893 .path("/update/mixer")
894 .json(&mixer_update_body(&session_before))
895 .reply(&filter)
896 .await;
897 }
898
899 let session_after = state.lock().await.session_id.clone();
900 assert_ne!(
901 session_before, session_after,
902 "session ID should change after queue overflow"
903 );
904 }
905
906 #[tokio::test]
911 async fn cover_upload_for_required_path_emits_cover_image() {
912 let (state, mut rx) = new_state();
913 state.lock().await.reconnect(false);
914 let session_id = state.lock().await.session_id.clone();
915 let filter = TraktorServer::routes(state.clone());
916
917 warp::test::request()
919 .method("POST")
920 .path("/init")
921 .json(&init_body(&session_id, "/music/track.mp3"))
922 .reply(&filter)
923 .await;
924 let _ = recv_msg(&mut rx).await; let res = warp::test::request()
927 .method("POST")
928 .path("/cover?path=/music/track.mp3")
929 .body(b"\xff\xd8\xff\xe0fake_jpeg".as_slice()) .reply(&filter)
931 .await;
932
933 assert_eq!(res.status(), 202);
934 let msg = recv_msg(&mut rx).await;
935 assert!(
936 matches!(msg, ServerMessage::CoverImage { path, .. } if path == "/music/track.mp3")
937 );
938 }
939
940 #[tokio::test]
942 async fn cover_upload_empty_body_returns_400() {
943 let (state, mut rx) = new_state();
944 state.lock().await.reconnect(false);
945 let session_id = state.lock().await.session_id.clone();
946 let filter = TraktorServer::routes(state.clone());
947
948 warp::test::request()
949 .method("POST")
950 .path("/init")
951 .json(&init_body(&session_id, "/music/track.mp3"))
952 .reply(&filter)
953 .await;
954 let _ = recv_msg(&mut rx).await;
955
956 let res = warp::test::request()
957 .method("POST")
958 .path("/cover?path=/music/track.mp3")
959 .body(b"".as_slice())
960 .reply(&filter)
961 .await;
962
963 assert_eq!(res.status(), 400);
964 assert_no_msg(&mut rx).await;
965 }
966
967 #[tokio::test]
970 async fn cover_upload_for_unknown_path_returns_200_no_message() {
971 let (state, mut rx) = new_state();
972 state.lock().await.reconnect(false);
973 let session_id = state.lock().await.session_id.clone();
974 let filter = TraktorServer::routes(state.clone());
975
976 warp::test::request()
978 .method("POST")
979 .path("/init")
980 .json(&init_body(&session_id, ""))
981 .reply(&filter)
982 .await;
983 let _ = recv_msg(&mut rx).await;
984
985 let res = warp::test::request()
986 .method("POST")
987 .path("/cover?path=/music/not-loaded.mp3")
988 .body(b"\xff\xd8\xff".as_slice())
989 .reply(&filter)
990 .await;
991
992 assert_eq!(res.status(), 200);
993 assert_no_msg(&mut rx).await;
994 }
995
996 #[tokio::test]
998 async fn cover_upload_missing_path_query_param_returns_400() {
999 let (state, _rx) = new_state();
1000 state.lock().await.reconnect(false);
1001 let filter = TraktorServer::routes(state);
1002
1003 let res = warp::test::request()
1004 .method("POST")
1005 .path("/cover") .body(b"data".as_slice())
1007 .reply(&filter)
1008 .await;
1009
1010 assert_eq!(res.status(), 400);
1011 }
1012
1013 #[tokio::test]
1017 async fn log_emits_log_message() {
1018 let (state, mut rx) = new_state();
1019 state.lock().await.reconnect(true); let filter = TraktorServer::routes(state);
1021
1022 let res = warp::test::request()
1023 .method("POST")
1024 .path("/log")
1025 .body(b"deck A loaded")
1026 .reply(&filter)
1027 .await;
1028
1029 assert_eq!(res.status(), 201);
1030 let msg = recv_msg(&mut rx).await;
1031 assert!(matches!(msg, ServerMessage::Log(s) if s == "deck A loaded"));
1032 }
1033
1034 mod integration {
1039 use super::*;
1040 use iced::futures::StreamExt as IcedStreamExt;
1041 use iced::futures::channel::mpsc::UnboundedSender as IcedSender;
1042 use tokio::sync::mpsc as tokio_mpsc;
1043 use tokio::time::error::Elapsed;
1044 struct TestServer {
1047 pub addr: SocketAddr,
1048 pub session_id: String,
1049 messages: tokio_mpsc::UnboundedReceiver<ServerMessage>,
1050 app_tx: IcedSender<AppMessage>,
1051 _task: tokio::task::JoinHandle<()>,
1052 }
1053
1054 impl TestServer {
1055 async fn start() -> Self {
1058 const MAX_START_ATTEMPTS: usize = 10;
1059 const READY_TIMEOUT: Duration = Duration::from_secs(2);
1060
1061 for attempt in 1..=MAX_START_ATTEMPTS {
1062 let port = {
1064 let l = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1065 l.local_addr().unwrap().port()
1066 };
1067
1068 let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
1069 let (col_tx, col_rx) = tokio_mpsc::unbounded_channel::<ServerMessage>();
1070
1071 let task = tokio::spawn(async move {
1072 let mut stream = Box::pin(run_server(addr));
1073 while let Some(msg) = stream.next().await {
1074 if col_tx.send(msg).is_err() {
1075 break;
1076 }
1077 }
1078 });
1079
1080 let mut server = Self {
1081 addr,
1082 session_id: String::new(),
1083 messages: col_rx,
1084 app_tx: Self::dummy_sender(), _task: task,
1086 };
1087
1088 if timeout(READY_TIMEOUT, server.wait_ready()).await.is_ok() {
1089 server.session_id = server.fetch_session_id().await;
1090 return server;
1091 }
1092
1093 server._task.abort();
1094
1095 if attempt == MAX_START_ATTEMPTS {
1096 panic!(
1097 "test server failed to reach Ready after {} attempts",
1098 MAX_START_ATTEMPTS
1099 );
1100 }
1101 }
1102
1103 unreachable!("loop above must return or panic")
1104 }
1105
1106 fn dummy_sender() -> IcedSender<AppMessage> {
1107 let (tx, _) = iced::futures::channel::mpsc::unbounded();
1108 tx
1109 }
1110
1111 async fn wait_ready(&mut self) {
1114 timeout(Duration::from_secs(5), async {
1115 loop {
1116 match self.messages.recv().await {
1117 Some(ServerMessage::Ready(tx)) => {
1118 tx.unbounded_send(AppMessage::Reconnect {
1119 debug_logging: false,
1120 })
1121 .ok();
1122 self.app_tx = tx;
1123 tokio::time::sleep(Duration::from_millis(500)).await;
1125 break;
1126 }
1127 Some(_) => {}
1128 None => panic!("server stream ended before Ready"),
1129 }
1130 }
1131 })
1132 .await
1133 .expect("timed out waiting for server Ready");
1134 }
1135
1136 async fn fetch_session_id(&self) -> String {
1137 let url = format!("http://{}/connect", self.addr);
1138 timeout(Duration::from_secs(5), async {
1139 loop {
1140 let resp = reqwest::get(&url).await.expect("GET /connect failed");
1141 if resp.status().is_success()
1142 && let Ok(json) = resp.json::<serde_json::Value>().await
1143 && let Some(session_id) = json.get("sessionId").and_then(|v| v.as_str())
1144 && !session_id.is_empty()
1145 {
1146 break session_id.to_owned();
1147 }
1148
1149 tokio::time::sleep(Duration::from_millis(50)).await;
1150 }
1151 })
1152 .await
1153 .expect("timed out waiting for /connect sessionId")
1154 }
1155
1156 async fn post_json(&self, path: &str, body: &serde_json::Value) -> reqwest::Response {
1158 let req = reqwest::Client::new()
1159 .post(format!("http://{}{}", self.addr, path))
1160 .json(body);
1161 req.send().await.expect("HTTP POST failed")
1162 }
1163
1164 async fn recv(&mut self) -> ServerMessage {
1166 timeout(Duration::from_secs(2), self.messages.recv())
1167 .await
1168 .expect("timeout waiting for ServerMessage")
1169 .expect("message channel closed")
1170 }
1171 }
1172
1173 impl Drop for TestServer {
1174 fn drop(&mut self) {
1175 self._task.abort();
1176 }
1177 }
1178
1179 #[tokio::test]
1184 async fn server_starts_and_connect_returns_session_id() {
1185 let server = TestServer::start().await;
1186
1187 assert!(
1188 !server.session_id.is_empty(),
1189 "session ID should be non-empty"
1190 );
1191 assert!(
1193 server.session_id.len() == 36
1194 && server.session_id.chars().filter(|&c| c == '-').count() == 4,
1195 "session ID should look like a UUID, got: {}",
1196 server.session_id
1197 );
1198 }
1199
1200 #[tokio::test]
1203 async fn full_handshake_and_mixer_update_round_trip() {
1204 let mut server = TestServer::start().await;
1205
1206 let init = init_body(&server.session_id, "/music/track.mp3");
1208 let res = server.post_json("/init", &init).await;
1209 assert_eq!(res.status(), 200);
1210
1211 let msg = server.recv().await;
1212 assert!(matches!(msg, ServerMessage::Connect { .. }));
1213
1214 let mixer = mixer_update_body(&server.session_id);
1216 server.post_json("/update/mixer", &mixer).await;
1217
1218 let msg = server.recv().await;
1219 assert!(matches!(msg, ServerMessage::Update(StateUpdate::Mixer(_))));
1220 }
1221
1222 #[tokio::test]
1225 async fn cover_websocket_receives_file_path_on_deck_update() {
1226 use futures_util::StreamExt as FutStreamExt;
1227
1228 let mut server = TestServer::start().await;
1229
1230 let ws_url = format!("ws://{}/cover", server.addr);
1232 let (mut ws_stream, _) = tokio_tungstenite::connect_async(&ws_url)
1233 .await
1234 .expect("WebSocket connection failed");
1235
1236 let init = init_body(&server.session_id, "");
1238 server.post_json("/init", &init).await;
1239 let _ = server.recv().await; let deck_update = json!({
1243 "sessionId": server.session_id,
1244 "state": {
1245 "isLoaded": true, "number": 2, "title": "New Track",
1246 "artist": "DJ Test", "album": "", "genre": "Tango",
1247 "comment": "", "comment2": "", "label": "", "key": "",
1248 "filePath": "/music/new_track.mp3",
1249 "trackLength": 240.0, "bpm": 120.0
1250 }
1251 });
1252 server.post_json("/update/deck0content", &deck_update).await;
1253
1254 let ws_msg = timeout(Duration::from_secs(2), ws_stream.next())
1256 .await
1257 .expect("WS timeout")
1258 .expect("WS stream ended")
1259 .expect("WS error");
1260
1261 assert_eq!(ws_msg.into_text().unwrap(), "/music/new_track.mp3");
1262 }
1263
1264 #[tokio::test]
1267 async fn server_releases_port_after_drop() {
1268 let server = TestServer::start().await;
1269 let addr = server.addr;
1270
1271 drop(server);
1272
1273 tokio::time::sleep(Duration::from_millis(500)).await;
1275
1276 let rebind = timeout(Duration::from_secs(2), async {
1277 loop {
1278 match tokio::net::TcpListener::bind(addr).await {
1279 Ok(listener) => break Ok::<tokio::net::TcpListener, Elapsed>(listener),
1280 Err(_) => tokio::time::sleep(Duration::from_millis(25)).await,
1281 }
1282 }
1283 })
1284 .await;
1285
1286 assert!(
1287 matches!(rebind, Ok(Ok(_))),
1288 "port should be available after server drop"
1289 );
1290 }
1291
1292 #[tokio::test]
1297 async fn cover_loader_connect_disconnect_changes_state() {
1298 let (state, mut rx) = new_state();
1299 let addr: SocketAddr = "1.2.3.4:5678".parse().unwrap();
1300
1301 let (tx, _ws_rx) = iced_mpsc::unbounded();
1302 let id = state
1303 .lock()
1304 .await
1305 .handle_socket_connect(tx, Some(addr))
1306 .await;
1307
1308 let msg = recv_msg(&mut rx).await;
1309 assert!(matches!(
1310 &msg,
1311 ServerMessage::ClientChanged(addr)
1312 if addr.is_some()
1313 ));
1314
1315 state.lock().await.handle_socket_disconnect(id).await;
1316
1317 let msg = recv_msg(&mut rx).await;
1318 assert!(matches!(
1319 &msg,
1320 ServerMessage::ClientChanged(client) if client.is_none()
1321 ));
1322 }
1323 }
1324}