use std::fmt::{self, Debug, Formatter};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Instant;
use futures_intrusive::sync::SemaphoreReleaser;
use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use super::inner::{DecrementSizeGuard, SharedPool};
use std::future::Future;
/// A database connection paired with the pool it belongs to.
///
/// Dereferences to `DB::Connection`. When a value that still holds its
/// connection is dropped, the `Drop` impl attempts to hand the connection
/// back to the pool on a spawned task (see `return_to_pool`).
pub struct PoolConnection<DB: Database> {
/// The wrapped connection. Becomes `None` once `detach`, `leak` or
/// `return_to_pool` has taken it; dereferencing after that panics.
live: Option<Live<DB>>,
/// Shared pool state; a clone of this handle is passed to `Live::float`
/// whenever the connection is floated.
pub(crate) pool: Arc<SharedPool<DB>>,
}
/// A raw connection together with the time it was wrapped.
pub(super) struct Live<DB: Database> {
/// The underlying driver connection.
pub(super) raw: DB::Connection,
/// Timestamp recorded when this value was built (`Floating::new_live`
/// sets it to `Instant::now()`).
pub(super) created: Instant,
}
/// A `Live` connection tagged with the moment it was converted to idle.
///
/// Dereferences to the inner `Live`.
pub(super) struct Idle<DB: Database> {
/// The connection being held idle.
pub(super) live: Live<DB>,
/// Timestamp taken by `Live::into_idle` when the conversion happened.
pub(super) since: Instant,
}
/// A connection value `C` (in this file either `Live` or `Idle`) carried
/// together with a `DecrementSizeGuard` for its pool.
///
/// Dereferences to `C`.
pub(super) struct Floating<DB: Database, C> {
/// The wrapped connection state.
pub(super) inner: C,
/// Guard tied to the owning pool.
/// NOTE(review): its drop behavior is defined in `super::inner`; presumably
/// it decrements the pool's size counter unless cancelled — confirm there.
pub(super) guard: DecrementSizeGuard<DB>,
}
/// Panic message used by the `Deref`/`DerefMut` impls of `PoolConnection`
/// when the inner connection has already been taken (`live` is `None`).
const DEREF_ERR: &str = "(bug) connection already released to pool";
/// Minimal `Debug` output: only the type name is printed, no fields.
impl<DB: Database> Debug for PoolConnection<DB> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("PoolConnection");
        repr.finish()
    }
}
/// Gives shared access to the wrapped `DB::Connection`.
///
/// # Panics
///
/// Panics with `DEREF_ERR` if the connection has already been taken out.
impl<DB: Database> Deref for PoolConnection<DB> {
    type Target = DB::Connection;

    fn deref(&self) -> &Self::Target {
        let live = self.live.as_ref().expect(DEREF_ERR);
        &live.raw
    }
}
/// Gives exclusive access to the wrapped `DB::Connection`.
///
/// # Panics
///
/// Panics with `DEREF_ERR` if the connection has already been taken out.
impl<DB: Database> DerefMut for PoolConnection<DB> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let live = self.live.as_mut().expect(DEREF_ERR);
        &mut live.raw
    }
}
/// Borrows the wrapped connection by going through the `Deref` impl.
impl<DB: Database> AsRef<DB::Connection> for PoolConnection<DB> {
    fn as_ref(&self) -> &DB::Connection {
        // Explicit reborrow through `Deref`; panics like `deref` does if the
        // connection has already been taken.
        &**self
    }
}
/// Mutably borrows the wrapped connection by going through the `DerefMut` impl.
impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
    fn as_mut(&mut self) -> &mut DB::Connection {
        // Explicit reborrow through `DerefMut`; panics like `deref_mut` does
        // if the connection has already been taken.
        &mut **self
    }
}
impl<DB: Database> PoolConnection<DB> {
#[deprecated = "renamed to `.detach()` for clarity"]
pub fn release(self) -> DB::Connection {
self.detach()
}
pub fn detach(mut self) -> DB::Connection {
self.live
.take()
.expect("PoolConnection double-dropped")
.float(self.pool.clone())
.detach()
}
pub fn leak(mut self) -> DB::Connection {
self.live.take().expect("PoolConnection double-dropped").raw
}
pub(crate) fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
let floating = self.live.take().map(|live| live.float(self.pool.clone()));
async move {
let mut floating = if let Some(floating) = floating {
floating
} else {
return;
};
if let Err(e) = floating.raw.ping().await {
log::warn!(
"error occurred while testing the connection on-release: {}",
e
);
drop(floating);
} else {
floating.release();
}
}
}
}
impl<DB: Database> Drop for PoolConnection<DB> {
fn drop(&mut self) {
if self.live.is_some() {
#[cfg(not(feature = "_rt-async-std"))]
if let Ok(handle) = sqlx_rt::Handle::try_current() {
handle.spawn(self.return_to_pool());
}
#[cfg(feature = "_rt-async-std")]
sqlx_rt::spawn(self.return_to_pool());
}
}
}
impl<DB: Database> Live<DB> {
    /// Wraps this connection in a `Floating`, pairing it with a guard
    /// obtained from `DecrementSizeGuard::new_permit` for `pool`.
    pub fn float(self, pool: Arc<SharedPool<DB>>) -> Floating<DB, Self> {
        let guard = DecrementSizeGuard::new_permit(pool);
        Floating { inner: self, guard }
    }

