1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
use crate::acquire::Acquire;
use crate::migrate::{AppliedMigration, Migrate, MigrateError, Migration, MigrationSource};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::slice;
#[derive(Debug)]
pub struct Migrator {
pub migrations: Cow<'static, [Migration]>,
pub ignore_missing: bool,
}
fn validate_applied_migrations(
applied_migrations: &[AppliedMigration],
migrator: &Migrator,
) -> Result<(), MigrateError> {
if migrator.ignore_missing {
return Ok(());
}
let migrations: HashSet<_> = migrator.iter().map(|m| m.version).collect();
for applied_migration in applied_migrations {
if !migrations.contains(&applied_migration.version) {
return Err(MigrateError::VersionMissing(applied_migration.version));
}
}
Ok(())
}
impl Migrator {
/// Creates a new instance with the given source.
///
/// # Examples
///
/// ```rust,no_run
/// # use sqlx_core::migrate::MigrateError;
/// # fn main() -> Result<(), MigrateError> {
/// # sqlx_rt::block_on(async move {
/// # use sqlx_core::migrate::Migrator;
/// use std::path::Path;
///
/// // Read migrations from a local folder: ./migrations
/// let m = Migrator::new(Path::new("./migrations")).await?;
/// # Ok(())
/// # })
/// # }
/// ```
/// See [MigrationSource] for details on structure of the `./migrations` directory.
pub async fn new<'s, S>(source: S) -> Result<Self, MigrateError>
where
S: MigrationSource<'s>,
{
Ok(Self {
migrations: Cow::Owned(source.resolve().await.map_err(MigrateError::Source)?),
ignore_missing: false,
})
}
/// Specify should ignore applied migrations that missing in the resolved migrations.
pub fn set_ignore_missing(&mut self, ignore_missing: bool) -> &Self {
self.ignore_missing = ignore_missing;
self
}
/// Get an iterator over all known migrations.
pub fn iter(&self) -> slice::Iter<'_, Migration> {
self.migrations.iter()
}
/// Run any pending migrations against the database; and, validate previously applied migrations
/// against the current migration source to detect accidental changes in previously-applied migrations.
///
/// # Examples
///
/// ```rust,no_run
/// # use sqlx_core::migrate::MigrateError;
/// # #[cfg(feature = "sqlite")]
/// # fn main() -> Result<(), MigrateError> {
/// # sqlx_rt::block_on(async move {
/// # use sqlx_core::migrate::Migrator;
/// let m = Migrator::new(std::path::Path::new("./migrations")).await?;
/// let pool = sqlx_core::sqlite::SqlitePoolOptions::new().connect("sqlite::memory:").await?;
/// m.run(&pool).await
/// # })
/// # }
/// ```
pub async fn run<'a, A>(&self, migrator: A) -> Result<(), MigrateError>
where
A: Acquire<'a>,
<A::Connection as Deref>::Target: Migrate,
{
let mut conn = migrator.acquire().await?;
// lock the database for exclusive access by the migrator
conn.lock().await?;
// creates [_migrations] table only if needed
// eventually this will likely migrate previous versions of the table
conn.ensure_migrations_table().await?;
let version = conn.dirty_version().await?;
if let Some(version) = version {
return Err(MigrateError::Dirty(version));
}
let applied_migrations = conn.list_applied_migrations().await?;
validate_applied_migrations(&applied_migrations, self)?;
let applied_migrations: HashMap<_, _> = applied_migrations
.into_iter()
.map(|m| (m.version, m))
.collect();
for migration in self.iter() {
if migration.migration_type.is_down_migration() {
continue;
}
match applied_migrations.get(&migration.version) {
Some(applied_migration) => {
if migration.checksum != applied_migration.checksum {
return Err(MigrateError::VersionMismatch(migration.version));
}
}
None => {
conn.apply(migration).await?;
}
}
}
// unlock the migrator to allow other migrators to run
// but do nothing as we already migrated
conn.unlock().await?;
Ok(())
}
/// Run down migrations against the database until a specific version.
///
/// # Examples
///
/// ```rust,no_run
/// # use sqlx_core::migrate::MigrateError;
/// # #[cfg(feature = "sqlite")]
/// # fn main() -> Result<(), MigrateError> {
/// # sqlx_rt::block_on(async move {
/// # use sqlx_core::migrate::Migrator;
/// let m = Migrator::new(std::path::Path::new("./migrations")).await?;
/// let pool = sqlx_core::sqlite::SqlitePoolOptions::new().connect("sqlite::memory:").await?;
/// m.undo(&pool, 4).await
/// # })
/// # }
/// ```
pub async fn undo<'a, A>(&self, migrator: A, target: i64) -> Result<(), MigrateError>
where
A: Acquire<'a>,
<A::Connection as Deref>::Target: Migrate,
{
let mut conn = migrator.acquire().await?;
// lock the database for exclusive access by the migrator
conn.lock().await?;
// creates [_migrations] table only if needed
// eventually this will likely migrate previous versions of the table
conn.ensure_migrations_table().await?;
let version = conn.dirty_version().await?;
if let Some(version) = version {
return Err(MigrateError::Dirty(version));
}
let applied_migrations = conn.list_applied_migrations().await?;
validate_applied_migrations(&applied_migrations, self)?;
let applied_migrations: HashMap<_, _> = applied_migrations
.into_iter()
.map(|m| (m.version, m))
.collect();
for migration in self
.iter()
.rev()
.filter(|m| m.migration_type.is_down_migration())
.filter(|m| applied_migrations.contains_key(&m.version))
.filter(|m| m.version > target)
{
conn.revert(migration).await?;
}
// unlock the migrator to allow other migrators to run
// but do nothing as we already migrated
conn.unlock().await?;
Ok(())
}
}