tokio/io/
async_write.rs

1use std::io::{self, IoSlice};
2use std::ops::DerefMut;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6/// Writes bytes asynchronously.
7///
8/// This trait is analogous to the [`std::io::Write`] trait, but integrates with
9/// the asynchronous task system. In particular, the [`poll_write`] method,
10/// unlike [`Write::write`], will automatically queue the current task for wakeup
11/// and return if data is not yet available, rather than blocking the calling
12/// thread.
13///
14/// Specifically, this means that the [`poll_write`] function will return one of
15/// the following:
16///
17/// * `Poll::Ready(Ok(n))` means that `n` bytes of data was immediately
18///   written.
19///
20/// * `Poll::Pending` means that no data was written from the buffer
21///   provided. The I/O object is not currently writable but may become writable
22///   in the future. Most importantly, **the current future's task is scheduled
23///   to get unparked when the object is writable**. This means that like
24///   `Future::poll` you'll receive a notification when the I/O object is
25///   writable again.
26///
27/// * `Poll::Ready(Err(e))` for other errors are standard I/O errors coming from the
28///   underlying object.
29///
30/// This trait importantly means that the `write` method only works in the
31/// context of a future's task. The object may panic if used outside of a task.
32///
33/// Utilities for working with `AsyncWrite` values are provided by
34/// [`AsyncWriteExt`].
35///
36/// [`std::io::Write`]: std::io::Write
37/// [`Write::write`]: std::io::Write::write()
38/// [`poll_write`]: AsyncWrite::poll_write()
39/// [`AsyncWriteExt`]: crate::io::AsyncWriteExt
40pub trait AsyncWrite {
41    /// Attempt to write bytes from `buf` into the object.
42    ///
43    /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. If successful,
44    /// then it must be guaranteed that `n <= buf.len()`. A return value of `0`
45    /// typically means that the underlying object is no longer able to accept
46    /// bytes and will likely not be able to in the future as well, or that the
47    /// buffer provided is empty.
48    ///
49    /// If the object is not ready for writing, the method returns
50    /// `Poll::Pending` and arranges for the current task (via
51    /// `cx.waker()`) to receive a notification when the object becomes
52    /// writable or is closed.
53    fn poll_write(
54        self: Pin<&mut Self>,
55        cx: &mut Context<'_>,
56        buf: &[u8],
57    ) -> Poll<io::Result<usize>>;
58
59    /// Attempts to flush the object, ensuring that any buffered data reach
60    /// their destination.
61    ///
62    /// On success, returns `Poll::Ready(Ok(()))`.
63    ///
64    /// If flushing cannot immediately complete, this method returns
65    /// `Poll::Pending` and arranges for the current task (via
66    /// `cx.waker()`) to receive a notification when the object can make
67    /// progress towards flushing.
68    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
69
70    /// Initiates or attempts to shut down this writer, returning success when
71    /// the I/O connection has completely shut down.
72    ///
73    /// This method is intended to be used for asynchronous shutdown of I/O
74    /// connections. For example this is suitable for implementing shutdown of a
75    /// TLS connection or calling `TcpStream::shutdown` on a proxied connection.
76    /// Protocols sometimes need to flush out final pieces of data or otherwise
77    /// perform a graceful shutdown handshake, reading/writing more data as
78    /// appropriate. This method is the hook for such protocols to implement the
79    /// graceful shutdown logic.
80    ///
81    /// This `shutdown` method is required by implementers of the
82    /// `AsyncWrite` trait. Wrappers typically just want to proxy this call
83    /// through to the wrapped type, and base types will typically implement
84    /// shutdown logic here or just return `Ok(().into())`. Note that if you're
85    /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that
86    /// transitively the entire stream has been shut down. After your wrapper's
87    /// shutdown logic has been executed you should shut down the underlying
88    /// stream.
89    ///
90    /// Invocation of a `shutdown` implies an invocation of `flush`. Once this
91    /// method returns `Ready` it implies that a flush successfully happened
92    /// before the shutdown happened. That is, callers don't need to call
93    /// `flush` before calling `shutdown`. They can rely that by calling
94    /// `shutdown` any pending buffered data will be written out.
95    ///
96    /// # Return value
97    ///
98    /// This function returns a `Poll<io::Result<()>>` classified as such:
99    ///
100    /// * `Poll::Ready(Ok(()))` - indicates that the connection was
101    ///   successfully shut down and is now safe to deallocate/drop/close
102    ///   resources associated with it. This method means that the current task
103    ///   will no longer receive any notifications due to this method and the
104    ///   I/O object itself is likely no longer usable.
105    ///
106    /// * `Poll::Pending` - indicates that shutdown is initiated but could
107    ///   not complete just yet. This may mean that more I/O needs to happen to
108    ///   continue this shutdown operation. The current task is scheduled to
109    ///   receive a notification when it's otherwise ready to continue the
110    ///   shutdown operation. When woken up this method should be called again.
111    ///
112    /// * `Poll::Ready(Err(e))` - indicates a fatal error has happened with shutdown,
113    ///   indicating that the shutdown operation did not complete successfully.
114    ///   This typically means that the I/O object is no longer usable.
115    ///
116    /// # Errors
117    ///
118    /// This function can return normal I/O errors through `Err`, described
119    /// above. Additionally this method may also render the underlying
120    /// `Write::write` method no longer usable (e.g. will return errors in the
121    /// future). It's recommended that once `shutdown` is called the
122    /// `write` method is no longer called.
123    ///
124    /// # Panics
125    ///
126    /// This function will panic if not called within the context of a future's
127    /// task.
128    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
129
130    /// Like [`poll_write`], except that it writes from a slice of buffers.
131    ///
132    /// Data is copied from each buffer in order, with the final buffer
133    /// read from possibly being only partially consumed. This method must
134    /// behave as a call to [`write`] with the buffers concatenated would.
135    ///
136    /// The default implementation calls [`poll_write`] with either the first nonempty
137    /// buffer provided, or an empty one if none exists.
138    ///
139    /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
140    ///
141    /// If the object is not ready for writing, the method returns
142    /// `Poll::Pending` and arranges for the current task (via
143    /// `cx.waker()`) to receive a notification when the object becomes
144    /// writable or is closed.
145    ///
146    /// # Note
147    ///
148    /// This should be implemented as a single "atomic" write action. If any
149    /// data has been partially written, it is wrong to return an error or
150    /// pending.
151    ///
152    /// [`poll_write`]: AsyncWrite::poll_write
153    fn poll_write_vectored(
154        self: Pin<&mut Self>,
155        cx: &mut Context<'_>,
156        bufs: &[IoSlice<'_>],
157    ) -> Poll<io::Result<usize>> {
158        let buf = bufs
159            .iter()
160            .find(|b| !b.is_empty())
161            .map_or(&[][..], |b| &**b);
162        self.poll_write(cx, buf)
163    }
164
165    /// Determines if this writer has an efficient [`poll_write_vectored`]
166    /// implementation.
167    ///
168    /// If a writer does not override the default [`poll_write_vectored`]
169    /// implementation, code using it may want to avoid the method all together
170    /// and coalesce writes into a single buffer for higher performance.
171    ///
172    /// The default implementation returns `false`.
173    ///
174    /// [`poll_write_vectored`]: AsyncWrite::poll_write_vectored
175    fn is_write_vectored(&self) -> bool {
176        false
177    }
178}
179
180macro_rules! deref_async_write {
181    () => {
182        fn poll_write(
183            mut self: Pin<&mut Self>,
184            cx: &mut Context<'_>,
185            buf: &[u8],
186        ) -> Poll<io::Result<usize>> {
187            Pin::new(&mut **self).poll_write(cx, buf)
188        }
189
190        fn poll_write_vectored(
191            mut self: Pin<&mut Self>,
192            cx: &mut Context<'_>,
193            bufs: &[IoSlice<'_>],
194        ) -> Poll<io::Result<usize>> {
195            Pin::new(&mut **self).poll_write_vectored(cx, bufs)
196        }
197
198        fn is_write_vectored(&self) -> bool {
199            (**self).is_write_vectored()
200        }
201
202        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
203            Pin::new(&mut **self).poll_flush(cx)
204        }
205
206        fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
207            Pin::new(&mut **self).poll_shutdown(cx)
208        }
209    };
210}
211
212impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
213    deref_async_write!();
214}
215
216impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
217    deref_async_write!();
218}
219
220impl<P> AsyncWrite for Pin<P>
221where
222    P: DerefMut,
223    P::Target: AsyncWrite,
224{
225    fn poll_write(
226        self: Pin<&mut Self>,
227        cx: &mut Context<'_>,
228        buf: &[u8],
229    ) -> Poll<io::Result<usize>> {
230        crate::util::pin_as_deref_mut(self).poll_write(cx, buf)
231    }
232
233    fn poll_write_vectored(
234        self: Pin<&mut Self>,
235        cx: &mut Context<'_>,
236        bufs: &[IoSlice<'_>],
237    ) -> Poll<io::Result<usize>> {
238        crate::util::pin_as_deref_mut(self).poll_write_vectored(cx, bufs)
239    }
240
241    fn is_write_vectored(&self) -> bool {
242        (**self).is_write_vectored()
243    }
244
245    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
246        crate::util::pin_as_deref_mut(self).poll_flush(cx)
247    }
248
249    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
250        crate::util::pin_as_deref_mut(self).poll_shutdown(cx)
251    }
252}
253
254impl AsyncWrite for Vec<u8> {
255    fn poll_write(
256        self: Pin<&mut Self>,
257        _cx: &mut Context<'_>,
258        buf: &[u8],
259    ) -> Poll<io::Result<usize>> {
260        self.get_mut().extend_from_slice(buf);
261        Poll::Ready(Ok(buf.len()))
262    }
263
264    fn poll_write_vectored(
265        mut self: Pin<&mut Self>,
266        _: &mut Context<'_>,
267        bufs: &[IoSlice<'_>],
268    ) -> Poll<io::Result<usize>> {
269        Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
270    }
271
272    fn is_write_vectored(&self) -> bool {
273        true
274    }
275
276    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
277        Poll::Ready(Ok(()))
278    }
279
280    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
281        Poll::Ready(Ok(()))
282    }
283}
284
285impl AsyncWrite for io::Cursor<&mut [u8]> {
286    fn poll_write(
287        mut self: Pin<&mut Self>,
288        _: &mut Context<'_>,
289        buf: &[u8],
290    ) -> Poll<io::Result<usize>> {
291        Poll::Ready(io::Write::write(&mut *self, buf))
292    }
293
294    fn poll_write_vectored(
295        mut self: Pin<&mut Self>,
296        _: &mut Context<'_>,
297        bufs: &[IoSlice<'_>],
298    ) -> Poll<io::Result<usize>> {
299        Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
300    }
301
302    fn is_write_vectored(&self) -> bool {
303        true
304    }
305
306    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
307        Poll::Ready(io::Write::flush(&mut *self))
308    }
309
310    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
311        self.poll_flush(cx)
312    }
313}
314
315impl AsyncWrite for io::Cursor<&mut Vec<u8>> {
316    fn poll_write(
317        mut self: Pin<&mut Self>,
318        _: &mut Context<'_>,
319        buf: &[u8],
320    ) -> Poll<io::Result<usize>> {
321        Poll::Ready(io::Write::write(&mut *self, buf))
322    }
323
324    fn poll_write_vectored(
325        mut self: Pin<&mut Self>,
326        _: &mut Context<'_>,
327        bufs: &[IoSlice<'_>],
328    ) -> Poll<io::Result<usize>> {
329        Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
330    }
331
332    fn is_write_vectored(&self) -> bool {
333        true
334    }
335
336    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
337        Poll::Ready(io::Write::flush(&mut *self))
338    }
339
340    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
341        self.poll_flush(cx)
342    }
343}
344
345impl AsyncWrite for io::Cursor<Vec<u8>> {
346    fn poll_write(
347        mut self: Pin<&mut Self>,
348        _: &mut Context<'_>,
349        buf: &[u8],
350    ) -> Poll<io::Result<usize>> {
351        Poll::Ready(io::Write::write(&mut *self, buf))
352    }
353
354    fn poll_write_vectored(
355        mut self: Pin<&mut Self>,
356        _: &mut Context<'_>,
357        bufs: &[IoSlice<'_>],
358    ) -> Poll<io::Result<usize>> {
359        Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
360    }
361
362    fn is_write_vectored(&self) -> bool {
363        true
364    }
365
366    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
367        Poll::Ready(io::Write::flush(&mut *self))
368    }
369
370    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
371        self.poll_flush(cx)
372    }
373}
374
375impl AsyncWrite for io::Cursor<Box<[u8]>> {
376    fn poll_write(
377        mut self: Pin<&mut Self>,
378        _: &mut Context<'_>,
379        buf: &[u8],
380    ) -> Poll<io::Result<usize>> {
381        Poll::Ready(io::Write::write(&mut *self, buf))
382    }
383
384    fn poll_write_vectored(
385        mut self: Pin<&mut Self>,
386        _: &mut Context<'_>,
387        bufs: &[IoSlice<'_>],
388    ) -> Poll<io::Result<usize>> {
389        Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
390    }
391
392    fn is_write_vectored(&self) -> bool {
393        true
394    }
395
396    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
397        Poll::Ready(io::Write::flush(&mut *self))
398    }
399
400    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
401        self.poll_flush(cx)
402    }
403}