tokio/io/async_write.rs
1use std::io::{self, IoSlice};
2use std::ops::DerefMut;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6/// Writes bytes asynchronously.
7///
8/// This trait is analogous to the [`std::io::Write`] trait, but integrates with
9/// the asynchronous task system. In particular, the [`poll_write`] method,
10/// unlike [`Write::write`], will automatically queue the current task for wakeup
11/// and return if data is not yet available, rather than blocking the calling
12/// thread.
13///
14/// Specifically, this means that the [`poll_write`] function will return one of
15/// the following:
16///
17/// * `Poll::Ready(Ok(n))` means that `n` bytes of data was immediately
18/// written.
19///
20/// * `Poll::Pending` means that no data was written from the buffer
21/// provided. The I/O object is not currently writable but may become writable
22/// in the future. Most importantly, **the current future's task is scheduled
23/// to get unparked when the object is writable**. This means that like
24/// `Future::poll` you'll receive a notification when the I/O object is
25/// writable again.
26///
27/// * `Poll::Ready(Err(e))` for other errors are standard I/O errors coming from the
28/// underlying object.
29///
30/// This trait importantly means that the `write` method only works in the
31/// context of a future's task. The object may panic if used outside of a task.
32///
33/// Utilities for working with `AsyncWrite` values are provided by
34/// [`AsyncWriteExt`].
35///
36/// [`std::io::Write`]: std::io::Write
37/// [`Write::write`]: std::io::Write::write()
38/// [`poll_write`]: AsyncWrite::poll_write()
39/// [`AsyncWriteExt`]: crate::io::AsyncWriteExt
40pub trait AsyncWrite {
41 /// Attempt to write bytes from `buf` into the object.
42 ///
43 /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. If successful,
44 /// then it must be guaranteed that `n <= buf.len()`. A return value of `0`
45 /// typically means that the underlying object is no longer able to accept
46 /// bytes and will likely not be able to in the future as well, or that the
47 /// buffer provided is empty.
48 ///
49 /// If the object is not ready for writing, the method returns
50 /// `Poll::Pending` and arranges for the current task (via
51 /// `cx.waker()`) to receive a notification when the object becomes
52 /// writable or is closed.
53 fn poll_write(
54 self: Pin<&mut Self>,
55 cx: &mut Context<'_>,
56 buf: &[u8],
57 ) -> Poll<io::Result<usize>>;
58
59 /// Attempts to flush the object, ensuring that any buffered data reach
60 /// their destination.
61 ///
62 /// On success, returns `Poll::Ready(Ok(()))`.
63 ///
64 /// If flushing cannot immediately complete, this method returns
65 /// `Poll::Pending` and arranges for the current task (via
66 /// `cx.waker()`) to receive a notification when the object can make
67 /// progress towards flushing.
68 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
69
70 /// Initiates or attempts to shut down this writer, returning success when
71 /// the I/O connection has completely shut down.
72 ///
73 /// This method is intended to be used for asynchronous shutdown of I/O
74 /// connections. For example this is suitable for implementing shutdown of a
75 /// TLS connection or calling `TcpStream::shutdown` on a proxied connection.
76 /// Protocols sometimes need to flush out final pieces of data or otherwise
77 /// perform a graceful shutdown handshake, reading/writing more data as
78 /// appropriate. This method is the hook for such protocols to implement the
79 /// graceful shutdown logic.
80 ///
81 /// This `shutdown` method is required by implementers of the
82 /// `AsyncWrite` trait. Wrappers typically just want to proxy this call
83 /// through to the wrapped type, and base types will typically implement
84 /// shutdown logic here or just return `Ok(().into())`. Note that if you're
85 /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that
86 /// transitively the entire stream has been shut down. After your wrapper's
87 /// shutdown logic has been executed you should shut down the underlying
88 /// stream.
89 ///
90 /// Invocation of a `shutdown` implies an invocation of `flush`. Once this
91 /// method returns `Ready` it implies that a flush successfully happened
92 /// before the shutdown happened. That is, callers don't need to call
93 /// `flush` before calling `shutdown`. They can rely that by calling
94 /// `shutdown` any pending buffered data will be written out.
95 ///
96 /// # Return value
97 ///
98 /// This function returns a `Poll<io::Result<()>>` classified as such:
99 ///
100 /// * `Poll::Ready(Ok(()))` - indicates that the connection was
101 /// successfully shut down and is now safe to deallocate/drop/close
102 /// resources associated with it. This method means that the current task
103 /// will no longer receive any notifications due to this method and the
104 /// I/O object itself is likely no longer usable.
105 ///
106 /// * `Poll::Pending` - indicates that shutdown is initiated but could
107 /// not complete just yet. This may mean that more I/O needs to happen to
108 /// continue this shutdown operation. The current task is scheduled to
109 /// receive a notification when it's otherwise ready to continue the
110 /// shutdown operation. When woken up this method should be called again.
111 ///
112 /// * `Poll::Ready(Err(e))` - indicates a fatal error has happened with shutdown,
113 /// indicating that the shutdown operation did not complete successfully.
114 /// This typically means that the I/O object is no longer usable.
115 ///
116 /// # Errors
117 ///
118 /// This function can return normal I/O errors through `Err`, described
119 /// above. Additionally this method may also render the underlying
120 /// `Write::write` method no longer usable (e.g. will return errors in the
121 /// future). It's recommended that once `shutdown` is called the
122 /// `write` method is no longer called.
123 ///
124 /// # Panics
125 ///
126 /// This function will panic if not called within the context of a future's
127 /// task.
128 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
129
130 /// Like [`poll_write`], except that it writes from a slice of buffers.
131 ///
132 /// Data is copied from each buffer in order, with the final buffer
133 /// read from possibly being only partially consumed. This method must
134 /// behave as a call to [`write`] with the buffers concatenated would.
135 ///
136 /// The default implementation calls [`poll_write`] with either the first nonempty
137 /// buffer provided, or an empty one if none exists.
138 ///
139 /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
140 ///
141 /// If the object is not ready for writing, the method returns
142 /// `Poll::Pending` and arranges for the current task (via
143 /// `cx.waker()`) to receive a notification when the object becomes
144 /// writable or is closed.
145 ///
146 /// # Note
147 ///
148 /// This should be implemented as a single "atomic" write action. If any
149 /// data has been partially written, it is wrong to return an error or
150 /// pending.
151 ///
152 /// [`poll_write`]: AsyncWrite::poll_write
153 fn poll_write_vectored(
154 self: Pin<&mut Self>,
155 cx: &mut Context<'_>,
156 bufs: &[IoSlice<'_>],
157 ) -> Poll<io::Result<usize>> {
158 let buf = bufs
159 .iter()
160 .find(|b| !b.is_empty())
161 .map_or(&[][..], |b| &**b);
162 self.poll_write(cx, buf)
163 }
164
165 /// Determines if this writer has an efficient [`poll_write_vectored`]
166 /// implementation.
167 ///
168 /// If a writer does not override the default [`poll_write_vectored`]
169 /// implementation, code using it may want to avoid the method all together
170 /// and coalesce writes into a single buffer for higher performance.
171 ///
172 /// The default implementation returns `false`.
173 ///
174 /// [`poll_write_vectored`]: AsyncWrite::poll_write_vectored
175 fn is_write_vectored(&self) -> bool {
176 false
177 }
178}
179
180macro_rules! deref_async_write {
181 () => {
182 fn poll_write(
183 mut self: Pin<&mut Self>,
184 cx: &mut Context<'_>,
185 buf: &[u8],
186 ) -> Poll<io::Result<usize>> {
187 Pin::new(&mut **self).poll_write(cx, buf)
188 }
189
190 fn poll_write_vectored(
191 mut self: Pin<&mut Self>,
192 cx: &mut Context<'_>,
193 bufs: &[IoSlice<'_>],
194 ) -> Poll<io::Result<usize>> {
195 Pin::new(&mut **self).poll_write_vectored(cx, bufs)
196 }
197
198 fn is_write_vectored(&self) -> bool {
199 (**self).is_write_vectored()
200 }
201
202 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
203 Pin::new(&mut **self).poll_flush(cx)
204 }
205
206 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
207 Pin::new(&mut **self).poll_shutdown(cx)
208 }
209 };
210}
211
212impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
213 deref_async_write!();
214}
215
216impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
217 deref_async_write!();
218}
219
220impl<P> AsyncWrite for Pin<P>
221where
222 P: DerefMut,
223 P::Target: AsyncWrite,
224{
225 fn poll_write(
226 self: Pin<&mut Self>,
227 cx: &mut Context<'_>,
228 buf: &[u8],
229 ) -> Poll<io::Result<usize>> {
230 crate::util::pin_as_deref_mut(self).poll_write(cx, buf)
231 }
232
233 fn poll_write_vectored(
234 self: Pin<&mut Self>,
235 cx: &mut Context<'_>,
236 bufs: &[IoSlice<'_>],
237 ) -> Poll<io::Result<usize>> {
238 crate::util::pin_as_deref_mut(self).poll_write_vectored(cx, bufs)
239 }
240
241 fn is_write_vectored(&self) -> bool {
242 (**self).is_write_vectored()
243 }
244
245 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
246 crate::util::pin_as_deref_mut(self).poll_flush(cx)
247 }
248
249 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
250 crate::util::pin_as_deref_mut(self).poll_shutdown(cx)
251 }
252}
253
254impl AsyncWrite for Vec<u8> {
255 fn poll_write(
256 self: Pin<&mut Self>,
257 _cx: &mut Context<'_>,
258 buf: &[u8],
259 ) -> Poll<io::Result<usize>> {
260 self.get_mut().extend_from_slice(buf);
261 Poll::Ready(Ok(buf.len()))
262 }
263
264 fn poll_write_vectored(
265 mut self: Pin<&mut Self>,
266 _: &mut Context<'_>,
267 bufs: &[IoSlice<'_>],
268 ) -> Poll<io::Result<usize>> {
269 Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
270 }
271
272 fn is_write_vectored(&self) -> bool {
273 true
274 }
275
276 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
277 Poll::Ready(Ok(()))
278 }
279
280 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
281 Poll::Ready(Ok(()))
282 }
283}
284
285impl AsyncWrite for io::Cursor<&mut [u8]> {
286 fn poll_write(
287 mut self: Pin<&mut Self>,
288 _: &mut Context<'_>,
289 buf: &[u8],
290 ) -> Poll<io::Result<usize>> {
291 Poll::Ready(io::Write::write(&mut *self, buf))
292 }
293
294 fn poll_write_vectored(
295 mut self: Pin<&mut Self>,
296 _: &mut Context<'_>,
297 bufs: &[IoSlice<'_>],
298 ) -> Poll<io::Result<usize>> {
299 Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
300 }
301
302 fn is_write_vectored(&self) -> bool {
303 true
304 }
305
306 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
307 Poll::Ready(io::Write::flush(&mut *self))
308 }
309
310 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
311 self.poll_flush(cx)
312 }
313}
314
315impl AsyncWrite for io::Cursor<&mut Vec<u8>> {
316 fn poll_write(
317 mut self: Pin<&mut Self>,
318 _: &mut Context<'_>,
319 buf: &[u8],
320 ) -> Poll<io::Result<usize>> {
321 Poll::Ready(io::Write::write(&mut *self, buf))
322 }
323
324 fn poll_write_vectored(
325 mut self: Pin<&mut Self>,
326 _: &mut Context<'_>,
327 bufs: &[IoSlice<'_>],
328 ) -> Poll<io::Result<usize>> {
329 Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
330 }
331
332 fn is_write_vectored(&self) -> bool {
333 true
334 }
335
336 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
337 Poll::Ready(io::Write::flush(&mut *self))
338 }
339
340 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
341 self.poll_flush(cx)
342 }
343}
344
345impl AsyncWrite for io::Cursor<Vec<u8>> {
346 fn poll_write(
347 mut self: Pin<&mut Self>,
348 _: &mut Context<'_>,
349 buf: &[u8],
350 ) -> Poll<io::Result<usize>> {
351 Poll::Ready(io::Write::write(&mut *self, buf))
352 }
353
354 fn poll_write_vectored(
355 mut self: Pin<&mut Self>,
356 _: &mut Context<'_>,
357 bufs: &[IoSlice<'_>],
358 ) -> Poll<io::Result<usize>> {
359 Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
360 }
361
362 fn is_write_vectored(&self) -> bool {
363 true
364 }
365
366 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
367 Poll::Ready(io::Write::flush(&mut *self))
368 }
369
370 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
371 self.poll_flush(cx)
372 }
373}
374
375impl AsyncWrite for io::Cursor<Box<[u8]>> {
376 fn poll_write(
377 mut self: Pin<&mut Self>,
378 _: &mut Context<'_>,
379 buf: &[u8],
380 ) -> Poll<io::Result<usize>> {
381 Poll::Ready(io::Write::write(&mut *self, buf))
382 }
383
384 fn poll_write_vectored(
385 mut self: Pin<&mut Self>,
386 _: &mut Context<'_>,
387 bufs: &[IoSlice<'_>],
388 ) -> Poll<io::Result<usize>> {
389 Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
390 }
391
392 fn is_write_vectored(&self) -> bool {
393 true
394 }
395
396 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
397 Poll::Ready(io::Write::flush(&mut *self))
398 }
399
400 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
401 self.poll_flush(cx)
402 }
403}