Commit eec481d7 authored by Zuul's avatar Zuul Committed by Gerrit Code Review
Browse files

Merge "Don't rely on implicit autocommit"

parents ff96dc67 b3a56b35
......@@ -518,12 +518,14 @@ class MySQLBackendImpl(BackendImpl):
def create_named_database(self, engine, ident, conditional=False):
with engine.connect() as conn:
if not conditional or not self.database_exists(conn, ident):
conn.exec_driver_sql("CREATE DATABASE %s" % ident)
with conn.begin():
conn.exec_driver_sql("CREATE DATABASE %s" % ident)
def drop_named_database(self, engine, ident, conditional=False):
with engine.connect() as conn:
if not conditional or self.database_exists(conn, ident):
conn.exec_driver_sql("DROP DATABASE %s" % ident)
with conn.begin():
conn.exec_driver_sql("DROP DATABASE %s" % ident)
def database_exists(self, engine, ident):
s = sql.text("SHOW DATABASES LIKE :ident")
......@@ -581,18 +583,22 @@ class PostgresqlBackendImpl(BackendImpl):
def create_named_database(self, engine, ident, conditional=False):
with engine.connect().execution_options(
isolation_level="AUTOCOMMIT") as conn:
isolation_level="AUTOCOMMIT",
) as conn:
if not conditional or not self.database_exists(conn, ident):
conn.exec_driver_sql("CREATE DATABASE %s" % ident)
with conn.begin():
conn.exec_driver_sql("CREATE DATABASE %s" % ident)
def drop_named_database(self, engine, ident, conditional=False):
with engine.connect().execution_options(
isolation_level="AUTOCOMMIT") as conn:
isolation_level="AUTOCOMMIT",
) as conn:
self._close_out_database_users(conn, ident)
if conditional:
conn.exec_driver_sql("DROP DATABASE IF EXISTS %s" % ident)
else:
conn.exec_driver_sql("DROP DATABASE %s" % ident)
with conn.begin():
if conditional:
conn.exec_driver_sql("DROP DATABASE IF EXISTS %s" % ident)
else:
conn.exec_driver_sql("DROP DATABASE %s" % ident)
def drop_additional_objects(self, conn):
enums = [e['name'] for e in sqlalchemy.inspect(conn).get_enums()]
......
......@@ -490,7 +490,7 @@ def drop_old_duplicate_entries_from_table(engine, table_name,
func.count(table.c.id) > 1
)
with engine.connect() as conn:
with engine.connect() as conn, conn.begin():
for row in conn.execute(duplicated_rows_select).fetchall():
# NOTE(boris-42): Do not remove row that has the biggest ID.
delete_condition = table.c.id != row[0]
......@@ -571,7 +571,7 @@ def change_deleted_column_type_to_boolean(engine, table_name,
finally:
table.metadata.bind = None
with engine.connect() as conn:
with engine.connect() as conn, conn.begin():
conn.execute(
table.update().where(
table.c.deleted == table.c.id
......@@ -615,7 +615,9 @@ def _change_deleted_column_type_to_boolean_sqlite(engine, table_name,
new_table = Table(
table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create(conn)
with conn.begin():
new_table.create(conn)
indexes = []
for index in get_indexes(engine, table_name):
......@@ -631,9 +633,10 @@ def _change_deleted_column_type_to_boolean_sqlite(engine, table_name,
else:
c_select.append(table.c.deleted == table.c.id)
table.drop(conn)
for index in indexes:
index.create(conn)
with conn.begin():
table.drop(conn)
for index in indexes:
index.create(conn)
table.metadata.bind = engine
try:
......@@ -641,11 +644,12 @@ def _change_deleted_column_type_to_boolean_sqlite(engine, table_name,
finally:
table.metadata.bind = None
conn.execute(
new_table.update().where(
new_table.c.deleted == new_table.c.id
).values(deleted=True)
)
with conn.begin():
conn.execute(
new_table.update().where(
new_table.c.deleted == new_table.c.id
).values(deleted=True)
)
@debtcollector.removals.remove(
......@@ -672,17 +676,18 @@ def change_deleted_column_type_to_id_type(engine, table_name,
table.metadata.bind = engine
try:
with engine.connect() as conn:
with engine.connect() as conn, conn.begin():
deleted = True # workaround for pyflakes
conn.execute(
table.update().where(
table.c.deleted == deleted
).values(new_deleted=table.c.id)
)
table.c.deleted.drop()
table.c.new_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(conn, table_name, indexes)
table.c.deleted.drop()
table.c.new_deleted.alter(name="deleted")
_restore_indexes_on_deleted_columns(engine, table_name, indexes)
finally:
table.metadata.bind = None
......@@ -739,10 +744,13 @@ def _change_deleted_column_type_to_id_type_sqlite(engine, table_name,
constraints.append(constraint._copy())
with engine.connect() as conn:
new_table = Table(
table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create(conn)
# we need separate transactions, since we must create the table before
# we can copy entries into it (later)
with conn.begin():
new_table = Table(
table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create(conn)
indexes = []
for index in get_indexes(engine, table_name):
......@@ -751,30 +759,32 @@ def _change_deleted_column_type_to_id_type_sqlite(engine, table_name,
Index(index["name"], *column_names, unique=index["unique"])
)
table.drop(conn)
for index in indexes:
index.create(conn)
with conn.begin():
table.drop(conn)
for index in indexes:
index.create(conn)
new_table.metadata.bind = engine
try:
new_table.rename(table_name)
finally:
new_table.metadata.bind = None
with conn.begin():
new_table.metadata.bind = engine
try:
new_table.rename(table_name)
finally:
new_table.metadata.bind = None
deleted = True # workaround for pyflakes
conn.execute(
new_table.update().where(
new_table.c.deleted == deleted
).values(deleted=new_table.c.id)
)
deleted = True # workaround for pyflakes
conn.execute(
new_table.update().where(
new_table.c.deleted == deleted
).values(deleted=new_table.c.id)
)
# NOTE(boris-42): Fix value of deleted column: False -> "" or 0.
deleted = False # workaround for pyflakes
conn.execute(
new_table.update().where(
new_table.c.deleted == deleted
).values(deleted=default_deleted_value)
)
# NOTE(boris-42): Fix value of deleted column: False -> "" or 0.
deleted = False # workaround for pyflakes
conn.execute(
new_table.update().where(
new_table.c.deleted == deleted
).values(deleted=default_deleted_value)
)
def get_db_connection_info(conn_pieces):
......@@ -1121,7 +1131,7 @@ def get_non_innodb_tables(connectable, skip_tables=('migrate_version',
params['database'] = connectable.engine.url.database
query = text(query_str)
# TODO(stephenfin): What about if this is already a Connection?
with connectable.connect() as conn:
with connectable.connect() as conn, conn.begin():
noninnodb = conn.execute(query, params)
return [i[0] for i in noninnodb]
......@@ -1232,21 +1242,25 @@ def suspend_fk_constraints_for_col_alter(
ctx = MigrationContext.configure(conn)
op = Operations(ctx)
for fk in fks:
op.drop_constraint(
fk['name'], fk['source_table'], type_="foreignkey")
with conn.begin():
for fk in fks:
op.drop_constraint(
fk['name'], fk['source_table'], type_="foreignkey")
yield
for fk in fks:
op.create_foreign_key(
fk['name'], fk['source_table'],
fk['referred_table'],
fk['constrained_columns'],
fk['referred_columns'],
onupdate=fk['options'].get('onupdate'),
ondelete=fk['options'].get('ondelete'),
deferrable=fk['options'].get('deferrable'),
initially=fk['options'].get('initially'),
)
with conn.begin():
for fk in fks:
op.create_foreign_key(
fk['name'], fk['source_table'],
fk['referred_table'],
fk['constrained_columns'],
fk['referred_columns'],
onupdate=fk['options'].get('onupdate'),
ondelete=fk['options'].get('ondelete'),
deferrable=fk['options'].get('deferrable'),
initially=fk['options'].get('initially'),
)
class NonCommittingConnectable(object):
......
......@@ -50,11 +50,6 @@ class WarningsFixture(fixtures.Fixture):
message=r'The Session.begin.subtransactions flag is deprecated .*',
category=sqla_exc.SADeprecationWarning)
warnings.filterwarnings(
'once',
message=r'The current statement is being autocommitted .*',
category=sqla_exc.SADeprecationWarning)
warnings.filterwarnings(
'once',
message=r'Calling \.begin\(\) when a transaction is already .*',
......@@ -71,6 +66,12 @@ class WarningsFixture(fixtures.Fixture):
module='migrate',
category=sqla_exc.SADeprecationWarning)
warnings.filterwarnings(
'once',
message=r'The current statement is being autocommitted .*',
module='migrate',
category=sqla_exc.SADeprecationWarning)
warnings.filterwarnings(
'ignore',
message=r'The Engine.execute\(\) method is considered legacy .*',
......
......@@ -498,9 +498,15 @@ class TestReferenceErrorSQLite(
self.table_2.create(self.engine)
def test_raise(self):
with self.engine.connect() as conn:
conn.execute(sql.text("PRAGMA foreign_keys = ON"))
connection = self.engine.raw_connection()
try:
cursor = connection.cursor()
cursor.execute('PRAGMA foreign_keys = ON')
cursor.close()
finally:
connection.close()
with self.engine.connect() as conn:
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
......@@ -521,16 +527,24 @@ class TestReferenceErrorSQLite(
self.assertIsNone(matched.key_table)
def test_raise_delete(self):
with self.engine.connect() as conn:
conn.execute(sql.text("PRAGMA foreign_keys = ON"))
conn.execute(self.table_1.insert().values(id=1234, foo=42))
conn.execute(self.table_2.insert().values(id=4321, foo_id=1234))
connection = self.engine.raw_connection()
try:
cursor = connection.cursor()
cursor.execute('PRAGMA foreign_keys = ON')
cursor.close()
finally:
connection.close()
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_1.delete()
)
with self.engine.connect() as conn:
with conn.begin():
conn.execute(self.table_1.insert().values(id=1234, foo=42))
conn.execute(
self.table_2.insert().values(id=4321, foo_id=1234))
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_1.delete()
)
self.assertInnerException(
matched,
......@@ -577,13 +591,17 @@ class TestReferenceErrorPostgreSQL(
def test_raise_delete(self):
with self.engine.connect() as conn:
conn.execute(self.table_1.insert().values(id=1234, foo=42))
conn.execute(self.table_2.insert().values(id=4321, foo_id=1234))
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_1.delete()
)
with conn.begin():
conn.execute(self.table_1.insert().values(id=1234, foo=42))
conn.execute(
self.table_2.insert().values(id=4321, foo_id=1234))
with conn.begin():
matched = self.assertRaises(
exception.DBReferenceError,
conn.execute,
self.table_1.delete()
)
self.assertInnerException(
matched,
......@@ -648,7 +666,7 @@ class TestReferenceErrorMySQL(
self.assertEqual("resource_foo", matched.key_table)
def test_raise_delete(self):
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.execute(self.table_1.insert().values(id=1234, foo=42))
conn.execute(self.table_2.insert().values(id=4321, foo_id=1234))
matched = self.assertRaises(
......
......@@ -314,12 +314,15 @@ class MySQLModeTestCase(db_test_base._MySQLOpportunisticTestCase):
self.test_table = Table(_TABLE_NAME + "mode", meta,
Column('id', Integer, primary_key=True),
Column('bar', String(255)))
self.test_table.create(self.connection)
with self.connection.begin():
self.test_table.create(self.connection)
def cleanup():
self.test_table.drop(self.connection)
with self.connection.begin():
self.test_table.drop(self.connection)
self.connection.close()
mode_engine.dispose()
self.addCleanup(cleanup)
def _test_string_too_long(self, value):
......
......@@ -699,8 +699,9 @@ class TestMigrationUtils(db_test_base._DbTestCase):
Column('updated_at', DateTime))
test_table.create(engine)
with engine.connect() as conn:
conn.execute(test_table.insert(), values)
with engine.connect() as conn, conn.begin():
with conn.begin():
conn.execute(test_table.insert(), values)
return test_table, values
def test_drop_old_duplicate_entries_from_table(self):
......@@ -720,7 +721,7 @@ class TestMigrationUtils(db_test_base._DbTestCase):
uniq_values.add(uniq_value)
expected_ids.append(value['id'])
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
real_ids = [
row[0] for row in
conn.execute(select(test_table.c.id)).fetchall()
......@@ -762,7 +763,7 @@ class TestMigrationUtils(db_test_base._DbTestCase):
base_select = table.select()
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
rows_select = base_select.where(table.c.deleted != table.c.id)
row_ids = [
row.id for row in conn.execute(rows_select).fetchall()
......@@ -938,7 +939,7 @@ class TestMigrationUtils(db_test_base._DbTestCase):
# NOTE(zzzeek): SQLAlchemy 1.2 Boolean type will disallow non 1/0
# value here, 1.1 also coerces to "1/0" so use raw SQL to test the
# constraint
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.exec_driver_sql(
"INSERT INTO abc (deleted) VALUES (?)",
(10, ),
......@@ -1652,7 +1653,7 @@ class TestDialectFunctionDispatcher(test_base.BaseTestCase):
class TestGetInnoDBTables(db_test_base._MySQLOpportunisticTestCase):
def test_all_tables_use_innodb(self):
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.execute(
sql.text(
"CREATE TABLE customers "
......@@ -1660,21 +1661,23 @@ class TestGetInnoDBTables(db_test_base._MySQLOpportunisticTestCase):
self.assertEqual([], utils.get_non_innodb_tables(self.engine))
def test_all_tables_use_innodb_false(self):
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.execute(
sql.text("CREATE TABLE employee (i INT) ENGINE=MEMORY"))
sql.text("CREATE TABLE employee (i INT) ENGINE=MEMORY")
)
self.assertEqual(['employee'],
utils.get_non_innodb_tables(self.engine))
def test_skip_tables_use_default_value(self):
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.execute(
sql.text("CREATE TABLE migrate_version (i INT) ENGINE=MEMORY"))
sql.text("CREATE TABLE migrate_version (i INT) ENGINE=MEMORY")
)
self.assertEqual([],
utils.get_non_innodb_tables(self.engine))
def test_skip_tables_use_passed_value(self):
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.execute(
sql.text("CREATE TABLE some_table (i INT) ENGINE=MEMORY"))
self.assertEqual([],
......@@ -1682,7 +1685,7 @@ class TestGetInnoDBTables(db_test_base._MySQLOpportunisticTestCase):
self.engine, skip_tables=('some_table',)))
def test_skip_tables_use_empty_list(self):
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.execute(
sql.text("CREATE TABLE some_table_3 (i INT) ENGINE=MEMORY"))
self.assertEqual(['some_table_3'],
......@@ -1690,7 +1693,7 @@ class TestGetInnoDBTables(db_test_base._MySQLOpportunisticTestCase):
self.engine, skip_tables=()))
def test_skip_tables_use_several_values(self):
with self.engine.connect() as conn:
with self.engine.connect() as conn, conn.begin():
conn.execute(
sql.text("CREATE TABLE some_table_1 (i INT) ENGINE=MEMORY"))
conn.execute(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment