danceinterpreter_rs/traktor_api/
server.rs

1use crate::async_utils::DroppingOnce;
2use crate::traktor_api::model::{
3    AppMessage, ConnectionResponse, InitializeRequest, ServerMessage, UpdateRequest,
4};
5use crate::traktor_api::{StateUpdate, ID};
6use bytes::Bytes;
7use iced::futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
8use iced::futures::channel::oneshot;
9use iced::futures::{stream, TryFutureExt};
10use iced::futures::{SinkExt, Stream, StreamExt};
11use libmdns::{Responder, Service};
12use serde::de::DeserializeOwned;
13use std::collections::HashMap;
14use std::convert::Infallible;
15use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18use tokio::sync::Mutex;
19use uuid::Uuid;
20use warp::http::StatusCode;
21use warp::Filter;
22
/// Upper bound on the number of state updates buffered while the session has not
/// been initialized yet; once the queue grows beyond this, the server starts a
/// fresh session via `TraktorServer::reconnect`.
const MAX_QUEUE_LENGTH: usize = 20;
24
/// Shared state behind the HTTP/websocket routes of the Traktor API server.
struct TraktorServer {
    /// Channel on which every `ServerMessage` is delivered to the application.
    output: UnboundedSender<ServerMessage>,

    /// Flag handed to the client in the connect response; set by `reconnect`.
    debug_logging: bool,
    /// Identifier of the current session. Empty until the first `reconnect`,
    /// during which time all routes are rejected by `is_started`.
    session_id: String,

    /// `true` once an init request for the current session has been processed.
    is_initialized: bool,
    /// Updates received before initialization; flushed to `output` on init.
    queue: Vec<StateUpdate>,

    /// File paths of the content on decks A, B, C and D (in that order).
    deck_files: (String, String, String, String),
    /// Paths for which a cover upload has been received.
    loaded_images: Vec<String>,
    /// Paths that are replayed to every cover socket when it connects; an entry
    /// is removed as soon as its cover upload arrives.
    pending_images: Vec<String>,

