Commit df901a1c authored by Stephen Finucane's avatar Stephen Finucane
Browse files

Replace use of 'Engine.execute()'



Resolve the following RemovedIn20Warning warning:

  The Engine.execute() method is considered legacy as of the 1.x series
  of SQLAlchemy and will be removed in 2.0. All statement execution in
  SQLAlchemy 2.0 is performed by the Connection.execute() method of
  Connection, or in the ORM by the Session.execute() method of Session.

Change-Id: I4c47a690a94abcb3b4b6fb087a1bf86c5350b523
Signed-off-by: default avatarStephen Finucane <stephenfin@redhat.com>
parent ecb15c4a
......@@ -490,32 +490,34 @@ def drop_old_duplicate_entries_from_table(engine, table_name,
func.count(table.c.id) > 1
)
for row in engine.execute(duplicated_rows_select).fetchall():
# NOTE(boris-42): Do not remove row that has the biggest ID.
delete_condition = table.c.id != row[0]
is_none = None # workaround for pyflakes
delete_condition &= table.c.deleted_at == is_none
for name in uc_column_names:
delete_condition &= table.c[name] == row._mapping[name]
rows_to_delete_select = sqlalchemy.sql.select(
table.c.id,
).where(delete_condition)
for row in engine.execute(rows_to_delete_select).fetchall():
LOG.info("Deleting duplicated row with id: %(id)s from table: "
"%(table)s", dict(id=row[0], table=table_name))
if use_soft_delete:
delete_statement = table.update().\
where(delete_condition).\
values({
'deleted': literal_column('id'),
'updated_at': literal_column('updated_at'),
'deleted_at': timeutils.utcnow()
})
else:
delete_statement = table.delete().where(delete_condition)
engine.execute(delete_statement)
with engine.connect() as conn:
for row in conn.execute(duplicated_rows_select).fetchall():
# NOTE(boris-42): Do not remove row that has the biggest ID.
delete_condition = table.c.id != row[0]
is_none = None # workaround for pyflakes
delete_condition &= table.c.deleted_at == is_none
for name in uc_column_names:
delete_condition &= table.c[name] == row._mapping[name]
rows_to_delete_select = sqlalchemy.sql.select(
table.c.id,
).where(delete_condition)
for row in conn.execute(rows_to_delete_select).fetchall():
LOG.info(
"Deleting duplicated row with id: %(id)s from table: "
"%(table)s", dict(id=row[0], table=table_name))
if use_soft_delete:
delete_statement = table.update().\
where(delete_condition).\
values({
'deleted': literal_column('id'),
'updated_at': literal_column('updated_at'),
'deleted_at': timeutils.utcnow()
})
else:
delete_statement = table.delete().where(delete_condition)
conn.execute(delete_statement)
def _get_default_deleted_value(table):
......@@ -1118,7 +1120,9 @@ def get_non_innodb_tables(connectable, skip_tables=('migrate_version',
params['database'] = connectable.engine.url.database
query = text(query_str)
noninnodb = connectable.execute(query, params)
# TODO(stephenfin): What about if this is already a Connection?
with connectable.connect() as conn:
noninnodb = conn.execute(query, params)
return [i[0] for i in noninnodb]
......
......@@ -47,11 +47,6 @@ class WarningsFixture(fixtures.Fixture):
message=r'The Session.begin.subtransactions flag is deprecated .*',
category=sqla_exc.SADeprecationWarning)
warnings.filterwarnings(
'once',
message=r'The Engine.execute\(\) method is considered legacy .*',
category=sqla_exc.SADeprecationWarning)
warnings.filterwarnings(
'once',
message=r'The current statement is being autocommitted .*',
......@@ -73,4 +68,10 @@ class WarningsFixture(fixtures.Fixture):
module='migrate',
category=sqla_exc.SADeprecationWarning)
warnings.filterwarnings(
'ignore',
message=r'The Engine.execute\(\) method is considered legacy .*',
module='migrate',
category=sqla_exc.SADeprecationWarning)
self.addCleanup(warnings.resetwarnings)
......@@ -270,14 +270,16 @@ class TestNonExistentConstraintPostgreSQL(
):
def test_raise(self):
matched = self.assertRaises(
exception.DBNonExistentConstraint,
self.engine.execute,
sqla.schema.DropConstraint(
sqla.ForeignKeyConstraint(["id"], ["baz.id"],
name="bar_fkey",
table=self.table_1)),
)
with self.engine.connect() as conn:
matched = self.assertRaises(
exception.DBNonExistentConstraint,
conn.execute,
sqla.schema.DropConstraint(
sqla.ForeignKeyConstraint(["id"], ["baz.id"],
name="bar_fkey",
table=self.table_1)),
)
self.assertInnerException(
matched,
"ProgrammingError",
......@@ -295,14 +297,16 @@ class TestNonExistentConstraintMySQL(
):
def test_raise(self):
matched = self.assertRaises(
exception.DBNonExistentConstraint,
self.engine.execute,
sqla.schema.DropConstraint(
sqla.ForeignKeyConstraint(["id"], ["baz.id"],
name="bar_fkey",
table=self.table_1)),
)
with self.engine.connect() as conn:
matched = self.assertRaises(
exception.DBNonExistentConstraint,
conn.execute,
sqla.schema.DropConstraint(
sqla.ForeignKeyConstraint(["id"], ["baz.id"],
name="bar_fkey",
table=self.table_1)),
)
# NOTE(jd) Cannot check precisely with assertInnerException since MySQL
# error are not the same depending on its version…
self.assertIsInstance(matched.inner_exception,
......@@ -332,11 +336,13 @@ class TestNonExistentTable(
)
def test_raise(self):
matched = self.assertRaises(
exception.DBNonExistentTable,
self.engine.execute,
sqla.schema.DropTable(self.table_1),
)
with self.engine.connect() as conn:
matched = self.assertRaises(
exception.DBNonExistentTable,
conn.execute,
sqla.schema.DropTable(self.table_1),
)
self.assertInnerException(
matched,
"OperationalError",
......@@ -352,11 +358,13 @@ class TestNonExistentTablePostgreSQL(
):
def test_raise(self):
matched = self.assertRaises(
exception.DBNonExistentTable,
self.engine.execute,
sqla.schema.DropTable(self.table_1),
)
with self.engine.connect() as conn:
matched = self.assertRaises(
exception.DBNonExistentTable,
conn.execute,
sqla.schema.DropTable(self.table_1),
)
self.assertInnerException(
matched,
"ProgrammingError",
......@@ -372,11 +380,13 @@ class TestNonExistentTableMySQL(
):
def test_raise(self):
matched = self.assertRaises(
exception.DBNonExistentTable,
self.engine.execute,
sqla.schema.DropTable(self.table_1),
)
with self.engine.connect() as conn:
matched = self.assertRaises(
exception.DBNonExistentTable,
conn.execute,
sqla.schema.DropTable(self.table_1),
)
# NOTE(jd) Cannot check precisely with assertInnerException since MySQL
# error are not the same depending on its version…
self.assertIsInstance(matched.inner_exception,
......@@ -488,13 +498,14 @@ class TestReferenceErrorSQLite(
self.table_2.create(self.engine)
def test_raise(self):
self.engine.execute(sql.text("PRAGMA foreign_keys = ON"))
with self.engine.connect() as conn:
conn.execute(sql.text("PRAGMA foreign_keys = ON"))
matched = self.assertRaises(
exception.DBReferenceError,
self.engine.execute,
self.table_2.insert().values(id=1, foo_id=2)
)
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_2.insert().values(id=1, foo_id=2)
)
self.assertInnerException(
matched,
......@@ -510,16 +521,17 @@ class TestReferenceErrorSQLite(
self.assertIsNone(matched.key_table)
def test_raise_delete(self):
self.engine.execute(sql.text("PRAGMA foreign_keys = ON"))
with self.engine.connect() as conn:
conn.execute(sql.text("PRAGMA foreign_keys = ON"))
conn.execute(self.table_1.insert().values(id=1234, foo=42))
conn.execute(self.table_2.insert().values(id=4321, foo_id=1234))
matched = self.assertRaises(
exception.DBReferenceError,
self.engine.execute,
self.table_1.delete()
)
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_1.delete()
)
self.assertInnerException(
matched,
"IntegrityError",
......@@ -539,12 +551,14 @@ class TestReferenceErrorPostgreSQL(
db_test_base._PostgreSQLOpportunisticTestCase,
):
def test_raise(self):
params = {'id': 1, 'foo_id': 2}
matched = self.assertRaises(
exception.DBReferenceError,
self.engine.execute,
self.table_2.insert().values(**params)
)
with self.engine.connect() as conn:
params = {'id': 1, 'foo_id': 2}
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_2.insert().values(**params)
)
self.assertInnerException(
matched,
"IntegrityError",
......@@ -565,11 +579,12 @@ class TestReferenceErrorPostgreSQL(
with self.engine.connect() as conn:
conn.execute(self.table_1.insert().values(id=1234, foo=42))
conn.execute(self.table_2.insert().values(id=4321, foo_id=1234))
matched = self.assertRaises(
exception.DBReferenceError,
self.engine.execute,
self.table_1.delete()
)
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_1.delete()
)
self.assertInnerException(
matched,
"IntegrityError",
......@@ -592,11 +607,12 @@ class TestReferenceErrorMySQL(
db_test_base._MySQLOpportunisticTestCase,
):
def test_raise(self):
matched = self.assertRaises(
exception.DBReferenceError,
self.engine.execute,
self.table_2.insert().values(id=1, foo_id=2)
)
with self.engine.connect() as conn:
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_2.insert().values(id=1, foo_id=2)
)
# NOTE(jd) Cannot check precisely with assertInnerException since MySQL
# error are not the same depending on its version…
......@@ -635,11 +651,11 @@ class TestReferenceErrorMySQL(
with self.engine.connect() as conn:
conn.execute(self.table_1.insert().values(id=1234, foo=42))
conn.execute(self.table_2.insert().values(id=4321, foo_id=1234))
matched = self.assertRaises(
exception.DBReferenceError,
self.engine.execute,
self.table_1.delete()
)
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_1.delete()
)
# NOTE(jd) Cannot check precisely with assertInnerException since MySQL
# error are not the same depending on its version…
self.assertIsInstance(matched.inner_exception,
......@@ -1185,9 +1201,10 @@ class TestDBDisconnected(TestsExceptionFilter):
# test implicit execution
with self._fixture(dialect_name, exc_obj, 1):
self.assertEqual(
1, self.engine.execute(sqla.select(1)).scalars().first(),
)
with self.engine.connect() as conn:
self.assertEqual(
1, conn.execute(sqla.select(1)).scalars().first(),
)
def test_mariadb_error_1927(self):
for code in [1927]:
......
......@@ -109,7 +109,7 @@ class SQLiteSavepointTest(db_test_base._DbTestCase):
)
self.assertEqual(
[(1, 'data 1')],
self.engine.execute(
conn.execute(
self.test_table.select().
order_by(self.test_table.c.id)
).fetchall()
......@@ -145,13 +145,13 @@ class SQLiteSavepointTest(db_test_base._DbTestCase):
{'data': 'data 3'}
)
self.assertEqual(
[(1, 'data 1'), (2, 'data 3')],
self.engine.execute(
self.test_table.select().
order_by(self.test_table.c.id)
).fetchall()
)
self.assertEqual(
[(1, 'data 1'), (2, 'data 3')],
conn.execute(
self.test_table.select().
order_by(self.test_table.c.id)
).fetchall()
)
def test_savepoint_beginning(self):
with self.engine.begin() as conn:
......@@ -167,13 +167,13 @@ class SQLiteSavepointTest(db_test_base._DbTestCase):
{'data': 'data 2'}
)
self.assertEqual(
[(1, 'data 2')],
self.engine.execute(
self.test_table.select().
order_by(self.test_table.c.id)
).fetchall()
)
self.assertEqual(
[(1, 'data 2')],
conn.execute(
self.test_table.select().
order_by(self.test_table.c.id)
).fetchall()
)
class FakeDBAPIConnection(object):
......@@ -476,34 +476,42 @@ class SQLiteConnectTest(test_base.BaseTestCase):
def test_sqlite_fk_listener(self):
engine = self._fixture(sqlite_fk=True)
self.assertEqual(
1,
engine.execute(sql.text('pragma foreign_keys')).scalars().first(),
)
with engine.connect() as conn:
self.assertEqual(
1,
conn.execute(
sql.text('pragma foreign_keys')
).scalars().first(),
)
engine = self._fixture(sqlite_fk=False)
self.assertEqual(
0,
engine.execute(sql.text('pragma foreign_keys')).scalars().first(),
)
with engine.connect() as conn:
self.assertEqual(
0,
conn.execute(
sql.text('pragma foreign_keys')
).scalars().first(),
)
def test_sqlite_synchronous_listener(self):
engine = self._fixture()
# "The default setting is synchronous=FULL." (e.g. 2)
# http://www.sqlite.org/pragma.html#pragma_synchronous
self.assertEqual(
2,
engine.execute(sql.text('pragma synchronous')).scalars().first(),
)
with engine.connect() as conn:
self.assertEqual(
2,
conn.execute(sql.text('pragma synchronous')).scalars().first(),
)
engine = self._fixture(sqlite_synchronous=False)
self.assertEqual(
0,
engine.execute(sql.text('pragma synchronous')).scalars().first(),
)
with engine.connect() as conn:
self.assertEqual(
0,
conn.execute(sql.text('pragma synchronous')).scalars().first(),
)
class MysqlConnectTest(db_test_base._MySQLOpportunisticTestCase):
......@@ -512,9 +520,10 @@ class MysqlConnectTest(db_test_base._MySQLOpportunisticTestCase):
return session.create_engine(self.engine.url, mysql_sql_mode=sql_mode)
def _assert_sql_mode(self, engine, sql_mode_present, sql_mode_non_present):
mode = engine.execute(
sql.text("SHOW VARIABLES LIKE 'sql_mode'")
).fetchone()[1]
with engine.connect() as conn:
mode = conn.execute(
sql.text("SHOW VARIABLES LIKE 'sql_mode'")
).fetchone()[1]
self.assertIn(
sql_mode_present, mode
)
......@@ -538,9 +547,10 @@ class MysqlConnectTest(db_test_base._MySQLOpportunisticTestCase):
# get the GLOBAL sql_mode, not the @@SESSION, so that
# we get what is configured for the MySQL database, as opposed
# to what our own session.create_engine() has set it to.
expected = self.engine.execute(
sql.text("SELECT @@GLOBAL.sql_mode")
).scalar()
with self.engine.connect() as conn:
expected = conn.execute(
sql.text("SELECT @@GLOBAL.sql_mode")
).scalar()
engine = self._fixture(sql_mode=None)
self._assert_sql_mode(engine, expected, None)
......@@ -592,9 +602,10 @@ class MysqlConnectTest(db_test_base._MySQLOpportunisticTestCase):
engine = self._fixture(sql_mode='TRADITIONAL')
actual_mode = engine.execute(
sql.text("SHOW VARIABLES LIKE 'sql_mode'")
).fetchone()[1]
with engine.connect() as conn:
actual_mode = conn.execute(
sql.text("SHOW VARIABLES LIKE 'sql_mode'")
).fetchone()[1]
self.assertIn('MySQL server mode set to %s' % actual_mode,
log.output)
......
......@@ -699,7 +699,8 @@ class TestMigrationUtils(db_test_base._DbTestCase):
Column('updated_at', DateTime))
test_table.create(engine)
engine.execute(test_table.insert(), values)
with engine.connect() as conn:
conn.execute(test_table.insert(), values)
return test_table, values
def test_drop_old_duplicate_entries_from_table(self):
......@@ -719,10 +720,11 @@ class TestMigrationUtils(db_test_base._DbTestCase):
uniq_values.add(uniq_value)
expected_ids.append(value['id'])
real_ids = [
row[0] for row in
self.engine.execute(select(test_table.c.id)).fetchall()
]
with self.engine.connect() as conn:
real_ids = [
row[0] for row in
conn.execute(select(test_table.c.id)).fetchall()
]
self.assertEqual(len(expected_ids), len(real_ids))
for id_ in expected_ids:
......@@ -760,20 +762,21 @@ class TestMigrationUtils(db_test_base._DbTestCase):
base_select = table.select()
rows_select = base_select.where(table.c.deleted != table.c.id)
row_ids = [
row.id for row in self.engine.execute(rows_select).fetchall()
]
self.assertEqual(len(expected_values), len(row_ids))
for value in expected_values:
self.assertIn(value['id'], row_ids)
deleted_rows_select = base_select.where(
table.c.deleted == table.c.id)
deleted_rows_ids = [
row.id for row in
self.engine.execute(deleted_rows_select).fetchall()
]
with self.engine.connect() as conn:
rows_select = base_select.where(table.c.deleted != table.c.id)
row_ids = [
row.id for row in conn.execute(rows_select).fetchall()
]
self.assertEqual(len(expected_values), len(row_ids))
for value in expected_values:
self.assertIn(value['id'], row_ids)
deleted_rows_select = base_select.where(
table.c.deleted == table.c.id)
deleted_rows_ids = [
row.id for row in
conn.execute(deleted_rows_select).fetchall()
]
self.assertEqual(len(values) - len(row_ids),
len(deleted_rows_ids))
for value in soft_deleted_values:
......@@ -1649,43 +1652,49 @@ class TestDialectFunctionDispatcher(test_base.BaseTestCase):
class TestGetInnoDBTables(db_test_base._MySQLOpportunisticTestCase):
def test_all_tables_use_innodb(self):
self.engine.execute(
sql.text(
"CREATE TABLE customers "
"(a INT, b CHAR (20), INDEX (a)) ENGINE=InnoDB"))
with self.engine.connect() as conn:
conn.execute(
sql.text(
"CREATE TABLE customers "
"(a INT, b CHAR (20), INDEX (a)) ENGINE=InnoDB"))
self.assertEqual([], utils.get_non_innodb_tables(self.engine))
def test_all_tables_use_innodb_false(self):
self.engine.execute(
sql.text("CREATE TABLE employee (i INT) ENGINE=MEMORY"))
with self.engine.connect() as conn:
conn.execute(
sql.text("CREATE TABLE employee (i INT) ENGINE=MEMORY"))
self.assertEqual(['employee'],
utils.get_non_innodb_tables(self.engine))
def test_skip_tables_use_default_value(self):
self.engine.execute(
sql.text("CREATE TABLE migrate_version (i INT) ENGINE=MEMORY"))
with self.engine.connect() as conn:
conn.execute(
sql.text("CREATE TABLE migrate_version (i INT) ENGINE=MEMORY"))
self.assertEqual([],
utils.get_non_innodb_tables(self.engine))
def test_skip_tables_use_passed_value(self):
self.engine.execute(
sql.text("CREATE TABLE some_table (i INT) ENGINE=MEMORY"))
with self.engine.connect() as conn:
conn.execute(
sql.text("CREATE TABLE some_table (i INT) ENGINE=MEMORY"))
self.assertEqual([],
utils.get_non_innodb_tables(
self.engine, skip_tables=('some_table',)))
def test_skip_tables_use_empty_list(self):
self.engine.execute(
sql.text("CREATE TABLE some_table_3 (i INT) ENGINE=MEMORY"))
with self.engine.connect() as conn:
conn.execute(
sql.text("CREATE TABLE some_table_3 (i INT) ENGINE=MEMORY"))
self.assertEqual(['some_table_3'],
utils.get_non_innodb_tables(
self.engine, skip_tables=()))
def test_skip_tables_use_several_values(self):
self.engine.execute(
sql.text("CREATE TABLE some_table_1 (i INT) ENGINE=MEMORY"))
self.engine.execute(
sql.text("CREATE TABLE some_table_2 (i INT) ENGINE=MEMORY"))
with self.engine.connect() as conn:
conn.execute(
sql.text("CREATE TABLE some_table_1 (i INT) ENGINE=MEMORY"))
conn.execute(
sql.text("CREATE TABLE some_table_2 (i INT) ENGINE=MEMORY"))
self.assertEqual([],
utils.get_non_innodb_tables(
self.engine,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment