Commit b3a56b35 authored by Stephen Finucane
Browse files

Don't rely on implicit autocommit



Resolve the following RemovedIn20Warning warning:

  The current statement is being autocommitted using implicit
  autocommit, which will be removed in SQLAlchemy 2.0. Use the .begin()
  method of Engine or Connection in order to use an explicit transaction
  for DML and DDL statements.

Change-Id: Ib789cd4d11a3d5dd01fcdb99822025b11bbc234e
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
parent df901a1c
......@@ -518,12 +518,14 @@ class MySQLBackendImpl(BackendImpl):
def create_named_database(self, engine, ident, conditional=False):
    """Create the named MySQL database.

    :param engine: SQLAlchemy engine used to open a connection.
    :param ident: Name of the database to create. Interpolated directly
        into the DDL statement: identifiers cannot be bound parameters,
        so callers must pass a trusted name.
    :param conditional: If True, skip creation when the database
        already exists.
    """
    with engine.connect() as conn:
        if not conditional or not self.database_exists(conn, ident):
            # Use an explicit transaction: implicit autocommit is
            # deprecated and removed in SQLAlchemy 2.0.
            with conn.begin():
                conn.exec_driver_sql("CREATE DATABASE %s" % ident)
def drop_named_database(self, engine, ident, conditional=False):
    """Drop the named MySQL database.

    :param engine: SQLAlchemy engine used to open a connection.
    :param ident: Name of the database to drop. Interpolated directly
        into the DDL statement: identifiers cannot be bound parameters,
        so callers must pass a trusted name.
    :param conditional: If True, skip the drop when the database does
        not exist.
    """
    with engine.connect() as conn:
        if not conditional or self.database_exists(conn, ident):
            # Use an explicit transaction: implicit autocommit is
            # deprecated and removed in SQLAlchemy 2.0.
            with conn.begin():
                conn.exec_driver_sql("DROP DATABASE %s" % ident)
def database_exists(self, engine, ident):
s = sql.text("SHOW DATABASES LIKE :ident")
......@@ -582,18 +584,22 @@ class PostgresqlBackendImpl(BackendImpl):
def create_named_database(self, engine, ident, conditional=False):
    """Create the named PostgreSQL database.

    :param engine: SQLAlchemy engine used to open a connection.
    :param ident: Name of the database to create. Interpolated directly
        into the DDL statement: identifiers cannot be bound parameters,
        so callers must pass a trusted name.
    :param conditional: If True, skip creation when the database
        already exists.
    """
    # CREATE DATABASE cannot run inside a transaction block on
    # PostgreSQL, so the connection uses AUTOCOMMIT isolation; the
    # begin() context still silences the SQLAlchemy 2.0
    # implicit-autocommit deprecation warning.
    with engine.connect().execution_options(
        isolation_level="AUTOCOMMIT",
    ) as conn:
        if not conditional or not self.database_exists(conn, ident):
            with conn.begin():
                conn.exec_driver_sql("CREATE DATABASE %s" % ident)
def drop_named_database(self, engine, ident, conditional=False):
    """Drop the named PostgreSQL database.

    Existing sessions connected to the database are terminated first,
    since PostgreSQL refuses to drop a database with active users.

    :param engine: SQLAlchemy engine used to open a connection.
    :param ident: Name of the database to drop. Interpolated directly
        into the DDL statement: identifiers cannot be bound parameters,
        so callers must pass a trusted name.
    :param conditional: If True, use DROP DATABASE IF EXISTS so a
        missing database is not an error.
    """
    # DROP DATABASE cannot run inside a transaction block on
    # PostgreSQL, hence the AUTOCOMMIT isolation level; the begin()
    # context still silences the SQLAlchemy 2.0 implicit-autocommit
    # deprecation warning.
    with engine.connect().execution_options(
        isolation_level="AUTOCOMMIT",
    ) as conn:
        self._close_out_database_users(conn, ident)
        with conn.begin():
            if conditional:
                conn.exec_driver_sql("DROP DATABASE IF EXISTS %s" % ident)
            else:
                conn.exec_driver_sql("DROP DATABASE %s" % ident)
