warp/filters/
body.rs

1//! Body filters
2//!
3//! Filters that extract a body for a route.
4
5use std::error::Error as StdError;
6use std::fmt;
7
8use crate::bodyt::Body;
9use bytes::{Buf, Bytes};
10use futures_util::future;
11use futures_util::Stream;
12use headers::ContentLength;
13use http::header::CONTENT_TYPE;
14use http_body_util::BodyDataStream;
15use http_body_util::BodyExt;
16use mime;
17use serde::de::DeserializeOwned;
18
19use crate::filter::{filter_fn, filter_fn_one, Filter, FilterBase};
20use crate::reject::{self, Rejection};
21
22type BoxError = Box<dyn StdError + Send + Sync>;
23
24// Extracts the `Body` Stream from the route.
25//
26// Does not consume any of it.
27pub(crate) fn body() -> impl Filter<Extract = (Body,), Error = Rejection> + Copy {
28    filter_fn_one(|route| {
29        future::ready(route.take_body().ok_or_else(|| {
30            tracing::error!("request body already taken in previous filter");
31            reject::known(BodyConsumedMultipleTimes { _p: () })
32        }))
33    })
34}
35
36/// Require a `content-length` header to have a value no greater than some limit.
37///
38/// Rejects if `content-length` header is missing, is invalid, or has a number
39/// larger than the limit provided.
40///
41/// # Example
42///
43/// ```
44/// use warp::Filter;
45///
46/// // Limit the upload to 4kb...
47/// let upload = warp::body::content_length_limit(4096)
48///     .and(warp::body::aggregate());
49/// ```
50pub fn content_length_limit(limit: u64) -> impl Filter<Extract = (), Error = Rejection> + Copy {
51    crate::filters::header::header2()
52        .map_err(crate::filter::Internal, |_| {
53            tracing::debug!("content-length missing");
54            reject::length_required()
55        })
56        .and_then(move |ContentLength(length)| {
57            if length <= limit {
58                future::ok(())
59            } else {
60                tracing::debug!("content-length: {} is over limit {}", length, limit);
61                future::err(reject::payload_too_large())
62            }
63        })
64        .untuple_one()
65}
66
67/// Create a `Filter` that extracts the request body as a `futures::Stream`.
68///
69/// If other filters have already extracted the body, this filter will reject
70/// with a `500 Internal Server Error`.
71///
72/// For example usage, please take a look at [examples/stream.rs](https://github.com/seanmonstar/warp/blob/master/examples/stream.rs).
73///
74/// # Warning
75///
76/// This does not have a default size limit, it would be wise to use one to
77/// prevent a overly large request from using too much memory.
78pub fn stream(
79) -> impl Filter<Extract = (impl Stream<Item = Result<impl Buf, crate::Error>>,), Error = Rejection> + Copy
80{
81    body().map(|body| BodyDataStream::new(body))
82}
83
84/// Returns a `Filter` that matches any request and extracts a `Future` of a
85/// concatenated body.
86///
87/// The contents of the body will be flattened into a single contiguous
88/// `Bytes`, which may require memory copies. If you don't require a
89/// contiguous buffer, using `aggregate` can be give better performance.
90///
91/// # Warning
92///
93/// This does not have a default size limit, it would be wise to use one to
94/// prevent a overly large request from using too much memory.
95///
96/// # Example
97///
98/// ```
99/// use warp::{Buf, Filter};
100///
101/// let route = warp::body::content_length_limit(1024 * 32)
102///     .and(warp::body::bytes())
103///     .map(|bytes: bytes::Bytes| {
104///         println!("bytes = {:?}", bytes);
105///     });
106/// ```
107pub fn bytes() -> impl Filter<Extract = (Bytes,), Error = Rejection> + Copy {
108    body().and_then(|mut body| async move {
109        BodyExt::collect(&mut body)
110            .await
111            .map(|b| b.to_bytes())
112            .map_err(|err| {
113                tracing::debug!("to_bytes error: {}", err);
114                reject::known(BodyReadError(err))
115            })
116    })
117}
118
119/// Returns a `Filter` that matches any request and extracts a `Future` of an
120/// aggregated body.
121///
122/// The `Buf` may contain multiple, non-contiguous buffers. This can be more
123/// performant (by reducing copies) when receiving large bodies.
124///
125/// # Warning
126///
127/// This does not have a default size limit, it would be wise to use one to
128/// prevent a overly large request from using too much memory.
129///
130/// # Example
131///
132/// ```
133/// use warp::{Buf, Filter};
134///
135/// fn full_body(mut body: impl Buf) {
136///     // It could have several non-contiguous slices of memory...
137///     while body.has_remaining() {
138///         println!("slice = {:?}", body.chunk());
139///         let cnt = body.chunk().len();
140///         body.advance(cnt);
141///     }
142/// }
143///
144/// let route = warp::body::content_length_limit(1024 * 32)
145///     .and(warp::body::aggregate())
146///     .map(full_body);
147/// ```
148pub fn aggregate() -> impl Filter<Extract = (impl Buf,), Error = Rejection> + Copy {
149    body().and_then(|mut body: crate::bodyt::Body| async move {
150        http_body_util::BodyExt::collect(&mut body)
151            .await
152            .map(|collected| collected.aggregate())
153            .map_err(|err| {
154                tracing::debug!("aggregate error: {}", err);
155                reject::known(BodyReadError(err))
156            })
157    })
158}
159
160/// Returns a `Filter` that matches any request and extracts a `Future` of a
161/// JSON-decoded body.
162///
163/// # Warning
164///
165/// This does not have a default size limit, it would be wise to use one to
166/// prevent a overly large request from using too much memory.
167///
168/// # Example
169///
170/// ```
171/// use std::collections::HashMap;
172/// use warp::Filter;
173///
174/// let route = warp::body::content_length_limit(1024 * 32)
175///     .and(warp::body::json())
176///     .map(|simple_map: HashMap<String, String>| {
177///         "Got a JSON body!"
178///     });
179/// ```
180pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
181    is_content_type::<Json>()
182        .and(bytes())
183        .and_then(|buf| async move {
184            Json::decode(buf).map_err(|err| {
185                tracing::debug!("request json body error: {}", err);
186                reject::known(BodyDeserializeError { cause: err })
187            })
188        })
189}
190
191/// Returns a `Filter` that matches any request and extracts a
192/// `Future` of a form encoded body.
193///
194/// # Note
195///
196/// This filter is for the simpler `application/x-www-form-urlencoded` format,
197/// not `multipart/form-data`.
198///
199/// # Warning
200///
201/// This does not have a default size limit, it would be wise to use one to
202/// prevent a overly large request from using too much memory.
203///
204///
205/// ```
206/// use std::collections::HashMap;
207/// use warp::Filter;
208///
209/// let route = warp::body::content_length_limit(1024 * 32)
210///     .and(warp::body::form())
211///     .map(|simple_map: HashMap<String, String>| {
212///         "Got a urlencoded body!"
213///     });
214/// ```
215pub fn form<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
216    is_content_type::<Form>()
217        .and(aggregate())
218        .and_then(|buf| async move {
219            Form::decode(buf).map_err(|err| {
220                tracing::debug!("request form body error: {}", err);
221                reject::known(BodyDeserializeError { cause: err })
222            })
223        })
224}
225
226// ===== Decoders =====
227
228trait Decode {
229    const MIME: (mime::Name<'static>, mime::Name<'static>);
230    const WITH_NO_CONTENT_TYPE: bool;
231
232    fn decode<B: Buf, T: DeserializeOwned>(buf: B) -> Result<T, BoxError>;
233}
234
235struct Json;
236
237impl Decode for Json {
238    const MIME: (mime::Name<'static>, mime::Name<'static>) = (mime::APPLICATION, mime::JSON);
239    const WITH_NO_CONTENT_TYPE: bool = true;
240
241    fn decode<B: Buf, T: DeserializeOwned>(mut buf: B) -> Result<T, BoxError> {
242        serde_json::from_slice(&buf.copy_to_bytes(buf.remaining())).map_err(Into::into)
243    }
244}
245
246struct Form;
247
248impl Decode for Form {
249    const MIME: (mime::Name<'static>, mime::Name<'static>) =
250        (mime::APPLICATION, mime::WWW_FORM_URLENCODED);
251    const WITH_NO_CONTENT_TYPE: bool = true;
252
253    fn decode<B: Buf, T: DeserializeOwned>(buf: B) -> Result<T, BoxError> {
254        serde_urlencoded::from_reader(buf.reader()).map_err(Into::into)
255    }
256}
257
258// Require the `content-type` header to be this type (or, if there's no `content-type`
259// header at all, optimistically hope it's the right type).
260fn is_content_type<D: Decode>() -> impl Filter<Extract = (), Error = Rejection> + Copy {
261    filter_fn(move |route| {
262        let (type_, subtype) = D::MIME;
263        if let Some(value) = route.headers().get(CONTENT_TYPE) {
264            tracing::trace!("is_content_type {}/{}? {:?}", type_, subtype, value);
265            let ct = value
266                .to_str()
267                .ok()
268                .and_then(|s| s.parse::<mime::Mime>().ok());
269            if let Some(ct) = ct {
270                if ct.type_() == type_ && ct.subtype() == subtype {
271                    future::ok(())
272                } else {
273                    tracing::debug!(
274                        "content-type {:?} doesn't match {}/{}",
275                        value,
276                        type_,
277                        subtype
278                    );
279                    future::err(reject::unsupported_media_type())
280                }
281            } else {
282                tracing::debug!("content-type {:?} couldn't be parsed", value);
283                future::err(reject::unsupported_media_type())
284            }
285        } else if D::WITH_NO_CONTENT_TYPE {
286            // Optimistically assume its correct!
287            tracing::trace!("no content-type header, assuming {}/{}", type_, subtype);
288            future::ok(())
289        } else {
290            tracing::debug!("no content-type found");
291            future::err(reject::unsupported_media_type())
292        }
293    })
294}
295
296// ===== Rejections =====
297
298/// An error used in rejections when deserializing a request body fails.
299#[derive(Debug)]
300pub struct BodyDeserializeError {
301    cause: BoxError,
302}
303
304impl fmt::Display for BodyDeserializeError {
305    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306        write!(f, "Request body deserialize error: {}", self.cause)
307    }
308}
309
310impl StdError for BodyDeserializeError {
311    fn source(&self) -> Option<&(dyn StdError + 'static)> {
312        Some(self.cause.as_ref())
313    }
314}
315
316#[derive(Debug)]
317pub(crate) struct BodyReadError(crate::Error);
318
319impl fmt::Display for BodyReadError {
320    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321        write!(f, "Request body read error: {}", self.0)
322    }
323}
324
325impl StdError for BodyReadError {}
326
327unit_error! {
328    pub(crate) BodyConsumedMultipleTimes: "Request body consumed multiple times"
329}