Skip to main content

danceinterpreter_rs/traktor_api/
server.rs

1use crate::async_utils::DroppingOnce;
2use crate::traktor_api::model::{
3    AppMessage, ConnectionResponse, InitializeRequest, ServerMessage, UpdateRequest,
4};
5use crate::traktor_api::{ID, StateUpdate};
6use bytes::Bytes;
7use iced::futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
8use iced::futures::channel::oneshot;
9use iced::futures::{SinkExt, Stream, StreamExt};
10use iced::futures::{TryFutureExt, stream};
11use serde::de::DeserializeOwned;
12use std::collections::HashMap;
13use std::convert::Infallible;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use std::time::{SystemTime, UNIX_EPOCH};
17use tokio::sync::Mutex;
18use uuid::Uuid;
19use warp::Filter;
20use warp::http::StatusCode;
21
/// Upper bound on the number of updates buffered while the session has not been initialized.
/// Once the queue grows beyond this length, `handle_update` starts a new session via
/// `reconnect`, which also clears the queue.
const MAX_QUEUE_LENGTH: usize = 20;
23
/// State shared by all HTTP and websocket handlers of the Traktor server.
struct TraktorServer {
    /// Channel on which every `ServerMessage` produced by the handlers is emitted.
    output: UnboundedSender<ServerMessage>,

    /// Flag reported to clients in the `/connect` response; set by `reconnect`.
    debug_logging: bool,
    /// ID of the current session. Empty until `reconnect` is called for the first time;
    /// while it is empty all routes are rejected as "not found".
    session_id: String,

    /// Set once an init request with the matching session ID was handled; reset by `reconnect`.
    is_initialized: bool,
    /// Updates received before initialization; flushed on init and cleared by `reconnect`.
    queue: Vec<StateUpdate>,

    /// File path currently associated with each of the four decks (empty entries are ignored
    /// when computing the required images).
    deck_files: [String; 4],
    /// Paths for which a cover upload has been accepted and that are still required.
    loaded_images: Vec<String>,
    /// Paths that are announced to a cover socket as soon as it connects; an entry is removed
    /// when its cover arrives or the file is no longer required.
    pending_images: Vec<String>,

