use std::marker::PhantomData;
use either::Either;
use futures_core::stream::BoxStream;
use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt};
use crate::arguments::{Arguments, IntoArguments};
use crate::database::{Database, HasArguments, HasStatement, HasStatementCache};
use crate::encode::Encode;
use crate::error::Error;
use crate::executor::{Execute, Executor};
use crate::statement::Statement;
use crate::types::Type;
#[must_use = "query must be executed to affect database"]
pub struct Query<'q, DB: Database, A> {
pub(crate) statement: Either<&'q str, &'q <DB as HasStatement<'q>>::Statement>,
pub(crate) arguments: Option<A>,
pub(crate) database: PhantomData<DB>,
pub(crate) persistent: bool,
}
#[must_use = "query must be executed to affect database"]
pub struct Map<'q, DB: Database, F, A> {
inner: Query<'q, DB, A>,
mapper: F,
}
impl<'q, DB, A> Execute<'q, DB> for Query<'q, DB, A>
where
DB: Database,
A: Send + IntoArguments<'q, DB>,
{
#[inline]
fn sql(&self) -> &'q str {
match self.statement {
Either::Right(ref statement) => statement.sql(),
Either::Left(sql) => sql,
}
}
fn statement(&self) -> Option<&<DB as HasStatement<'q>>::Statement> {
match self.statement {
Either::Right(ref statement) => Some(&statement),
Either::Left(_) => None,
}
}
#[inline]
fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
self.arguments.take().map(IntoArguments::into_arguments)
}
#[inline]
fn persistent(&self) -> bool {
self.persistent
}
}
impl<'q, DB: Database> Query<'q, DB, <DB as HasArguments<'q>>::Arguments> {
pub fn bind<T: 'q + Send + Encode<'q, DB> + Type<DB>>(mut self, value: T) -> Self {
if let Some(arguments) = &mut self.arguments {
arguments.add(value);
}
self
}
}
impl<'q, DB, A> Query<'q, DB, A>
where
DB: Database + HasStatementCache,
{
pub fn persistent(mut self, value: bool) -> Self {
self.persistent = value;
self
}
}
impl<'q, DB, A: Send> Query<'q, DB, A>
where
DB: Database,
A: 'q + IntoArguments<'q, DB>,
{
#[inline]
pub fn map<F, O>(
self,
mut f: F,
) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<O, Error> + Send, A>
where
F: FnMut(DB::Row) -> O + Send,
O: Unpin,
{
self.try_map(move |row| Ok(f(row)))
}
#[inline]
pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, A>
where
F: FnMut(DB::Row) -> Result<O, Error> + Send,
O: Unpin,
{
Map {
inner: self,
mapper: f,
}
}
#[inline]
pub async fn execute<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::QueryResult, Error>
where
'q: 'e,
A: 'e,
E: Executor<'c, Database = DB>,
{
executor.execute(self).await
}
#[inline]
pub async fn execute_many<'e, 'c: 'e, E>(
self,
executor: E,
) -> BoxStream<'e, Result<DB::QueryResult, Error>>
where
'q: 'e,
A: 'e,
E: Executor<'c, Database = DB>,
{
executor.execute_many(self)
}
#[inline]
pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<DB::Row, Error>>
where
'q: 'e,
A: 'e,
E: Executor<'c, Database = DB>,
{
executor.fetch(self)
}
#[inline]
pub fn fetch_many<'e, 'c: 'e, E>(
self,
executor: E,
) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
where
'q: 'e,
A: 'e,
E: Executor<'c, Database = DB>,
{
executor.fetch_many(self)
}
#[inline]
pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<DB::Row>, Error>
where
'q: 'e,
A: 'e,
E: Executor<'c, Database = DB>,
{
executor.fetch_all(self).await
}
#[inline]
pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::Row, Error>
where
'q: 'e,
A: 'e,
E: Executor<'c, Database = DB>,
{
executor.fetch_one(self).await
}
#[inline]
pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<DB::Row>, Error>
where
'q: 'e,
A: 'e,
E: Executor<'c, Database = DB>,
{
executor.fetch_optional(self).await
}
}
impl<'q, DB, F: Send, A: Send> Execute<'q, DB> for Map<'q, DB, F, A>
where
DB: Database,
A: IntoArguments<'q, DB>,
{
#[inline]
fn sql(&self) -> &'q str {
self.inner.sql()
}
#[inline]
fn statement(&self) -> Option<&<DB as HasStatement<'q>>::Statement> {
self.inner.statement()
}
#[inline]
fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
self.inner.take_arguments()
}
#[inline]
fn persistent(&self) -> bool {
self.inner.arguments.is_some()
}
}
impl<'q, DB, F, O, A> Map<'q, DB, F, A>
where
DB: Database,
F: FnMut(DB::Row) -> Result<O, Error> + Send,
O: Send + Unpin,
A: 'q + Send + IntoArguments<'q, DB>,
{
#[inline]
pub fn map<G, P>(
self,
mut g: G,
) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<P, Error> + Send, A>
where
G: FnMut(O) -> P + Send,
P: Unpin,
{
self.try_map(move |data| Ok(g(data)))
}
#[inline]
pub fn try_map<G, P>(
self,
mut g: G,
) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<P, Error> + Send, A>
where
G: FnMut(O) -> Result<P, Error> + Send,
P: Unpin,
{
let mut f = self.mapper;
Map {
inner: self.inner,
mapper: move |row| f(row).and_then(|o| g(o)),
}
}
pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
F: 'e,
O: 'e,
{
self.fetch_many(executor)
.try_filter_map(|step| async move {
Ok(match step {
Either::Left(_) => None,
Either::Right(o) => Some(o),
})
})
.boxed()
}
pub fn fetch_many<'e, 'c: 'e, E>(
mut self,
executor: E,
) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
F: 'e,
O: 'e,
{
Box::pin(try_stream! {
let mut s = executor.fetch_many(self.inner);
while let Some(v) = s.try_next().await? {
r#yield!(match v {
Either::Left(v) => Either::Left(v),
Either::Right(row) => {
Either::Right((self.mapper)(row)?)
}
});
}
Ok(())
})
}
pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<O>, Error>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
F: 'e,
O: 'e,
{
self.fetch(executor).try_collect().await
}
pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<O, Error>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
F: 'e,
O: 'e,
{
self.fetch_optional(executor)
.and_then(|row| match row {
Some(row) => future::ok(row),
None => future::err(Error::RowNotFound),
})
.await
}
pub async fn fetch_optional<'e, 'c: 'e, E>(mut self, executor: E) -> Result<Option<O>, Error>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
F: 'e,
O: 'e,
{
let row = executor.fetch_optional(self.inner).await?;
if let Some(row) = row {
(self.mapper)(row).map(Some)
} else {
Ok(None)
}
}
}
pub(crate) fn query_statement<'q, DB>(
statement: &'q <DB as HasStatement<'q>>::Statement,
) -> Query<'q, DB, <DB as HasArguments<'_>>::Arguments>
where
DB: Database,
{
Query {
database: PhantomData,
arguments: Some(Default::default()),
statement: Either::Right(statement),
persistent: true,
}
}
pub(crate) fn query_statement_with<'q, DB, A>(
statement: &'q <DB as HasStatement<'q>>::Statement,
arguments: A,
) -> Query<'q, DB, A>
where
DB: Database,
A: IntoArguments<'q, DB>,
{
Query {
database: PhantomData,
arguments: Some(arguments),
statement: Either::Right(statement),
persistent: true,
}
}
pub fn query<DB>(sql: &str) -> Query<'_, DB, <DB as HasArguments<'_>>::Arguments>
where
DB: Database,
{
Query {
database: PhantomData,
arguments: Some(Default::default()),
statement: Either::Left(sql),
persistent: true,
}
}
pub fn query_with<'q, DB, A>(sql: &'q str, arguments: A) -> Query<'q, DB, A>
where
DB: Database,
A: IntoArguments<'q, DB>,
{
Query {
database: PhantomData,
arguments: Some(arguments),
statement: Either::Left(sql),
persistent: true,
}
}