1use std::error::Error as StdError;
6use std::fmt;
7
8use crate::bodyt::Body;
9use bytes::{Buf, Bytes};
10use futures_util::future;
11use futures_util::Stream;
12use headers::ContentLength;
13use http::header::CONTENT_TYPE;
14use http_body_util::BodyDataStream;
15use http_body_util::BodyExt;
16use mime;
17use serde::de::DeserializeOwned;
18
19use crate::filter::{filter_fn, filter_fn_one, Filter, FilterBase};
20use crate::reject::{self, Rejection};
21
22type BoxError = Box<dyn StdError + Send + Sync>;
23
24pub(crate) fn body() -> impl Filter<Extract = (Body,), Error = Rejection> + Copy {
28 filter_fn_one(|route| {
29 future::ready(route.take_body().ok_or_else(|| {
30 tracing::error!("request body already taken in previous filter");
31 reject::known(BodyConsumedMultipleTimes { _p: () })
32 }))
33 })
34}
35
36pub fn content_length_limit(limit: u64) -> impl Filter<Extract = (), Error = Rejection> + Copy {
51 crate::filters::header::header2()
52 .map_err(crate::filter::Internal, |_| {
53 tracing::debug!("content-length missing");
54 reject::length_required()
55 })
56 .and_then(move |ContentLength(length)| {
57 if length <= limit {
58 future::ok(())
59 } else {
60 tracing::debug!("content-length: {} is over limit {}", length, limit);
61 future::err(reject::payload_too_large())
62 }
63 })
64 .untuple_one()
65}
66
67pub fn stream(
79) -> impl Filter<Extract = (impl Stream<Item = Result<impl Buf, crate::Error>>,), Error = Rejection> + Copy
80{
81 body().map(|body| BodyDataStream::new(body))
82}
83
84pub fn bytes() -> impl Filter<Extract = (Bytes,), Error = Rejection> + Copy {
108 body().and_then(|mut body| async move {
109 BodyExt::collect(&mut body)
110 .await
111 .map(|b| b.to_bytes())
112 .map_err(|err| {
113 tracing::debug!("to_bytes error: {}", err);
114 reject::known(BodyReadError(err))
115 })
116 })
117}
118
119pub fn aggregate() -> impl Filter<Extract = (impl Buf,), Error = Rejection> + Copy {
149 body().and_then(|mut body: crate::bodyt::Body| async move {
150 http_body_util::BodyExt::collect(&mut body)
151 .await
152 .map(|collected| collected.aggregate())
153 .map_err(|err| {
154 tracing::debug!("aggregate error: {}", err);
155 reject::known(BodyReadError(err))
156 })
157 })
158}
159
160pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
181 is_content_type::<Json>()
182 .and(bytes())
183 .and_then(|buf| async move {
184 Json::decode(buf).map_err(|err| {
185 tracing::debug!("request json body error: {}", err);
186 reject::known(BodyDeserializeError { cause: err })
187 })
188 })
189}
190
191pub fn form<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
216 is_content_type::<Form>()
217 .and(aggregate())
218 .and_then(|buf| async move {
219 Form::decode(buf).map_err(|err| {
220 tracing::debug!("request form body error: {}", err);
221 reject::known(BodyDeserializeError { cause: err })
222 })
223 })
224}
225
226trait Decode {
229 const MIME: (mime::Name<'static>, mime::Name<'static>);
230 const WITH_NO_CONTENT_TYPE: bool;
231
232 fn decode<B: Buf, T: DeserializeOwned>(buf: B) -> Result<T, BoxError>;
233}
234
235struct Json;
236
237impl Decode for Json {
238 const MIME: (mime::Name<'static>, mime::Name<'static>) = (mime::APPLICATION, mime::JSON);
239 const WITH_NO_CONTENT_TYPE: bool = true;
240
241 fn decode<B: Buf, T: DeserializeOwned>(mut buf: B) -> Result<T, BoxError> {
242 serde_json::from_slice(&buf.copy_to_bytes(buf.remaining())).map_err(Into::into)
243 }
244}
245
246struct Form;
247
248impl Decode for Form {
249 const MIME: (mime::Name<'static>, mime::Name<'static>) =
250 (mime::APPLICATION, mime::WWW_FORM_URLENCODED);
251 const WITH_NO_CONTENT_TYPE: bool = true;
252
253 fn decode<B: Buf, T: DeserializeOwned>(buf: B) -> Result<T, BoxError> {
254 serde_urlencoded::from_reader(buf.reader()).map_err(Into::into)
255 }
256}
257
258fn is_content_type<D: Decode>() -> impl Filter<Extract = (), Error = Rejection> + Copy {
261 filter_fn(move |route| {
262 let (type_, subtype) = D::MIME;
263 if let Some(value) = route.headers().get(CONTENT_TYPE) {
264 tracing::trace!("is_content_type {}/{}? {:?}", type_, subtype, value);
265 let ct = value
266 .to_str()
267 .ok()
268 .and_then(|s| s.parse::<mime::Mime>().ok());
269 if let Some(ct) = ct {
270 if ct.type_() == type_ && ct.subtype() == subtype {
271 future::ok(())
272 } else {
273 tracing::debug!(
274 "content-type {:?} doesn't match {}/{}",
275 value,
276 type_,
277 subtype
278 );
279 future::err(reject::unsupported_media_type())
280 }
281 } else {
282 tracing::debug!("content-type {:?} couldn't be parsed", value);
283 future::err(reject::unsupported_media_type())
284 }
285 } else if D::WITH_NO_CONTENT_TYPE {
286 tracing::trace!("no content-type header, assuming {}/{}", type_, subtype);
288 future::ok(())
289 } else {
290 tracing::debug!("no content-type found");
291 future::err(reject::unsupported_media_type())
292 }
293 })
294}
295
296#[derive(Debug)]
300pub struct BodyDeserializeError {
301 cause: BoxError,
302}
303
304impl fmt::Display for BodyDeserializeError {
305 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306 write!(f, "Request body deserialize error: {}", self.cause)
307 }
308}
309
310impl StdError for BodyDeserializeError {
311 fn source(&self) -> Option<&(dyn StdError + 'static)> {
312 Some(self.cause.as_ref())
313 }
314}
315
316#[derive(Debug)]
317pub(crate) struct BodyReadError(crate::Error);
318
319impl fmt::Display for BodyReadError {
320 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321 write!(f, "Request body read error: {}", self.0)
322 }
323}
324
325impl StdError for BodyReadError {}
326
327unit_error! {
328 pub(crate) BodyConsumedMultipleTimes: "Request body consumed multiple times"
329}