    /// Id assigned to the most recently connected cover socket (search start
    /// for the next free id).
    cover_socket_id: usize,
    /// Outgoing message channels of the connected cover websockets, by id.
    cover_sockets: HashMap<usize, UnboundedSender<warp::ws::Message>>,
}
41
42impl TraktorServer {
43    pub fn new(output: UnboundedSender<ServerMessage>) -> Self {
44        TraktorServer {
45            output,
46
47            debug_logging: false,
48            session_id: "".to_owned(),
49
50            is_initialized: false,
51            queue: Vec::new(),
52
53            deck_files: Default::default(),
54            loaded_images: Vec::new(),
55            pending_images: Vec::new(),
56
57            cover_socket_id: 0,
58            cover_sockets: HashMap::new(),
59        }
60    }
61
62    async fn send_message(&mut self, message: ServerMessage) {
63        let _ = self.output.send(message).await;
64    }
65
66    async fn send_messages(&mut self, messages: impl IntoIterator<Item=ServerMessage>) {
67        let _ = self
68            .output
69            .send_all(&mut stream::iter(messages).map(Ok))
70            .await;
71    }
72
73    pub async fn send_ready(&mut self, app_input_sender: UnboundedSender<AppMessage>) {
74        self.send_message(ServerMessage::Ready(app_input_sender))
75            .await
76    }
77
78    pub fn reconnect(&mut self, debug_logging: bool) {
79        self.session_id = Uuid::new_v4().to_string();
80        self.debug_logging = debug_logging;
81
82        self.is_initialized = false;
83        self.queue.clear();
84    }
85
86    fn get_required_images(&self) -> Vec<String> {
87        let mut required_images: Vec<String> = vec![
88            &self.deck_files.0,
89            &self.deck_files.1,
90            &self.deck_files.2,
91            &self.deck_files.3,
92        ]
93            .into_iter()
94            .filter(|&f| !f.is_empty())
95            .map(|f| f.to_owned())
96            .collect();
97        required_images.dedup();
98
99        required_images
100    }
101
102    async fn on_update_deck_files(&mut self) {
103        let required_images = self.get_required_images();
104
105        self.loaded_images.retain(|i| required_images.contains(i));
106        self.pending_images.retain(|i| required_images.contains(i));
107
108        let new_images = required_images
109            .iter()
110            .filter(|&i| !self.loaded_images.contains(i) && !self.pending_images.contains(i));
111
112        for img in new_images {
113            for socket in self.cover_sockets.values_mut() {
114                _ = socket.send(warp::ws::Message::text(img)).await;
115            }
116        }
117    }
118
119    async fn handle_connect(&mut self) -> warp::reply::Json {
120        warp::reply::json(&ConnectionResponse {
121            session_id: self.session_id.to_owned(),
122            debug_logging: self.debug_logging,
123        })
124    }
125
126    async fn handle_init(&mut self, request: InitializeRequest) -> impl warp::Reply + use<> {
127        if request.session_id == self.session_id {
128            let time_offset_ms = SystemTime::now()
129                .duration_since(UNIX_EPOCH)
130                .map(|d| (request.timestamp as i64) - (d.as_millis() as i64))
131                .unwrap_or(0);
132
133            self.deck_files.0 = request.state.decks.0.content.file_path.clone();
134            self.deck_files.1 = request.state.decks.1.content.file_path.clone();
135            self.deck_files.2 = request.state.decks.2.content.file_path.clone();
136            self.deck_files.3 = request.state.decks.3.content.file_path.clone();
137            self.on_update_deck_files().await;
138
139            self.send_message(ServerMessage::Connect {
140                time_offset_ms,
141                initial_state: Box::new(request.state),
142            })
143                .await;
144
145            let mut messages = self
146                .queue
147                .drain(..)
148                .map(ServerMessage::Update)
149                .collect::<Vec<_>>();
150            self.send_messages(messages.drain(..)).await;
151
152            self.is_initialized = true;
153        }
154
155        self.session_id.to_owned()
156    }
157
158    async fn handle_update(
159        &mut self,
160        session_id: String,
161        update: StateUpdate,
162    ) -> impl warp::Reply + use < > {
163        if session_id == self.session_id {
164            match &update {
165                StateUpdate::DeckContent(ID::A, content) => {
166                    self.deck_files.0 = content.file_path.clone()
167                }
168                StateUpdate::DeckContent(ID::B, content) => {
169                    self.deck_files.1 = content.file_path.clone()
170                }
171                StateUpdate::DeckContent(ID::C, content) => {
172                    self.deck_files.2 = content.file_path.clone()
173                }
174                StateUpdate::DeckContent(ID::D, content) => {
175                    self.deck_files.3 = content.file_path.clone()
176                }
177                _ => {}
178            }
179            self.on_update_deck_files().await;
180
181            if self.is_initialized {
182                self.send_message(ServerMessage::Update(update)).await;
183            } else {
184                self.queue.push(update);
185
186                if self.queue.len() > MAX_QUEUE_LENGTH {
187                    self.reconnect(self.debug_logging);
188                }
189            }
190        }
191
192        self.session_id.to_owned()
193    }
194
195    async fn handle_cover(&mut self, path: String, data: Bytes) -> StatusCode {
196        if data.is_empty() {
197            return StatusCode::BAD_REQUEST;
198        }
199
200        if !self.get_required_images().contains(&path) {
201            return StatusCode::OK;
202        }
203
204        println!("cover received for \"{}\"", path);
205
206        self.pending_images.retain(|i| i != &path);
207        if !self.loaded_images.contains(&path) {
208            self.loaded_images.push(path.clone());
209        }
210
211        self.send_message(ServerMessage::CoverImage { path, data })
212            .await;
213        StatusCode::ACCEPTED
214    }
215
216    async fn handle_socket_connect(&mut self, mut tx: UnboundedSender<warp::ws::Message>) -> usize {
217        while self.cover_sockets.contains_key(&self.cover_socket_id) {
218            self.cover_socket_id += 1;
219        }
220
221        for img in &self.pending_images {
222            _ = tx.send(warp::ws::Message::text(img)).await;
223        }
224
225        self.cover_sockets.insert(self.cover_socket_id, tx);
226        self.cover_socket_id
227    }
228
229    fn handle_socket_disconnect(&mut self, id: usize) {
230        self.cover_sockets.remove(&id);
231    }
232
233    async fn handle_log(&mut self, msg: String) -> impl warp::Reply + use < > {
234        self.send_message(ServerMessage::Log(msg)).await;
235        StatusCode::CREATED
236    }
237}
238
239impl TraktorServer {
240    pub fn routes(
241        state: Arc<Mutex<Self>>,
242    ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone + use < > {
243        Self::is_started(state.clone())
244            .and(
245                Self::route_connect(state.clone())
246                    .or(Self::route_init(state.clone()))
247                    .or(Self::route_update(state.clone()))
248                    .or(Self::route_log(state.clone()))
249                    .or(Self::route_cover(state.clone())),
250            )
251            .map(|_, reply| reply)
252    }
253
254    fn with_state(
255        state: Arc<Mutex<Self>>,
256    ) -> impl Filter<Extract=(Arc<Mutex<Self>>,), Error=Infallible> + Clone {
257        warp::any().map(move || state.clone())
258    }
259
260    fn is_started(
261        state: Arc<Mutex<Self>>,
262    ) -> impl Filter<Extract=((),), Error=warp::Rejection> + Clone {
263        warp::any()
264            .and(Self::with_state(state))
265            .and_then(async |state: Arc<Mutex<Self>>| {
266                let state = state.lock().await;
267
268                if state.session_id.is_empty() {
269                    Err(warp::reject::not_found())
270                } else {
271                    Ok(())
272                }
273            })
274    }
275
276    fn json_body<T: DeserializeOwned + Send>()
277        -> impl Filter<Extract=(T,), Error=warp::Rejection> + Clone {
278        warp::body::json()
279    }
280
281    fn route_connect(
282        state: Arc<Mutex<Self>>,
283    ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
284        warp::path!("connect")
285            .and(Self::with_state(state))
286            .then(async |state: Arc<Mutex<Self>>| state.lock().await.handle_connect().await)
287    }
288
289    fn route_init(
290        state: Arc<Mutex<Self>>,
291    ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
292        warp::path!("init")
293            .and(warp::post())
294            .and(Self::with_state(state))
295            .and(Self::json_body())
296            .then(
297                async |state: Arc<Mutex<Self>>, request: InitializeRequest| {
298                    state.lock().await.handle_init(request).await
299                },
300            )
301    }
302
303    fn route_update(
304        state: Arc<Mutex<Self>>,
305    ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
306        warp::path("update")
307            .and(warp::post())
308            .and(Self::with_state(state))
309            .and(Self::route_update_sub_routes())
310            .then(async |state: Arc<Mutex<Self>>, (session_id, update)| {
311                state.lock().await.handle_update(session_id, update).await
312            })
313    }
314
315    fn route_update_sub_routes()
316        -> impl Filter<Extract=((String, StateUpdate),), Error=warp::Rejection> + Clone {
317        warp::path!("mixer")
318            .and(Self::json_body())
319            .then(async move |req: UpdateRequest<_>| {
320                (req.session_id, StateUpdate::Mixer(req.state))
321            })
322            .or(warp::path!("channel0").and(Self::json_body()).then(
323                async move |req: UpdateRequest<_>| {
324                    (req.session_id, StateUpdate::Channel(ID::A, req.state))
325                },
326            ))
327            .unify()
328            .or(warp::path!("channel1").and(Self::json_body()).then(
329                async move |req: UpdateRequest<_>| {
330                    (req.session_id, StateUpdate::Channel(ID::B, req.state))
331                },
332            ))
333            .unify()
334            .or(warp::path!("channel2").and(Self::json_body()).then(
335                async move |req: UpdateRequest<_>| {
336                    (req.session_id, StateUpdate::Channel(ID::C, req.state))
337                },
338            ))
339            .unify()
340            .or(warp::path!("channel3").and(Self::json_body()).then(
341                async move |req: UpdateRequest<_>| {
342                    (req.session_id, StateUpdate::Channel(ID::D, req.state))
343                },
344            ))
345            .unify()
346            .or(warp::path!("deck0content").and(Self::json_body()).then(
347                async move |req: UpdateRequest<_>| {
348                    (req.session_id, StateUpdate::DeckContent(ID::A, req.state))
349                },
350            ))
351            .unify()
352            .or(warp::path!("deck1content").and(Self::json_body()).then(
353                async move |req: UpdateRequest<_>| {
354                    (req.session_id, StateUpdate::DeckContent(ID::B, req.state))
355                },
356            ))
357            .unify()
358            .or(warp::path!("deck2content").and(Self::json_body()).then(
359                async move |req: UpdateRequest<_>| {
360                    (req.session_id, StateUpdate::DeckContent(ID::C, req.state))
361                },
362            ))
363            .unify()
364            .or(warp::path!("deck3content").and(Self::json_body()).then(
365                async move |req: UpdateRequest<_>| {
366                    (req.session_id, StateUpdate::DeckContent(ID::D, req.state))
367                },
368            ))
369            .unify()
370            .or(warp::path!("deck0playstate").and(Self::json_body()).then(
371                async move |req: UpdateRequest<_>| {
372                    (req.session_id, StateUpdate::DeckPlayState(ID::A, req.state))
373                },
374            ))
375            .unify()
376            .or(warp::path!("deck1playstate").and(Self::json_body()).then(
377                async move |req: UpdateRequest<_>| {
378                    (req.session_id, StateUpdate::DeckPlayState(ID::B, req.state))
379                },
380            ))
381            .unify()
382            .or(warp::path!("deck2playstate").and(Self::json_body()).then(
383                async move |req: UpdateRequest<_>| {
384                    (req.session_id, StateUpdate::DeckPlayState(ID::C, req.state))
385                },
386            ))
387            .unify()
388            .or(warp::path!("deck3playstate").and(Self::json_body()).then(
389                async move |req: UpdateRequest<_>| {
390                    (req.session_id, StateUpdate::DeckPlayState(ID::D, req.state))
391                },
392            ))
393            .unify()
394    }
395
396    fn route_cover(
397        state: Arc<Mutex<Self>>,
398    ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
399        warp::path!("cover").and(
400            Self::route_cover_upload(state.clone()).or(Self::route_cover_socket(state.clone())),
401        )
402    }
403
404    fn route_cover_upload(
405        state: Arc<Mutex<Self>>,
406    ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
407        warp::post()
408            .and(Self::with_state(state))
409            .and(warp::body::bytes())
410            .and(warp::query::<HashMap<String, String>>())
411            .then(
412                async |state: Arc<Mutex<Self>>, body: Bytes, query: HashMap<String, String>| {
413                    match query.get("path") {
414                        Some(path) => state.lock().await.handle_cover(path.to_owned(), body).await,
415                        None => StatusCode::BAD_REQUEST,
416                    }
417                },
418            )
419    }
420
421    fn route_cover_socket(
422        state: Arc<Mutex<Self>>,
423    ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
424        warp::ws()
425            .and(Self::with_state(state))
426            .map(|ws: warp::ws::Ws, state: Arc<Mutex<Self>>| {
427                ws.on_upgrade(move |socket| async move {
428                    let (mut ws_tx, mut ws_rx) = socket.split();
429                    let (tx, mut rx) = iced::futures::channel::mpsc::unbounded();
430
431                    tokio::task::spawn(async move {
432                        while let Some(message) = rx.next().await {
433                            ws_tx
434                                .send(message)
435                                .unwrap_or_else(|e| {
436                                    println!("websocket send error: {}", e);
437                                })
438                                .await;
439                        }
440                    });
441
442                    let socket_id = state.lock().await.handle_socket_connect(tx).await;
443                    println!("websocket connected");
444
445                    while let Some(result) = ws_rx.next().await {
446                        match result {
447                            Ok(_) => {}
448                            Err(e) => {
449                                println!("websocket error: {}", e);
450                                break;
451                            }
452                        };
453                    }
454
455                    state.lock().await.handle_socket_disconnect(socket_id);
456                    println!("websocket disconnected");
457                })
458            })
459    }
460
461    fn route_log(
462        state: Arc<Mutex<Self>>,
463    ) -> impl Filter<Extract=(impl warp::Reply,), Error=warp::Rejection> + Clone {
464        warp::path!("log")
465            .and(warp::post())
466            .and(Self::with_state(state))
467            .and(warp::body::bytes())
468            .then(async |state: Arc<Mutex<Self>>, body: Bytes| {
469                state
470                    .lock()
471                    .await
472                    .handle_log(String::from_utf8_lossy(&body).into_owned())
473                    .await
474            })
475    }
476}
477
478async fn server_main(
479    addr: SocketAddr,
480    output: UnboundedSender<ServerMessage>,
481    mut input: UnboundedReceiver<AppMessage>,
482    input_send: UnboundedSender<AppMessage>,
483    cancelled: oneshot::Receiver<()>,
484) {
485    let state = Arc::new(Mutex::new(TraktorServer::new(output)));
486    let routes = TraktorServer::routes(state.clone());
487
488    println!("starting traktor server on {}", addr);
489    let service = advertise_server(addr);
490
491    let Ok(listener) = tokio::net::TcpListener::bind(addr).await else {
492        println!("could not start traktor server on {}", addr);
493
494        drop(service);
495        return;
496    };
497    let server = warp::serve(routes).incoming(listener).graceful(async {
498        cancelled.await.ok();
499    });
500
501    tokio::task::spawn(server.run());
502
503    state.lock().await.send_ready(input_send).await;
504    loop {
505        match input.select_next_some().await {
506            AppMessage::Reconnect { debug_logging } => state.lock().await.reconnect(debug_logging),
507        }
508    }
509}
510
511fn advertise_server(addr: SocketAddr) -> Service {
512    let addr_vec = if !addr.ip().is_unspecified() {
513        [addr.ip()].to_vec()
514    } else {
515        Vec::new()
516    };
517    let responder = Responder::new_with_ip_list(addr_vec).expect("could not create responder");
518    let svc = responder.register(
519        "_http._tcp",
520        "traktor-di-webserver",
521        addr.port(),
522        &["path=/"],
523    );
524    println!("advertising traktor server on {}", addr);
525    svc
526}
527
528pub fn run_server(addr: SocketAddr) -> impl Stream<Item=ServerMessage> {
529    let (output, output_receive) = iced::futures::channel::mpsc::unbounded();
530    let (input_send, input) = iced::futures::channel::mpsc::unbounded();
531    let (cancel, cancelled) = oneshot::channel();
532
533    let runner = DroppingOnce::new(
534        server_main(addr, output, input, input_send, cancelled),
535        move || {
536            println!("stopping traktor server on {}", addr);
537            let _ = cancel.send(());
538        },
539    )
540        .filter_map(|_| async { None });
541
542    stream::select(output_receive, runner)
543}