    /// ID handed to the most recently registered cover socket (starting point for the next one).
    cover_socket_id: usize,
    /// Outgoing message channels of the connected cover websockets, keyed by socket ID.
    cover_sockets: HashMap<usize, UnboundedSender<warp::ws::Message>>,
}
40
41impl TraktorServer {
42    pub fn new(output: UnboundedSender<ServerMessage>) -> Self {
43        TraktorServer {
44            output,
45
46            debug_logging: false,
47            session_id: "".to_owned(),
48
49            is_initialized: false,
50            queue: Vec::new(),
51
52            deck_files: Default::default(),
53            loaded_images: Vec::new(),
54            pending_images: Vec::new(),
55
56            cover_socket_id: 0,
57            cover_sockets: HashMap::new(),
58        }
59    }
60
61    async fn send_message(&mut self, message: ServerMessage) {
62        let _ = self.output.send(message).await;
63    }
64
65    async fn send_messages(&mut self, messages: impl IntoIterator<Item = ServerMessage>) {
66        let _ = self
67            .output
68            .send_all(&mut stream::iter(messages).map(Ok))
69            .await;
70    }
71
72    pub async fn send_ready(&mut self, app_input_sender: UnboundedSender<AppMessage>) {
73        self.send_message(ServerMessage::Ready(app_input_sender))
74            .await
75    }
76
77    pub fn reconnect(&mut self, debug_logging: bool) {
78        self.session_id = Uuid::new_v4().to_string();
79        self.debug_logging = debug_logging;
80
81        self.is_initialized = false;
82        self.queue.clear();
83    }
84
85    fn get_required_images(&self) -> Vec<String> {
86        let mut required_images: Vec<String> = self
87            .deck_files
88            .iter()
89            .filter(|&f| !f.is_empty())
90            .map(|f| f.to_owned())
91            .collect();
92        required_images.dedup();
93
94        required_images
95    }
96
97    async fn on_update_deck_files(&mut self) {
98        let required_images = self.get_required_images();
99
100        self.loaded_images.retain(|i| required_images.contains(i));
101        self.pending_images.retain(|i| required_images.contains(i));
102
103        let new_images = required_images
104            .iter()
105            .filter(|&i| !self.loaded_images.contains(i) && !self.pending_images.contains(i));
106
107        for img in new_images {
108            for socket in self.cover_sockets.values_mut() {
109                _ = socket.send(warp::ws::Message::text(img)).await;
110            }
111        }
112    }
113
114    async fn handle_connect(&mut self) -> warp::reply::Json {
115        warp::reply::json(&ConnectionResponse {
116            session_id: self.session_id.to_owned(),
117            debug_logging: self.debug_logging,
118        })
119    }
120
121    async fn handle_init(&mut self, request: InitializeRequest) -> impl warp::Reply + use<> {
122        if request.session_id == self.session_id {
123            let time_offset_ms = SystemTime::now()
124                .duration_since(UNIX_EPOCH)
125                .map(|d| (request.timestamp as i64) - (d.as_millis() as i64))
126                .unwrap_or(0);
127
128            for i in 0..4 {
129                self.deck_files[i] = request.state.decks[i].content.file_path.clone();
130            }
131            self.on_update_deck_files().await;
132
133            self.send_message(ServerMessage::Connect {
134                time_offset_ms,
135                initial_state: Box::new(request.state),
136            })
137            .await;
138
139            let mut messages = self
140                .queue
141                .drain(..)
142                .map(ServerMessage::Update)
143                .collect::<Vec<_>>();
144            self.send_messages(messages.drain(..)).await;
145
146            self.is_initialized = true;
147        }
148
149        self.session_id.to_owned()
150    }
151
152    async fn handle_update(
153        &mut self,
154        session_id: String,
155        update: StateUpdate,
156    ) -> impl warp::Reply + use<> {
157        if session_id == self.session_id {
158            if let StateUpdate::DeckContent(id, content) = &update {
159                self.deck_files[*id as usize] = content.file_path.clone()
160            }
161            self.on_update_deck_files().await;
162
163            if self.is_initialized {
164                self.send_message(ServerMessage::Update(update)).await;
165            } else {
166                self.queue.push(update);
167
168                if self.queue.len() > MAX_QUEUE_LENGTH {
169                    self.reconnect(self.debug_logging);
170                }
171            }
172        }
173
174        self.session_id.to_owned()
175    }
176
177    async fn handle_cover(&mut self, path: String, data: Bytes) -> StatusCode {
178        if data.is_empty() {
179            return StatusCode::BAD_REQUEST;
180        }
181
182        if !self.get_required_images().contains(&path) {
183            return StatusCode::OK;
184        }
185
186        println!("cover received for \"{}\"", path);
187
188        self.pending_images.retain(|i| i != &path);
189        if !self.loaded_images.contains(&path) {
190            self.loaded_images.push(path.clone());
191        }
192
193        self.send_message(ServerMessage::CoverImage { path, data })
194            .await;
195        StatusCode::ACCEPTED
196    }
197
198    async fn handle_socket_connect(
199        &mut self,
200        mut tx: UnboundedSender<warp::ws::Message>,
201        addr: Option<SocketAddr>,
202    ) -> usize {
203        while self.cover_sockets.contains_key(&self.cover_socket_id) {
204            self.cover_socket_id += 1;
205        }
206
207        for img in &self.pending_images {
208            _ = tx.send(warp::ws::Message::text(img)).await;
209        }
210
211        self.cover_sockets.insert(self.cover_socket_id, tx);
212        self.send_message(ServerMessage::ClientChanged(addr)).await;
213        self.cover_socket_id
214    }
215
216    async fn handle_socket_disconnect(&mut self, id: usize) {
217        self.cover_sockets.remove(&id);
218        self.send_message(ServerMessage::ClientChanged(None)).await;
219    }
220
221    async fn handle_log(&mut self, msg: String) -> impl warp::Reply + use<> {
222        self.send_message(ServerMessage::Log(msg)).await;
223        StatusCode::CREATED
224    }
225}
226
227impl TraktorServer {
228    pub fn routes(
229        state: Arc<Mutex<Self>>,
230    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone + use<> {
231        Self::is_started(state.clone())
232            .and(
233                Self::route_connect(state.clone())
234                    .or(Self::route_init(state.clone()))
235                    .or(Self::route_update(state.clone()))
236                    .or(Self::route_log(state.clone()))
237                    .or(Self::route_cover(state.clone())),
238            )
239            .map(|_, reply| reply)
240    }
241
242    fn with_state(
243        state: Arc<Mutex<Self>>,
244    ) -> impl Filter<Extract = (Arc<Mutex<Self>>,), Error = Infallible> + Clone {
245        warp::any().map(move || state.clone())
246    }
247
248    fn is_started(
249        state: Arc<Mutex<Self>>,
250    ) -> impl Filter<Extract = ((),), Error = warp::Rejection> + Clone {
251        warp::any()
252            .and(Self::with_state(state))
253            .and_then(async |state: Arc<Mutex<Self>>| {
254                let state = state.lock().await;
255
256                if state.session_id.is_empty() {
257                    Err(warp::reject::not_found())
258                } else {
259                    Ok(())
260                }
261            })
262    }
263
264    fn json_body<T: DeserializeOwned + Send>()
265    -> impl Filter<Extract = (T,), Error = warp::Rejection> + Clone {
266        warp::body::json()
267    }
268
269    fn route_connect(
270        state: Arc<Mutex<Self>>,
271    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
272        warp::path!("connect")
273            .and(Self::with_state(state))
274            .then(async |state: Arc<Mutex<Self>>| state.lock().await.handle_connect().await)
275    }
276
277    fn route_init(
278        state: Arc<Mutex<Self>>,
279    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
280        warp::path!("init")
281            .and(warp::post())
282            .and(Self::with_state(state))
283            .and(Self::json_body())
284            .then(
285                async |state: Arc<Mutex<Self>>, request: InitializeRequest| {
286                    state.lock().await.handle_init(request).await
287                },
288            )
289    }
290
291    fn route_update(
292        state: Arc<Mutex<Self>>,
293    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
294        warp::path("update")
295            .and(warp::post())
296            .and(Self::with_state(state))
297            .and(Self::route_update_sub_routes())
298            .then(async |state: Arc<Mutex<Self>>, (session_id, update)| {
299                state.lock().await.handle_update(session_id, update).await
300            })
301    }
302
    /// Sub-routes below `/update`: one path segment per kind of update.
    ///
    /// Each branch parses the JSON body as an `UpdateRequest` and extracts the pair
    /// `(session_id, StateUpdate)`. The numeric suffix of a path selects the channel/deck:
    /// `0` maps to `ID::A`, `1` to `ID::B`, `2` to `ID::C` and `3` to `ID::D`. The branches are
    /// merged with `unify()` so the combined filter extracts a single tuple type.
    fn route_update_sub_routes()
    -> impl Filter<Extract = ((String, StateUpdate),), Error = warp::Rejection> + Clone {
        // Mixer state.
        warp::path!("mixer")
            .and(Self::json_body())
            .then(async move |req: UpdateRequest<_>| {
                (req.session_id, StateUpdate::Mixer(req.state))
            })
            // Channel state for channels 0-3.
            .or(warp::path!("channel0").and(Self::json_body()).then(
                async move |req: UpdateRequest<_>| {
                    (req.session_id, StateUpdate::Channel(ID::A, req.state))
                },
            ))
            .unify()
            .or(warp::path!("channel1").and(Self::json_body()).then(
                async move |req: UpdateRequest<_>| {
                    (req.session_id, StateUpdate::Channel(ID::B, req.state))
                },
            ))
            .unify()
            .or(warp::path!("channel2").and(Self::json_body()).then(
                async move |req: UpdateRequest<_>| {
                    (req.session_id, StateUpdate::Channel(ID::C, req.state))
                },
            ))
            .unify()
            .or(warp::path!("channel3").and(Self::json_body()).then(
                async move |req: UpdateRequest<_>| {
                    (req.session_id, StateUpdate::Channel(ID::D, req.state))
                },
            ))
            .unify()
            // Deck content for decks 0-3.
            .or(warp::path!("deck0content").and(Self::json_body()).then(
                async move |req: UpdateRequest<_>| {
                    (req.session_id, StateUpdate::DeckContent(ID::A, req.state))
                },
            ))
            .unify()
            .or(warp::path!("deck1content").and(Self::json_body()).then(
                async move |req: UpdateRequest<_>| {
                    (req.session_id, StateUpdate::DeckContent(ID::B, req.state))
                },
            ))
            .unify()
            .or(warp::path!("deck2content").and(Self::json_body()).then(
                async move |req: UpdateRequest<_>| {
                    (req.session_id, StateUpdate::DeckContent(ID::C, req.state))
                },
            ))
            .unify()
            .or(warp::path!("deck3content").and(Self::json_body()).then(
                async move |req: UpdateRequest<_>| {
                    (req.session_id, StateUpdate::DeckContent(ID::D, req.state))
                },
            ))
            .unify()
            // Deck play state for decks 0-3.
            .or(warp::path!("deck0playstate").and(Self::json_body()).then(
                async move |req: UpdateRequest<_>| {
                    (req.session_id, StateUpdate::DeckPlayState(ID::A, req.state))
                },
            ))
            .unify()
            .or(warp::path!("deck1playstate").and(Self::json_body()).then(
                async move |req: UpdateRequest<_>| {
                    (req.session_id, StateUpdate::DeckPlayState(ID::B, req.state))
                },
            ))
            .unify()
            .or(warp::path!("deck2playstate").and(Self::json_body()).then(
                async move |req: UpdateRequest<_>| {
                    (req.session_id, StateUpdate::DeckPlayState(ID::C, req.state))
                },
            ))
            .unify()
            .or(warp::path!("deck3playstate").and(Self::json_body()).then(
                async move |req: UpdateRequest<_>| {
                    (req.session_id, StateUpdate::DeckPlayState(ID::D, req.state))
                },
            ))
            .unify()
    }
383
384    fn route_cover(
385        state: Arc<Mutex<Self>>,
386    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
387        warp::path!("cover").and(
388            Self::route_cover_upload(state.clone()).or(Self::route_cover_socket(state.clone())),
389        )
390    }
391
392    fn route_cover_upload(
393        state: Arc<Mutex<Self>>,
394    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
395        warp::post()
396            .and(Self::with_state(state))
397            .and(warp::body::bytes())
398            .and(warp::query::<HashMap<String, String>>())
399            .then(
400                async |state: Arc<Mutex<Self>>, body: Bytes, query: HashMap<String, String>| {
401                    match query.get("path") {
402                        Some(path) => state.lock().await.handle_cover(path.to_owned(), body).await,
403                        None => StatusCode::BAD_REQUEST,
404                    }
405                },
406            )
407    }
408
    /// Websocket part of the cover route.
    ///
    /// After the upgrade, the socket is registered via `handle_socket_connect` and stays
    /// registered until the incoming side ends or reports an error, at which point
    /// `handle_socket_disconnect` is called. Incoming messages are read but ignored; outgoing
    /// messages are the ones other handlers push into the socket's channel.
    fn route_cover_socket(
        state: Arc<Mutex<Self>>,
    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
        warp::ws()
            .and(Self::with_state(state))
            .and(warp::addr::remote())
            .map(
                |ws: warp::ws::Ws, state: Arc<Mutex<Self>>, addr: Option<SocketAddr>| {
                    ws.on_upgrade(move |socket| async move {
                        let (mut ws_tx, mut ws_rx) = socket.split();
                        // `tx` is handed to the server state; `rx` feeds the websocket writer.
                        let (tx, mut rx) = iced::futures::channel::mpsc::unbounded();

                        // Writer task: forwards channel messages to the websocket until the
                        // channel yields no more messages. Send errors are only logged.
                        tokio::task::spawn(async move {
                            while let Some(message) = rx.next().await {
                                ws_tx
                                    .send(message)
                                    .unwrap_or_else(|e| {
                                        println!("websocket send error: {}", e);
                                    })
                                    .await;
                            }
                        });

                        let socket_id = state.lock().await.handle_socket_connect(tx, addr).await;
                        println!("websocket connected");

                        // Reader loop: drain incoming messages until the stream ends or fails.
                        while let Some(result) = ws_rx.next().await {
                            match result {
                                Ok(_) => {}
                                Err(e) => {
                                    println!("websocket error: {}", e);
                                    break;
                                }
                            };
                        }

                        state.lock().await.handle_socket_disconnect(socket_id).await;
                        println!("websocket disconnected");
                    })
                },
            )
    }
451
452    fn route_log(
453        state: Arc<Mutex<Self>>,
454    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
455        warp::path!("log")
456            .and(warp::post())
457            .and(Self::with_state(state))
458            .and(warp::body::bytes())
459            .then(async |state: Arc<Mutex<Self>>, body: Bytes| {
460                state
461                    .lock()
462                    .await
463                    .handle_log(String::from_utf8_lossy(&body).into_owned())
464                    .await
465            })
466    }
467}
468
469async fn server_main(
470    addr: SocketAddr,
471    output: UnboundedSender<ServerMessage>,
472    mut input: UnboundedReceiver<AppMessage>,
473    input_send: UnboundedSender<AppMessage>,
474    cancelled: oneshot::Receiver<()>,
475) {
476    let state = Arc::new(Mutex::new(TraktorServer::new(output)));
477    let routes = TraktorServer::routes(state.clone());
478
479    println!("starting traktor server on {}", addr);
480
481    let Ok(listener) = tokio::net::TcpListener::bind(addr).await else {
482        println!("could not start traktor server on {}", addr);
483        return;
484    };
485
486    let state_clone = state.clone();
487    let server = warp::serve(routes).incoming(listener).graceful(async move {
488        cancelled.await.ok();
489        for socket in state_clone.lock().await.cover_sockets.values_mut() {
490            let _ = socket.send(warp::ws::Message::close()).await;
491        }
492    });
493
494    tokio::task::spawn(server.run());
495
496    #[cfg(feature = "mdns")]
497    let service;
498    #[cfg(feature = "mdns")]
499    {
500        service = advertise_server(addr);
501        if service.is_err() {
502            drop(service)
503        }
504    }
505
506    state.lock().await.send_ready(input_send).await;
507    loop {
508        match input.select_next_some().await {
509            AppMessage::Reconnect { debug_logging } => state.lock().await.reconnect(debug_logging),
510        }
511    }
512}
513
/// Registers the server as the mDNS service `traktor-di-webserver` of type `_http._tcp` on
/// the port of `addr`.
///
/// When `addr` has a specific IP, the responder is created with just that IP in its list;
/// for an unspecified IP the list is left empty.
///
/// # Errors
///
/// Returns the I/O error reported when the responder cannot be created.
#[cfg(feature = "mdns")]
fn advertise_server(addr: SocketAddr) -> Result<libmdns::Service, std::io::Error> {
    let ip = addr.ip();
    let allowed_ips = if ip.is_unspecified() {
        Vec::new()
    } else {
        vec![ip]
    };

    let responder = libmdns::Responder::new_with_ip_list(allowed_ips)?;
    let service = responder.register(
        "_http._tcp",
        "traktor-di-webserver",
        addr.port(),
        &["path=/"],
    );

    println!("advertising traktor server on {}", addr);
    Ok(service)
}
531
532pub fn run_server(addr: SocketAddr) -> impl Stream<Item = ServerMessage> {
533    let (output, output_receive) = iced::futures::channel::mpsc::unbounded();
534    let (input_send, input) = iced::futures::channel::mpsc::unbounded();
535    let (cancel, cancelled) = oneshot::channel();
536
537    let runner = DroppingOnce::new(
538        server_main(addr, output, input, input_send, cancelled),
539        move || {
540            println!("stopping traktor server on {}", addr);
541            let _ = cancel.send(());
542        },
543    )
544    .filter_map(|_| async { None });
545
546    stream::select(output_receive, runner)
547}
548
549////////////
550// Tests  //
551////////////
552#[cfg(test)]
553mod tests {
554    use super::*;
555    use iced::futures::channel::mpsc as iced_mpsc;
556    use serde_json::json;
557    use tokio::time::{Duration, timeout};
558
559    // -- Shared test helpers --
560
561    /// Creates a fresh server state and returns the message receiver alongside it.
562    fn new_state() -> (Arc<Mutex<TraktorServer>>, UnboundedReceiver<ServerMessage>) {
563        let (tx, rx) = iced_mpsc::unbounded();
564        (Arc::new(Mutex::new(TraktorServer::new(tx))), rx)
565    }
566
567    /// JSON for a complete valid Traktor state.
568    /// `deck0` has `file_path` set; decks 1-3 are empty.
569    fn full_state_json(file_path: &str) -> serde_json::Value {
570        json!({
571            "mixer": {
572                "xFader": 0.5, "masterVolume": 1.0,
573                "cueVolume": 0.5, "cueMix": 0.5, "micVolume": 0.0
574            },
575            "channel0": {"cue": false, "volume": 1.0, "xFaderLeft": true, "xFaderRight": false},
576            "channel1": {"cue": false, "volume": 0.5, "xFaderLeft": false, "xFaderRight": true},
577            "channel2": {"cue": false, "volume": 0.0, "xFaderLeft": false, "xFaderRight": false},
578            "channel3": {"cue": false, "volume": 0.0, "xFaderLeft": false, "xFaderRight": false},
579            "deck0content": {
580                "isLoaded": true, "number": 1, "title": "Test Track",
581                "artist": "Test Artist", "album": "", "genre": "Waltz",
582                "comment": "", "comment2": "", "label": "", "key": "",
583                "filePath": file_path, "trackLength": 180.0, "bpm": 90.0
584            },
585            "deck1content": {
586                "isLoaded": false, "number": 0, "title": "", "artist": "",
587                "album": "", "genre": "", "comment": "", "comment2": "", "label": "",
588                "key": "", "filePath": "", "trackLength": 0.0, "bpm": 0.0
589            },
590            "deck2content": {
591                "isLoaded": false, "number": 0, "title": "", "artist": "",
592                "album": "", "genre": "", "comment": "", "comment2": "", "label": "",
593                "key": "", "filePath": "", "trackLength": 0.0, "bpm": 0.0
594            },
595            "deck3content": {
596                "isLoaded": false, "number": 0, "title": "", "artist": "",
597                "album": "", "genre": "", "comment": "", "comment2": "", "label": "",
598                "key": "", "filePath": "", "trackLength": 0.0, "bpm": 0.0
599            },
600            "deck0playstate": {"timestamp": 0, "position": 0.0, "speed": 1.0},
601            "deck1playstate": {"timestamp": 0, "position": 0.0, "speed": 0.0},
602            "deck2playstate": {"timestamp": 0, "position": 0.0, "speed": 0.0},
603            "deck3playstate": {"timestamp": 0, "position": 0.0, "speed": 0.0}
604        })
605    }
606
607    fn init_body(session_id: &str, file_path: &str) -> serde_json::Value {
608        json!({
609            "sessionId": session_id,
610            "timestamp": 1_700_000_000_000_u64,
611            "state": full_state_json(file_path)
612        })
613    }
614
615    fn mixer_update_body(session_id: &str) -> serde_json::Value {
616        json!({
617            "sessionId": session_id,
618            "state": {
619                "xFader": 0.3, "masterVolume": 0.8,
620                "cueVolume": 0.5, "cueMix": 0.0, "micVolume": 0.0
621            }
622        })
623    }
624
625    /// Awaits the next message with a 500 ms timeout, panicking on timeout.
626    async fn recv_msg(rx: &mut UnboundedReceiver<ServerMessage>) -> ServerMessage {
627        timeout(Duration::from_millis(500), rx.next())
628            .await
629            .expect("timeout waiting for ServerMessage")
630            .expect("channel closed unexpectedly")
631    }
632
633    /// Asserts that no message arrives within 500 ms.
634    async fn assert_no_msg(rx: &mut UnboundedReceiver<ServerMessage>) {
635        let result = timeout(Duration::from_millis(500), rx.next()).await;
636        assert!(
637            result.is_err(),
638            "expected no message but one arrived - {:?}",
639            result.unwrap()
640        );
641    }
642
643    // ==============================================================
644    // Unit tests — exercised through warp::test (no real TCP socket)
645    // ==============================================================
646
647    // -- Session guard --
648
649    /// Every route returns 404 while session_id is empty (server not yet
650    /// started via reconnect).
651    #[tokio::test]
652    async fn connect_before_reconnect_returns_404() {
653        let (state, _rx) = new_state();
654        let filter = TraktorServer::routes(state);
655
656        let res1 = warp::test::request().path("/connect").reply(&filter).await;
657        let res2 = warp::test::request().path("/update").reply(&filter).await;
658        let res3 = warp::test::request().path("/log").reply(&filter).await;
659        let res4 = warp::test::request().path("/cover").reply(&filter).await;
660
661        assert_eq!(res1.status(), 404);
662        assert_eq!(res2.status(), 404);
663        assert_eq!(res3.status(), 404);
664        assert_eq!(res4.status(), 404);
665    }
666
667    /// After reconnect the session_id is non-empty and /connect returns it.
668    #[tokio::test]
669    async fn connect_after_reconnect_returns_session_id_and_debug_flag() {
670        let (state, _rx) = new_state();
671        state.lock().await.reconnect(false);
672        let session_id = state.lock().await.session_id.clone();
673
674        let filter = TraktorServer::routes(state);
675        let res = warp::test::request().path("/connect").reply(&filter).await;
676
677        assert_eq!(res.status(), 200);
678        let body: serde_json::Value = serde_json::from_slice(res.body()).unwrap();
679        assert_eq!(body["sessionId"], session_id);
680        assert_eq!(body["debugLogging"], false);
681    }
682
683    /// Debug logging flag is reflected in the /connect response.
684    #[tokio::test]
685    async fn connect_returns_debug_logging_true_when_set() {
686        let (state, _rx) = new_state();
687        state.lock().await.reconnect(true);
688
689        let filter = TraktorServer::routes(state);
690        let res = warp::test::request().path("/connect").reply(&filter).await;
691
692        let body: serde_json::Value = serde_json::from_slice(res.body()).unwrap();
693        assert_eq!(body["debugLogging"], true);
694    }
695
696    /// Calling reconnect() again generates a new, different session ID.
697    #[tokio::test]
698    async fn reconnect_generates_new_session_id_each_time() {
699        let (state, _rx) = new_state();
700
701        state.lock().await.reconnect(false);
702        let filter = TraktorServer::routes(state.clone());
703        let res1 = warp::test::request().path("/connect").reply(&filter).await;
704        let body1: serde_json::Value = serde_json::from_slice(res1.body()).unwrap();
705
706        state.lock().await.reconnect(false);
707        let res2 = warp::test::request().path("/connect").reply(&filter).await;
708        let body2: serde_json::Value = serde_json::from_slice(res2.body()).unwrap();
709
710        assert_ne!(body1["sessionId"], body2["sessionId"]);
711    }
712
713    // -- /init endpoint --
714
715    /// A POST /init with the correct session ID emits ServerMessage::Connect
716    /// and returns the session ID as the response body.
717    #[tokio::test]
718    async fn init_correct_session_emits_connect_and_returns_session_id() {
719        let (state, mut rx) = new_state();
720        state.lock().await.reconnect(false);
721        let session_id = state.lock().await.session_id.clone();
722
723        let filter = TraktorServer::routes(state);
724        let body = init_body(&session_id, "/music/track.mp3");
725
726        let res = warp::test::request()
727            .method("POST")
728            .path("/init")
729            .json(&body)
730            .reply(&filter)
731            .await;
732
733        assert_eq!(res.status(), 200);
734        assert_eq!(res.body().as_ref(), session_id.as_bytes());
735
736        let msg = recv_msg(&mut rx).await;
737        assert!(matches!(msg, ServerMessage::Connect { .. }));
738    }
739
740    /// A POST /init with a wrong session ID is silently ignored.
741    #[tokio::test]
742    async fn init_wrong_session_id_is_ignored() {
743        let (state, mut rx) = new_state();
744        state.lock().await.reconnect(false);
745
746        let filter = TraktorServer::routes(state);
747        let body = init_body("00000000-0000-0000-0000-000000000000", "/music/track.mp3");
748
749        let res = warp::test::request()
750            .method("POST")
751            .path("/init")
752            .json(&body)
753            .reply(&filter)
754            .await;
755
756        // Server still returns its own session ID, but no Connect message.
757        assert_eq!(res.status(), 200);
758        assert_no_msg(&mut rx).await;
759    }
760
761    /// Updates queued before init are flushed (as Update messages) immediately
762    /// after a successful /init.
763    #[tokio::test]
764    async fn queued_updates_are_flushed_on_init() {
765        let (state, mut rx) = new_state();
766        state.lock().await.reconnect(false);
767        let session_id = state.lock().await.session_id.clone();
768
769        let filter = TraktorServer::routes(state);
770
771        // Queue two mixer updates before initializing
772        for _ in 0..2 {
773            warp::test::request()
774                .method("POST")
775                .path("/update/mixer")
776                .json(&mixer_update_body(&session_id))
777                .reply(&filter)
778                .await;
779        }
780
781        // Now init — Connect arrives, then the two queued Updates
782        warp::test::request()
783            .method("POST")
784            .path("/init")
785            .json(&init_body(&session_id, ""))
786            .reply(&filter)
787            .await;
788
789        let connect = recv_msg(&mut rx).await;
790        assert!(matches!(connect, ServerMessage::Connect { .. }));
791
792        let upd1 = recv_msg(&mut rx).await;
793        assert!(matches!(upd1, ServerMessage::Update(_)));
794
795        let upd2 = recv_msg(&mut rx).await;
796        assert!(matches!(upd2, ServerMessage::Update(_)));
797    }
798
799    // -- /update/* endpoints --
800
801    /// A POST /update/mixer with the correct session and after init emits
802    /// ServerMessage::Update(Mixer(…)).
803    #[tokio::test]
804    async fn update_mixer_correct_session_emits_update() {
805        let (state, mut rx) = new_state();
806        state.lock().await.reconnect(false);
807        let session_id = state.lock().await.session_id.clone();
808
809        let filter = TraktorServer::routes(state);
810
811        // Init first so is_initialized = true
812        warp::test::request()
813            .method("POST")
814            .path("/init")
815            .json(&init_body(&session_id, ""))
816            .reply(&filter)
817            .await;
818        let _ = recv_msg(&mut rx).await; // discard Connect
819
820        // Now send mixer update
821        let res = warp::test::request()
822            .method("POST")
823            .path("/update/mixer")
824            .json(&mixer_update_body(&session_id))
825            .reply(&filter)
826            .await;
827
828        assert_eq!(res.status(), 200);
829        let msg = recv_msg(&mut rx).await;
830        assert!(matches!(msg, ServerMessage::Update(StateUpdate::Mixer(_))));
831    }
832
833    /// An update with the wrong session ID is silently dropped.
834    #[tokio::test]
835    async fn update_wrong_session_id_is_dropped() {
836        let (state, mut rx) = new_state();
837        state.lock().await.reconnect(false);
838        let session_id = state.lock().await.session_id.clone();
839        let filter = TraktorServer::routes(state);
840
841        // Init to mark as initialized
842        warp::test::request()
843            .method("POST")
844            .path("/init")
845            .json(&init_body(&session_id, ""))
846            .reply(&filter)
847            .await;
848        let _ = recv_msg(&mut rx).await; // discard Connect
849
850        warp::test::request()
851            .method("POST")
852            .path("/update/mixer")
853            .json(&mixer_update_body("wrong-session-id"))
854            .reply(&filter)
855            .await;
856
857        assert_no_msg(&mut rx).await;
858    }
859
860    /// An update arriving before /init is placed in the queue and NOT forwarded.
861    #[tokio::test]
862    async fn update_before_init_goes_to_queue_not_forwarded() {
863        let (state, mut rx) = new_state();
864        state.lock().await.reconnect(false);
865        let session_id = state.lock().await.session_id.clone();
866        let filter = TraktorServer::routes(state.clone());
867
868        warp::test::request()
869            .method("POST")
870            .path("/update/mixer")
871            .json(&mixer_update_body(&session_id))
872            .reply(&filter)
873            .await;
874
875        assert_no_msg(&mut rx).await;
876        assert_eq!(state.lock().await.queue.len(), 1);
877    }
878
879    /// When the update queue exceeds MAX_QUEUE_LENGTH the server reconnects
880    /// (new session ID).
881    #[tokio::test]
882    async fn queue_overflow_triggers_reconnect() {
883        let (state, _rx) = new_state();
884        state.lock().await.reconnect(false);
885        let session_before = state.lock().await.session_id.clone();
886
887        let filter = TraktorServer::routes(state.clone());
888
889        // Send MAX_QUEUE_LENGTH + 1 updates without calling /init
890        for _ in 0..=MAX_QUEUE_LENGTH {
891            warp::test::request()
892                .method("POST")
893                .path("/update/mixer")
894                .json(&mixer_update_body(&session_before))
895                .reply(&filter)
896                .await;
897        }
898
899        let session_after = state.lock().await.session_id.clone();
900        assert_ne!(
901            session_before, session_after,
902            "session ID should change after queue overflow"
903        );
904    }
905
906    // -- /cover endpoint --
907
908    /// A non-empty POST /cover for a required file path emits CoverImage and
909    /// returns 202 Accepted.
910    #[tokio::test]
911    async fn cover_upload_for_required_path_emits_cover_image() {
912        let (state, mut rx) = new_state();
913        state.lock().await.reconnect(false);
914        let session_id = state.lock().await.session_id.clone();
915        let filter = TraktorServer::routes(state.clone());
916
917        // Init with a deck that has a known file path
918        warp::test::request()
919            .method("POST")
920            .path("/init")
921            .json(&init_body(&session_id, "/music/track.mp3"))
922            .reply(&filter)
923            .await;
924        let _ = recv_msg(&mut rx).await; // discard Connect
925
926        let res = warp::test::request()
927            .method("POST")
928            .path("/cover?path=/music/track.mp3")
929            .body(b"\xff\xd8\xff\xe0fake_jpeg".as_slice()) // non-empty body
930            .reply(&filter)
931            .await;
932
933        assert_eq!(res.status(), 202);
934        let msg = recv_msg(&mut rx).await;
935        assert!(
936            matches!(msg, ServerMessage::CoverImage { path, .. } if path == "/music/track.mp3")
937        );
938    }
939
940    /// An empty body returns 400 Bad Request and no message is emitted.
941    #[tokio::test]
942    async fn cover_upload_empty_body_returns_400() {
943        let (state, mut rx) = new_state();
944        state.lock().await.reconnect(false);
945        let session_id = state.lock().await.session_id.clone();
946        let filter = TraktorServer::routes(state.clone());
947
948        warp::test::request()
949            .method("POST")
950            .path("/init")
951            .json(&init_body(&session_id, "/music/track.mp3"))
952            .reply(&filter)
953            .await;
954        let _ = recv_msg(&mut rx).await;
955
956        let res = warp::test::request()
957            .method("POST")
958            .path("/cover?path=/music/track.mp3")
959            .body(b"".as_slice())
960            .reply(&filter)
961            .await;
962
963        assert_eq!(res.status(), 400);
964        assert_no_msg(&mut rx).await;
965    }
966
967    /// A cover upload for a path that is not loaded on any deck returns 200 OK
968    /// and no message is emitted.
969    #[tokio::test]
970    async fn cover_upload_for_unknown_path_returns_200_no_message() {
971        let (state, mut rx) = new_state();
972        state.lock().await.reconnect(false);
973        let session_id = state.lock().await.session_id.clone();
974        let filter = TraktorServer::routes(state.clone());
975
976        // Init with empty file paths
977        warp::test::request()
978            .method("POST")
979            .path("/init")
980            .json(&init_body(&session_id, ""))
981            .reply(&filter)
982            .await;
983        let _ = recv_msg(&mut rx).await;
984
985        let res = warp::test::request()
986            .method("POST")
987            .path("/cover?path=/music/not-loaded.mp3")
988            .body(b"\xff\xd8\xff".as_slice())
989            .reply(&filter)
990            .await;
991
992        assert_eq!(res.status(), 200);
993        assert_no_msg(&mut rx).await;
994    }
995
996    /// Missing `path` query parameter returns 400.
997    #[tokio::test]
998    async fn cover_upload_missing_path_query_param_returns_400() {
999        let (state, _rx) = new_state();
1000        state.lock().await.reconnect(false);
1001        let filter = TraktorServer::routes(state);
1002
1003        let res = warp::test::request()
1004            .method("POST")
1005            .path("/cover") // no ?path=...
1006            .body(b"data".as_slice())
1007            .reply(&filter)
1008            .await;
1009
1010        assert_eq!(res.status(), 400);
1011    }
1012
1013    // -- /log endpoint --
1014
1015    /// POST /log emits a Log message and returns 201.
1016    #[tokio::test]
1017    async fn log_emits_log_message() {
1018        let (state, mut rx) = new_state();
1019        state.lock().await.reconnect(true); // debug_logging = true
1020        let filter = TraktorServer::routes(state);
1021
1022        let res = warp::test::request()
1023            .method("POST")
1024            .path("/log")
1025            .body(b"deck A loaded")
1026            .reply(&filter)
1027            .await;
1028
1029        assert_eq!(res.status(), 201);
1030        let msg = recv_msg(&mut rx).await;
1031        assert!(matches!(msg, ServerMessage::Log(s) if s == "deck A loaded"));
1032    }
1033
1034    // ===========================================================
1035    // Integration tests — real TCP server driven via run_server()
1036    // ===========================================================
1037
1038    mod integration {
1039        use super::*;
1040        use iced::futures::StreamExt as IcedStreamExt;
1041        use iced::futures::channel::mpsc::UnboundedSender as IcedSender;
1042        use tokio::sync::mpsc as tokio_mpsc;
1043        use tokio::time::error::Elapsed;
1044        // ── Test server harness ───────────────────────────────────────────────
1045
1046        struct TestServer {
1047            pub addr: SocketAddr,
1048            pub session_id: String,
1049            messages: tokio_mpsc::UnboundedReceiver<ServerMessage>,
1050            app_tx: IcedSender<AppMessage>,
1051            _task: tokio::task::JoinHandle<()>,
1052        }
1053
1054        impl TestServer {
1055            /// Binds to a random loopback port, starts the server, waits for
1056            /// Ready, sends Reconnect, and fetches the resulting session ID.
1057            async fn start() -> Self {
1058                const MAX_START_ATTEMPTS: usize = 10;
1059                const READY_TIMEOUT: Duration = Duration::from_secs(2);
1060
1061                for attempt in 1..=MAX_START_ATTEMPTS {
1062                    // Pick a candidate loopback port for this attempt.
1063                    let port = {
1064                        let l = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1065                        l.local_addr().unwrap().port()
1066                    };
1067
1068                    let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
1069                    let (col_tx, col_rx) = tokio_mpsc::unbounded_channel::<ServerMessage>();
1070
1071                    let task = tokio::spawn(async move {
1072                        let mut stream = Box::pin(run_server(addr));
1073                        while let Some(msg) = stream.next().await {
1074                            if col_tx.send(msg).is_err() {
1075                                break;
1076                            }
1077                        }
1078                    });
1079
1080                    let mut server = Self {
1081                        addr,
1082                        session_id: String::new(),
1083                        messages: col_rx,
1084                        app_tx: Self::dummy_sender(), // replaced in wait_ready
1085                        _task: task,
1086                    };
1087
1088                    if timeout(READY_TIMEOUT, server.wait_ready()).await.is_ok() {
1089                        server.session_id = server.fetch_session_id().await;
1090                        return server;
1091                    }
1092
1093                    server._task.abort();
1094
1095                    if attempt == MAX_START_ATTEMPTS {
1096                        panic!(
1097                            "test server failed to reach Ready after {} attempts",
1098                            MAX_START_ATTEMPTS
1099                        );
1100                    }
1101                }
1102
1103                unreachable!("loop above must return or panic")
1104            }
1105
1106            fn dummy_sender() -> IcedSender<AppMessage> {
1107                let (tx, _) = iced::futures::channel::mpsc::unbounded();
1108                tx
1109            }
1110
1111            /// Waits for ServerMessage::Ready, then sends AppMessage::Reconnect
1112            /// so the server assigns a session ID.
1113            async fn wait_ready(&mut self) {
1114                timeout(Duration::from_secs(5), async {
1115                    loop {
1116                        match self.messages.recv().await {
1117                            Some(ServerMessage::Ready(tx)) => {
1118                                tx.unbounded_send(AppMessage::Reconnect {
1119                                    debug_logging: false,
1120                                })
1121                                .ok();
1122                                self.app_tx = tx;
1123                                // Give the server event loop time to process Reconnect.
1124                                tokio::time::sleep(Duration::from_millis(500)).await;
1125                                break;
1126                            }
1127                            Some(_) => {}
1128                            None => panic!("server stream ended before Ready"),
1129                        }
1130                    }
1131                })
1132                .await
1133                .expect("timed out waiting for server Ready");
1134            }
1135
1136            async fn fetch_session_id(&self) -> String {
1137                let url = format!("http://{}/connect", self.addr);
1138                timeout(Duration::from_secs(5), async {
1139                    loop {
1140                        let resp = reqwest::get(&url).await.expect("GET /connect failed");
1141                        if resp.status().is_success()
1142                            && let Ok(json) = resp.json::<serde_json::Value>().await
1143                            && let Some(session_id) = json.get("sessionId").and_then(|v| v.as_str())
1144                            && !session_id.is_empty()
1145                        {
1146                            break session_id.to_owned();
1147                        }
1148
1149                        tokio::time::sleep(Duration::from_millis(50)).await;
1150                    }
1151                })
1152                .await
1153                .expect("timed out waiting for /connect sessionId")
1154            }
1155
1156            /// Convenience: POST JSON to a path and return the response.
1157            async fn post_json(&self, path: &str, body: &serde_json::Value) -> reqwest::Response {
1158                let req = reqwest::Client::new()
1159                    .post(format!("http://{}{}", self.addr, path))
1160                    .json(body);
1161                req.send().await.expect("HTTP POST failed")
1162            }
1163
1164            /// Awaits the next ServerMessage with a 2-second timeout.
1165            async fn recv(&mut self) -> ServerMessage {
1166                timeout(Duration::from_secs(2), self.messages.recv())
1167                    .await
1168                    .expect("timeout waiting for ServerMessage")
1169                    .expect("message channel closed")
1170            }
1171        }
1172
1173        impl Drop for TestServer {
1174            fn drop(&mut self) {
1175                self._task.abort();
1176            }
1177        }
1178
1179        // ── Tests ──────────────────────────────────────────────────────────────
1180
1181        /// The server starts, binds its port, and responds to GET /connect with
1182        /// a valid UUID session ID.
1183        #[tokio::test]
1184        async fn server_starts_and_connect_returns_session_id() {
1185            let server = TestServer::start().await;
1186
1187            assert!(
1188                !server.session_id.is_empty(),
1189                "session ID should be non-empty"
1190            );
1191            // Verify UUID format (8-4-4-4-12)
1192            assert!(
1193                server.session_id.len() == 36
1194                    && server.session_id.chars().filter(|&c| c == '-').count() == 4,
1195                "session ID should look like a UUID, got: {}",
1196                server.session_id
1197            );
1198        }
1199
1200        /// Full handshake: POST /init → Connect message; POST /update/mixer →
1201        /// Update(Mixer) message.
1202        #[tokio::test]
1203        async fn full_handshake_and_mixer_update_round_trip() {
1204            let mut server = TestServer::start().await;
1205
1206            // Init
1207            let init = init_body(&server.session_id, "/music/track.mp3");
1208            let res = server.post_json("/init", &init).await;
1209            assert_eq!(res.status(), 200);
1210
1211            let msg = server.recv().await;
1212            assert!(matches!(msg, ServerMessage::Connect { .. }));
1213
1214            // Mixer update
1215            let mixer = mixer_update_body(&server.session_id);
1216            server.post_json("/update/mixer", &mixer).await;
1217
1218            let msg = server.recv().await;
1219            assert!(matches!(msg, ServerMessage::Update(StateUpdate::Mixer(_))));
1220        }
1221
1222        /// A WebSocket client connected to /cover receives the file path of a
1223        /// newly loaded deck when a deck content update arrives.
1224        #[tokio::test]
1225        async fn cover_websocket_receives_file_path_on_deck_update() {
1226            use futures_util::StreamExt as FutStreamExt;
1227
1228            let mut server = TestServer::start().await;
1229
1230            // Connect WebSocket BEFORE sending the deck update
1231            let ws_url = format!("ws://{}/cover", server.addr);
1232            let (mut ws_stream, _) = tokio_tungstenite::connect_async(&ws_url)
1233                .await
1234                .expect("WebSocket connection failed");
1235
1236            // Init with an empty state first (no file paths yet)
1237            let init = init_body(&server.session_id, "");
1238            server.post_json("/init", &init).await;
1239            let _ = server.recv().await; // discard Connect
1240
1241            // Now update deck0 content with a real file path
1242            let deck_update = json!({
1243                "sessionId": server.session_id,
1244                "state": {
1245                    "isLoaded": true, "number": 2, "title": "New Track",
1246                    "artist": "DJ Test", "album": "", "genre": "Tango",
1247                    "comment": "", "comment2": "", "label": "", "key": "",
1248                    "filePath": "/music/new_track.mp3",
1249                    "trackLength": 240.0, "bpm": 120.0
1250                }
1251            });
1252            server.post_json("/update/deck0content", &deck_update).await;
1253
1254            // The server should push the file path to all connected WebSocket clients
1255            let ws_msg = timeout(Duration::from_secs(2), ws_stream.next())
1256                .await
1257                .expect("WS timeout")
1258                .expect("WS stream ended")
1259                .expect("WS error");
1260
1261            assert_eq!(ws_msg.into_text().unwrap(), "/music/new_track.mp3");
1262        }
1263
1264        /// Dropping TestServer aborts the task; a subsequent bind to the same
1265        /// address should succeed (server released the port).
1266        #[tokio::test]
1267        async fn server_releases_port_after_drop() {
1268            let server = TestServer::start().await;
1269            let addr = server.addr;
1270
1271            drop(server);
1272
1273            // Give the OS a moment to reclaim the port
1274            tokio::time::sleep(Duration::from_millis(500)).await;
1275
1276            let rebind = timeout(Duration::from_secs(2), async {
1277                loop {
1278                    match tokio::net::TcpListener::bind(addr).await {
1279                        Ok(listener) => break Ok::<tokio::net::TcpListener, Elapsed>(listener),
1280                        Err(_) => tokio::time::sleep(Duration::from_millis(25)).await,
1281                    }
1282                }
1283            })
1284            .await;
1285
1286            assert!(
1287                matches!(rebind, Ok(Ok(_))),
1288                "port should be available after server drop"
1289            );
1290        }
1291
1292        // -- client tracking (cover WebSocket connections) --
1293
1294        /// Connecting a cover socket emits ClientsChanged carrying the remote
1295        /// address; disconnecting emits ClientsChanged with the client removed.
1296        #[tokio::test]
1297        async fn cover_loader_connect_disconnect_changes_state() {
1298            let (state, mut rx) = new_state();
1299            let addr: SocketAddr = "1.2.3.4:5678".parse().unwrap();
1300
1301            let (tx, _ws_rx) = iced_mpsc::unbounded();
1302            let id = state
1303                .lock()
1304                .await
1305                .handle_socket_connect(tx, Some(addr))
1306                .await;
1307
1308            let msg = recv_msg(&mut rx).await;
1309            assert!(matches!(
1310                &msg,
1311                ServerMessage::ClientChanged(addr)
1312                    if addr.is_some()
1313            ));
1314
1315            state.lock().await.handle_socket_disconnect(id).await;
1316
1317            let msg = recv_msg(&mut rx).await;
1318            assert!(matches!(
1319                &msg,
1320                ServerMessage::ClientChanged(client) if client.is_none()
1321            ));
1322        }
1323    }
1324}