1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
use crate::error::Error;
use futures_core::Future;
use futures_util::ready;
use sqlx_rt::AsyncWrite;
use std::io::{BufRead, Cursor};
use std::pin::Pin;
use std::task::{Context, Poll};

/// Future that writes the full buffer to the stream, flushes the stream, and then
/// clears the buffer (even if either of the two previous operations failed).
///
/// The clearing happens when this value is dropped, so it also takes place if the
/// future is dropped before it has completed.
pub struct WriteAndFlush<'a, S> {
    /// Destination the buffered bytes are written to and then flushed.
    pub(super) stream: &'a mut S,
    /// Pending bytes; the cursor position marks how much has already been written.
    pub(super) buf: Cursor<&'a mut Vec<u8>>,
}

impl<S: AsyncWrite + Unpin> Future for WriteAndFlush<'_, S> {
    type Output = Result<(), Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self {
            ref mut stream,
            ref mut buf,
        } = *self;

        loop {
            let read = buf.fill_buf()?;

            if !read.is_empty() {
                let written = ready!(Pin::new(&mut *stream).poll_write(cx, read)?);
                buf.consume(written);
            } else {
                break;
            }
        }

        Pin::new(stream).poll_flush(cx).map_err(Error::Io)
    }
}

impl<S> Drop for WriteAndFlush<'_, S> {
    /// Empties the underlying buffer when the future goes away.
    fn drop(&mut self) {
        // Runs whether or not the write and flush succeeded, so the buffer is always
        // left empty afterwards.
        let pending: &mut Vec<u8> = self.buf.get_mut();
        pending.clear();
    }
}