    /// Converts this connection into an `Idle`, stamping it with the
    /// current time.
    pub fn into_idle(self) -> Idle<DB> {
        let since = Instant::now();
        Idle { live: self, since }
    }
}
impl<DB: Database> Deref for Idle<DB> {
type Target = Live<DB>;
fn deref(&self) -> &Self::Target {
&self.live
}
}
impl<DB: Database> DerefMut for Idle<DB> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.live
}
}
impl<DB: Database> Floating<DB, Live<DB>> {
    /// Wraps a raw connection, recording the current time as its creation
    /// instant, and pairs it with the supplied guard.
    pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
        let inner = Live {
            raw: conn,
            created: Instant::now(),
        };
        Self { inner, guard }
    }

    /// Turns this floating connection into a `PoolConnection` bound to `pool`.
    ///
    /// The guard is cancelled rather than dropped. In debug builds this
    /// asserts that the guard refers to the same pool.
    pub fn attach(self, pool: &Arc<SharedPool<DB>>) -> PoolConnection<DB> {
        debug_assert!(
            self.guard.same_pool(pool),
            "BUG: attaching connection to different pool"
        );

        let Floating { inner: live, guard } = self;
        guard.cancel();

        PoolConnection {
            live: Some(live),
            pool: Arc::clone(pool),
        }
    }

    /// Passes this connection to the `release` method of the pool referenced
    /// by its guard.
    pub fn release(self) {
        // Clone the pool handle first: `self` is moved into the call below.
        let pool = self.guard.pool.clone();
        pool.release(self);
    }

    /// Closes the raw connection and returns the result of that operation.
    ///
    /// The guard stays inside `self` and is dropped when this function ends.
    pub async fn close(self) -> Result<(), Error> {
        let Live { raw, .. } = self.inner;
        raw.close().await
    }

    /// Returns the raw connection; the guard is dropped along with the rest
    /// of `self`.
    pub fn detach(self) -> DB::Connection {
        let Floating { inner, .. } = self;
        inner.raw
    }

    /// Converts the wrapped `Live` connection into an `Idle` one, keeping the
    /// same guard.
    pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
        let Floating { inner, guard } = self;
        Floating {
            inner: inner.into_idle(),
            guard,
        }
    }
}
impl<DB: Database> Floating<DB, Idle<DB>> {
    /// Wraps an idle connection, building its guard from `pool` and the
    /// given semaphore permit via `DecrementSizeGuard::from_permit`.
    pub fn from_idle(
        idle: Idle<DB>,
        pool: Arc<SharedPool<DB>>,
        permit: SemaphoreReleaser<'_>,
    ) -> Self {
        let guard = DecrementSizeGuard::from_permit(pool, permit);
        Self { inner: idle, guard }
    }

    /// Pings the underlying raw connection and returns the outcome.
    pub async fn ping(&mut self) -> Result<(), Error> {
        self.inner.live.raw.ping().await
    }

    /// Unwraps the `Idle` layer, yielding a floating `Live` connection with
    /// the same guard.
    pub fn into_live(self) -> Floating<DB, Live<DB>> {
        let Floating { inner: idle, guard } = self;
        Floating {
            inner: idle.live,
            guard,
        }
    }

    /// Closes the raw connection and hands the guard back to the caller.
    ///
    /// A failure to close is only logged at debug level; the guard is
    /// returned either way.
    pub async fn close(self) -> DecrementSizeGuard<DB> {
        let Floating { inner: idle, guard } = self;
        let outcome = idle.live.raw.close().await;
        if let Err(e) = outcome {
            log::debug!("error occurred while closing the pool connection: {}", e);
        }
        guard
    }
}
impl<DB: Database, C> Deref for Floating<DB, C> {
type Target = C;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<DB: Database, C> DerefMut for Floating<DB, C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}