def drop_additional_objects(self, conn):
enums = [e['name'] for e in sqlalchemy.inspect(conn).get_enums()]
......
......@@ -490,7 +490,7 @@ def drop_old_duplicate_entries_from_table(engine, table_name,
func.count(table.c.id) > 1
)
with engine.connect() as conn:
with engine.connect() as conn, conn.begin():
for row in conn.execute(duplicated_rows_select).fetchall():
# NOTE(boris-42): Do not remove row that has the biggest ID.
delete_condition = table.c.id != row[0]
......@@ -571,7 +571,7 @@ def change_deleted_column_type_to_boolean(engine, table_name,
finally:
table.metadata.bind = None
with engine.connect() as conn:
with engine.connect() as conn, conn.begin():
conn.execute(
table.update().where(
table.c.deleted == table.c.id
......@@ -615,7 +615,9 @@ def _change_deleted_column_type_to_boolean_sqlite(engine, table_name,
new_table = Table(
table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create(conn)
with conn.begin():
new_table.create(conn)
indexes = []
for index in get_indexes(engine, table_name):
......@@ -631,9 +633,10 @@ def _change_deleted_column_type_to_boolean_sqlite(engine, table_name,
else:
c_select.append(table.c.deleted == table.c.id)
table.drop(conn)
for index in indexes:
index.create(conn)
with conn.begin():
table.drop(conn)
for index in indexes:
index.create(conn)
table.metadata.bind = engine
try:
......@@ -641,11 +644,12 @@ def _change_deleted_column_type_to_boolean_sqlite(engine, table_name,
finally:
table.metadata.bind = None
conn.execute(
new_table.update().where(
new_table.c.deleted == new_table.c.id
).values(deleted=True)
)
with conn.begin():
conn.execute(
new_table.update().where(
new_table.c.deleted == new_table.c.id
).values(deleted=True)
)
@debtcollector.removals.remove(
......@@ -672,17 +676,18 @@ def change_deleted_column_type_to_id_type(engine, table_name,
table.metadata.bind = engine
try:
with engine.connect() as conn:
with engine.connect() as conn, conn.begin():
deleted = True # workaround for pyflakes
conn.execute(
table.update().where(
table.c.deleted == deleted
).values(new_deleted=table.c.id)
)
table.c.deleted.drop()
table.c.new_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(conn, table_name, indexes)
table.c.deleted.drop()
table.c.new_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(engine, table_name, indexes)
finally:
table.metadata.bind = None
......@@ -739,10 +744,13 @@ def _change_deleted_column_type_to_id_type_sqlite(engine, table_name,
constraints.append(constraint._copy())
with engine.connect() as conn:
new_table = Table(
table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create(conn)
# we need separate transactions, since we must create the table before
# we can copy entries into it (later)
with conn.begin():
new_table = Table(
table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create(conn)
indexes = []
for index in get_indexes(engine, table_name):
......@@ -751,30 +759,32 @@ def _change_deleted_column_type_to_id_type_sqlite(engine, table_name,
Index(index["name"], *column_names, unique=index["unique"])
)
table.drop(conn)
for index in indexes:
index.create(conn)
with conn.begin():
table.drop(conn)
for index in indexes:
index.create(conn)
new_table.metadata.bind = engine
try:
new_table.rename(table_name)
finally:
new_table.metadata.bind = None
with conn.begin():
new_table.metadata.bind = engine
try:
new_table.rename(table_name)
finally:
new_table.metadata.bind = None
deleted = True # workaround for pyflakes
conn.execute(
new_table.update().where(
new_table.c.deleted == deleted
).values(deleted=new_table.c.id)
)
deleted = True # workaround for pyflakes
conn.execute(
new_table.update().where(
new_table.c.deleted == deleted
).values(deleted=new_table.c.id)
)
# NOTE(boris-42): Fix value of deleted column: False -> "" or 0.
deleted = False # workaround for pyflakes
conn.execute(
new_table.update().where(
new_table.c.deleted == deleted
).values(deleted=default_deleted_value)
)
# NOTE(boris-42): Fix value of deleted column: False -> "" or 0.
deleted = False # workaround for pyflakes
conn.execute(
new_table.update().where(
new_table.c.deleted == deleted
).values(deleted=default_deleted_value)
)
def get_db_connection_info(conn_pieces):
......@@ -1121,7 +1131,7 @@ def get_non_innodb_tables(connectable, skip_tables=('migrate_version',
params['database'] = connectable.engine.url.database
query = text(query_str)
# TODO(stephenfin): What about if this is already a Connection?
with connectable.connect() as conn:
with connectable.connect() as conn, conn.begin():
noninnodb = conn.execute(query, params)
return [i[0] for i in noninnodb]
......@@ -1232,21 +1242,25 @@ def suspend_fk_constraints_for_col_alter(
ctx = MigrationContext.configure(conn)
op = Operations(ctx)
for fk in fks:
op.drop_constraint(
fk['name'], fk['source_table'], type_="foreignkey")
with conn.begin():
for fk in fks:
op.drop_constraint(
fk['name'], fk['source_table'], type_="foreignkey")
yield
for fk in fks:
op.create_foreign_key(
fk['name'], fk['source_table'],
fk['referred_table'],
fk['constrained_columns'],
fk['referred_columns'],
onupdate=fk['options'].get('onupdate'),
ondelete=fk['options'].get('ondelete'),
deferrable=fk['options'].get('deferrable'),
initially=fk['options'].get('initially'),
)
with conn.begin():
for fk in fks:
op.create_foreign_key(
fk['name'], fk['source_table'],
fk['referred_table'],
fk['constrained_columns'],
fk['referred_columns'],
onupdate=fk['options'].get('onupdate'),
ondelete=fk['options'].get('ondelete'),
deferrable=fk['options'].get('deferrable'),
initially=fk['options'].get('initially'),
)
class NonCommittingConnectable(object):
......
......@@ -47,11 +47,6 @@ class WarningsFixture(fixtures.Fixture):
message=r'The Session.begin.subtransactions flag is deprecated .*',
category=sqla_exc.SADeprecationWarning)
warnings.filterwarnings(
'once',
message=r'The current statement is being autocommitted .*',
category=sqla_exc.SADeprecationWarning)
warnings.filterwarnings(
'once',
message=r'Calling \.begin\(\) when a transaction is already .*',
......@@ -68,6 +63,12 @@ class WarningsFixture(fixtures.Fixture):
module='migrate',
category=sqla_exc.SADeprecationWarning)
warnings.filterwarnings(
'once',
message=r'The current statement is being autocommitted .*',
module='migrate',
category=sqla_exc.SADeprecationWarning)
warnings.filterwarnings(
'ignore',
message=r'The Engine.execute\(\) method is considered legacy .*',
......
......@@ -498,9 +498,15 @@ class TestReferenceErrorSQLite(
self.table_2.create(self.engine)
def test_raise(self):
with self.engine.connect() as conn:
conn.execute(sql.text("PRAGMA foreign_keys = ON"))
connection = self.engine.raw_connection()
try:
cursor = connection.cursor()
cursor.execute('PRAGMA foreign_keys = ON')
cursor.close()
finally:
connection.close()
with self.engine.connect() as conn:
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
......@@ -521,16 +527,24 @@ class TestReferenceErrorSQLite(
self.assertIsNone(matched.key_table)
def test_raise_delete(self):
with self.engine.connect() as conn:
conn.execute(sql.text("PRAGMA foreign_keys = ON"))
conn.execute(self.table_1.insert().values(id=1234, foo=42))
conn.execute(self.table_2.insert().values(id=4321, foo_id=1234))
connection = self.engine.raw_connection()
try:
cursor = connection.cursor()
cursor.execute('PRAGMA foreign_keys = ON')
cursor.close()
finally:
connection.close()
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_1.delete()
)
with self.engine.connect() as conn:
with conn.begin():
conn.execute(self.table_1.insert().values(id=1234, foo=42))
conn.execute(
self.table_2.insert().values(id=4321, foo_id=1234))
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_1.delete()
)
self.assertInnerException(
matched,
......@@ -577,13 +591,17 @@ class TestReferenceErrorPostgreSQL(
def test_raise_delete(self):
with self.engine.connect() as conn:
conn.execute(self.table_1.insert().values(id=1234, foo=42))
conn.execute(self.table_2.insert().values(id=4321, foo_id=1234))
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_1.delete()
)
with conn.begin():
conn.execute(self.table_1.insert().values(id=1234, foo=42))
conn.execute(
self.table_2.insert().values(id=4321, foo_id=1234))
with conn.begin():
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_1.delete()
)
self.assertInnerException(
matched,
......@@ -648,7 +666,7 @@ class TestReferenceErrorMySQL(
self.assertEqual("resource_foo", matched.key_table)
def test_raise_delete(self):
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.execute(self.table_1.insert().values(id=1234, foo=42))
conn.execute(self.table_2.insert().values(id=4321, foo_id=1234))
matched = self.assertRaises(
......
......@@ -314,12 +314,15 @@ class MySQLModeTestCase(db_test_base._MySQLOpportunisticTestCase):
self.test_table = Table(_TABLE_NAME + "mode", meta,
Column('id', Integer, primary_key=True),
Column('bar', String(255)))
self.test_table.create(self.connection)
with self.connection.begin():
self.test_table.create(self.connection)
def cleanup():
self.test_table.drop(self.connection)
with self.connection.begin():
self.test_table.drop(self.connection)
self.connection.close()
mode_engine.dispose()
self.addCleanup(cleanup)
def _test_string_too_long(self, value):
......
......@@ -699,8 +699,9 @@ class TestMigrationUtils(db_test_base._DbTestCase):
Column('updated_at', DateTime))
test_table.create(engine)
with engine.connect() as conn:
conn.execute(test_table.insert(), values)
with engine.connect() as conn, conn.begin():
with conn.begin():
conn.execute(test_table.insert(), values)
return test_table, values
def test_drop_old_duplicate_entries_from_table(self):
......@@ -720,7 +721,7 @@ class TestMigrationUtils(db_test_base._DbTestCase):
uniq_values.add(uniq_value)
expected_ids.append(value['id'])
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
real_ids = [
row[0] for row in
conn.execute(select(test_table.c.id)).fetchall()
......@@ -762,7 +763,7 @@ class TestMigrationUtils(db_test_base._DbTestCase):
base_select = table.select()
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
rows_select = base_select.where(table.c.deleted != table.c.id)
row_ids = [
row.id for row in conn.execute(rows_select).fetchall()
......@@ -938,7 +939,7 @@ class TestMigrationUtils(db_test_base._DbTestCase):
# NOTE(zzzeek): SQLAlchemy 1.2 Boolean type will disallow non 1/0
# value here, 1.1 also coerces to "1/0" so use raw SQL to test the
# constraint
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.exec_driver_sql(
"INSERT INTO abc (deleted) VALUES (?)",
(10, ),
......@@ -1652,7 +1653,7 @@ class TestDialectFunctionDispatcher(test_base.BaseTestCase):
class TestGetInnoDBTables(db_test_base._MySQLOpportunisticTestCase):
def test_all_tables_use_innodb(self):
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.execute(
sql.text(
"CREATE TABLE customers "
......@@ -1660,21 +1661,23 @@ class TestGetInnoDBTables(db_test_base._MySQLOpportunisticTestCase):
self.assertEqual([], utils.get_non_innodb_tables(self.engine))
def test_all_tables_use_innodb_false(self):
    """A table using a non-InnoDB engine is reported by name."""
    # Explicit transaction: implicit autocommit is removed in
    # SQLAlchemy 2.0.
    with self.engine.connect() as conn, conn.begin():
        conn.execute(
            sql.text("CREATE TABLE employee (i INT) ENGINE=MEMORY")
        )
    self.assertEqual(['employee'],
                     utils.get_non_innodb_tables(self.engine))
def test_skip_tables_use_default_value(self):
    """Tables in the default skip list are ignored even if non-InnoDB."""
    # Explicit transaction: implicit autocommit is removed in
    # SQLAlchemy 2.0.
    with self.engine.connect() as conn, conn.begin():
        conn.execute(
            sql.text("CREATE TABLE migrate_version (i INT) ENGINE=MEMORY")
        )
    self.assertEqual([],
                     utils.get_non_innodb_tables(self.engine))
def test_skip_tables_use_passed_value(self):
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.execute(
sql.text("CREATE TABLE some_table (i INT) ENGINE=MEMORY"))
self.assertEqual([],
......@@ -1682,7 +1685,7 @@ class TestGetInnoDBTables(db_test_base._MySQLOpportunisticTestCase):
self.engine, skip_tables=('some_table',)))
def test_skip_tables_use_empty_list(self):
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.execute(
sql.text("CREATE TABLE some_table_3 (i INT) ENGINE=MEMORY"))
self.assertEqual(['some_table_3'],
......@@ -1690,7 +1693,7 @@ class TestGetInnoDBTables(db_test_base._MySQLOpportunisticTestCase):
self.engine, skip_tables=()))
def test_skip_tables_use_several_values(self):
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.execute(
sql.text("CREATE TABLE some_table_1 (i INT) ENGINE=MEMORY"))
conn.execute(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment