Commit 2da7abce authored by Marco Nenciarini

New upstream version 2.2

parent 7fc9c86d
Barman Core Team (in alphabetical order):
* Gabriele Bartolini <gabriele.bartolini@2ndquadrant.it> (project leader)
* Gabriele Bartolini <gabriele.bartolini@2ndquadrant.it> (architect)
* Jonathan Battiato <jonathan.battiato@2ndquadrant.it> (QA/testing)
* Stefano Bianucci (developer, intern from University of Florence)
* Giuseppe Broccolo <giuseppe.broccolo@2ndquadrant.it> (QA/testing)
* Giulio Calacoci <giulio.calacoci@2ndquadrant.it> (developer)
* Francesco Canovai <francesco.canovai@2ndquadrant.it> (QA/testing)
* Leonardo Cecchi <leonardo.cecchi@2ndquadrant.it> (developer)
* Gianni Ciolli <gianni.ciolli@2ndquadrant.it> (QA/testing)
* Britt Cole <britt.cole@2ndquadrant.it> (documentation)
* Marco Nenciarini <marco.nenciarini@2ndquadrant.it> (lead developer)
* Britt Cole <britt.cole@2ndquadrant.com> (documentation)
* Marco Nenciarini <marco.nenciarini@2ndquadrant.it> (project leader)
* Rubens Souza <rubens.souza@2ndquadrant.it> (QA/testing)
Past contributors:
* Carlo Ascani
* Stefano Bianucci
* Giuseppe Broccolo
Many thanks go to our sponsors (in alphabetical order):
......
Barman INSTALL instructions
Copyright (C) 2011-2016 2ndQuadrant Italia Srl
Copyright (C) 2011-2017 2ndQuadrant Limited
For further information, see the "Installation" section in the
official manual of Barman or the Markdown source file:
......
Barman News - History of user-visible changes
Copyright (C) 2011-2016 2ndQuadrant Italia Srl
Copyright (C) 2011-2017 2ndQuadrant Limited
Version 2.2 - 17 Jul 2017
- Implement parallel copy for backup and recovery through the new
parallel_jobs global/server option, which can be overridden at run time
with the --jobs/-j option of the backup and recover commands (see the
configuration sketch after these 2.2 notes). Parallel backup is
available only for the rsync copy method. The default is 1, which
preserves the behaviour of previous versions.
- Support custom WAL segment sizes for PostgreSQL 8.4 and newer. At
backup time, Barman retrieves the wal_segment_size and wal_block_size
values from PostgreSQL and performs the necessary calculations.
- Improve the check command to ensure that the incoming directory is
empty when archiver=off, and that the streaming directory is empty
when streaming_archiver=off (#80).
- Add external_configuration to backup_options so that users can
instruct Barman to skip the backup of configuration files that are not
inside PGDATA (the default layout on Debian/Ubuntu installations). In
this case, Barman no longer displays a warning.
- Add --get-wal and --no-get-wal options to barman recover
- Add the max_incoming_wals_queue global/server option for the check
command, so that a non-blocking error is returned when the incoming WAL
directories for the archiver and the streaming_archiver contain more
files than the specified value.
- Documentation improvements
- File format changes:
- The format of the backup.info file has changed. For this reason a
backup taken with Barman 2.2 cannot be read by a previous
version of Barman. However, backups taken with previous versions can
be read by Barman 2.2.
- Minor bug fixes:
- Allow replication-status to work against a standby
- Close any PostgreSQL connection before starting pg_basebackup
(#104, #108)
- Safely handle paths containing special characters
- Archive .partial files after promotion of streaming source
- Recursively create directories during recovery (SF#44)
- Improve xlog.db locking (#99)
- Remove tablespace_map file during recover (#95)
- Reconnect to PostgreSQL if connection drops (SF#82)
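
To illustrate how the new 2.2 options fit together, here is a minimal,
hypothetical sketch (the server name, paths and numeric values are
invented; the option and flag names are the ones introduced above):

  [main]
  ; copy backups and recoveries with 4 parallel rsync jobs (default: 1)
  parallel_jobs = 4
  ; "barman check" raises a non-blocking error above 100 queued WAL files
  max_incoming_wals_queue = 100

  # per-run overrides from the command line
  barman backup --jobs 2 main
  barman recover --jobs 2 --get-wal main latest /var/lib/postgresql/recovered
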
Version 2.1 - 5 Jan 2017
......
Metadata-Version: 1.1
Name: barman
Version: 2.1
Version: 2.2
Summary: Backup and Recovery Manager for PostgreSQL
Home-page: http://www.pgbarman.org/
Author: 2ndQuadrant Italia Srl
Author-email: info@2ndquadrant.it
Author: 2ndQuadrant Limited
Author-email: info@2ndquadrant.com
License: GPL-3.0
Description: Barman (Backup and Recovery Manager) is an open-source administration
tool for disaster recovery of PostgreSQL servers written in Python.
......
......@@ -37,14 +37,14 @@ Web resources
- Man page, section 1 : http://docs.pgbarman.org/barman.1.html
- Man page, section 5 : http://docs.pgbarman.org/barman.5.html
- Community support : http://www.pgbarman.org/support/
- Professional support : http://www.2ndquadrant.com/
- Professional support : https://www.2ndquadrant.com/
- Client utilities : https://github.com/2ndquadrant-it/barman-cli
- pgespresso extension : https://github.com/2ndquadrant-it/pgespresso
Licence
-------
Copyright (C) 2011-2016 2ndQuadrant Italia Srl
Copyright (C) 2011-2017 2ndQuadrant Limited
Barman is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free
......
Metadata-Version: 1.1
Name: barman
Version: 2.1
Version: 2.2
Summary: Backup and Recovery Manager for PostgreSQL
Home-page: http://www.pgbarman.org/
Author: 2ndQuadrant Italia Srl
Author-email: info@2ndquadrant.it
Author: 2ndQuadrant Limited
Author-email: info@2ndquadrant.com
License: GPL-3.0
Description: Barman (Backup and Recovery Manager) is an open-source administration
tool for disaster recovery of PostgreSQL servers written in Python.
......
......@@ -2,4 +2,3 @@ psycopg2 >= 2.4.2
argh >= 0.21.2, <= 0.26.2
python-dateutil
argcomplete
argparse
# Copyright (C) 2011-2016 2ndQuadrant Italia Srl
# Copyright (C) 2011-2017 2ndQuadrant Limited
#
# This file is part of Barman.
#
......
# Copyright (C) 2011-2016 2ndQuadrant Italia Srl
# Copyright (C) 2011-2017 2ndQuadrant Limited
#
# This file is part of Barman.
#
......@@ -389,7 +389,11 @@ class BackupManager(RemoteStatusMixin):
backup_info.end_xlog,
backup_info.end_wal,
backup_info.end_offset)
output.info("Backup completed")
output.info("Backup completed (start time: %s, elapsed time: %s)",
self.executor.copy_start_time,
human_readable_timedelta(
self.executor.copy_end_time
- self.executor.copy_start_time))
# Create a restore point after a backup
target_name = 'barman_%s' % backup_info.backup_id
self.server.postgres.create_restore_point(target_name)
......@@ -463,9 +467,8 @@ class BackupManager(RemoteStatusMixin):
:param bool verbose: report even if no actions
"""
with self.server.xlogdb('a') as fxlogdb:
for archiver in self.server.archivers:
archiver.archive(fxlogdb, verbose)
for archiver in self.server.archivers:
archiver.archive(verbose)
def cron_retention_policy(self):
"""
......@@ -545,10 +548,10 @@ class BackupManager(RemoteStatusMixin):
:param CheckStrategy check_strategy: the strategy for the management
of the results of the various checks
"""
check_strategy.init_check('compression settings')
# Check compression_setting parameter
if self.config.compression and not self.compression_manager.check():
check_strategy.result(self.config.name,
'compression settings', False)
check_strategy.result(self.config.name, False)
else:
status = True
try:
......@@ -557,19 +560,18 @@ class BackupManager(RemoteStatusMixin):
check_strategy.result(self.config.name,
'%s setting' % field, False)
status = False
check_strategy.result(self.config.name,
'compression settings', status)
check_strategy.result(self.config.name, status)
# Failed backups check
check_strategy.init_check('failed backups')
failed_backups = self.get_available_backups((BackupInfo.FAILED,))
status = len(failed_backups) == 0
check_strategy.result(
self.config.name,
'failed backups',
status,
'there are %s failed backups' % (len(failed_backups,))
hint='there are %s failed backups' % (len(failed_backups,))
)
check_strategy.init_check('minimum redundancy requirements')
# Minimum redundancy checks
no_backups = len(self.get_available_backups())
# Check minimum_redundancy_requirements parameter
......@@ -578,9 +580,8 @@ class BackupManager(RemoteStatusMixin):
else:
status = True
check_strategy.result(
self.config.name,
'minimum redundancy requirements', status,
'have %s backups, expected at least %s' % (
self.config.name, status,
hint='have %s backups, expected at least %s' % (
no_backups, self.config.minimum_redundancy))
# TODO: Add a check for the existence of ssh and of rsync
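# Calling convention used in the checks above (sketch, argument values are
# illustrative): init_check() declares the name of the check being run,
# while result() only reports its outcome plus an optional hint, e.g.:
#   check_strategy.init_check('failed backups')
#   check_strategy.result(self.config.name, status, hint='1 failed backup')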
......@@ -675,6 +676,12 @@ class BackupManager(RemoteStatusMixin):
wal_count += 1
elif xlog.is_backup_file(fullname):
label_count += 1
elif fullname.endswith('.tmp'):
_logger.warning(
'temporary file found '
'rebuilding the wal database: %s',
fullname)
continue
else:
_logger.warning(
'unexpected file '
......
# Copyright (C) 2011-2016 2ndQuadrant Italia Srl
# Copyright (C) 2011-2017 2ndQuadrant Limited
#
# This file is part of Barman.
#
......@@ -30,6 +30,7 @@ from argh import ArghParser, arg, expects_obj, named
import barman.config
import barman.diagnose
from barman import output
from barman.config import RecoveryOptions
from barman.exceptions import BadXlogSegmentName
from barman.infofile import BackupInfo
from barman.server import Server
......@@ -183,6 +184,9 @@ def backup_completer(prefix, parsed_args, **kwargs):
type=check_non_negative)
@arg('--no-retry', help='Disable base backup copy retry logic.',
dest='retry_times', action='store_const', const=0)
@arg('--jobs', '-j',
help='Run the copy in parallel using NJOBS processes.',
type=check_positive, metavar='NJOBS')
@expects_obj
def backup(args):
"""
......@@ -204,6 +208,8 @@ def backup(args):
server.config.basebackup_retry_times = args.retry_times
if hasattr(args, 'immediate_checkpoint'):
server.config.immediate_checkpoint = args.immediate_checkpoint
if args.jobs is not None:
server.config.parallel_jobs = args.jobs
with closing(server):
server.backup()
output.close_and_exit()
......@@ -345,6 +351,19 @@ def rebuild_xlogdb(args):
type=check_non_negative)
@arg('--no-retry', help='Disable base backup copy retry logic.',
dest='retry_times', action='store_const', const=0)
@arg('--jobs', '-j',
help='Run the copy in parallel using NJOBS processes.',
type=check_positive, metavar='NJOBS')
@arg('--get-wal',
help='Enable the get-wal option during the recovery.',
dest='get_wal',
action='store_true',
default=SUPPRESS)
@arg('--no-get-wal',
help='Disable the get-wal option during recovery.',
dest='get_wal',
action='store_false',
default=SUPPRESS)
@expects_obj
def recover(args):
"""
......@@ -399,6 +418,13 @@ def recover(args):
server.config.basebackup_retry_sleep = args.retry_sleep
if args.retry_times is not None:
server.config.basebackup_retry_times = args.retry_times
if hasattr(args, 'get_wal'):
if args.get_wal:
server.config.recovery_options.add(RecoveryOptions.GET_WAL)
else:
server.config.recovery_options.remove(RecoveryOptions.GET_WAL)
if args.jobs is not None:
server.config.parallel_jobs = args.jobs
with closing(server):
server.recover(backup_id,
args.destination_directory,
......
# Copyright (C) 2011-2016 2ndQuadrant Italia Srl
# Copyright (C) 2011-2017 2ndQuadrant Limited
#
# This file is part of Barman.
#
......@@ -113,7 +113,7 @@ class Command(object):
The subprocess output and error stream will be processed through
the output and error handler, respectively defined through the
`out_handler` and `err_handler` arguments. If not provided every line
will be sent to teh log respectively at INFO and WARNING level.
will be sent to the log respectively at INFO and WARNING level.
The `out_handler` and the `err_handler` functions will be invoked with
one single argument, which is a string containing the line that is
......@@ -219,18 +219,6 @@ class Command(object):
"""restore default signal handler (http://bugs.python.org/issue1652)"""
signal.signal(signal.SIGPIPE, signal.SIG_DFL) # pragma: no cover
@staticmethod
def _cmd_quote(cmd, args):
"""
Quote all cmd's arguments.
This is needed to avoid command string breaking.
WARNING: this function does not protect against injection.
"""
if args is not None and len(args) > 0:
cmd = "%s '%s'" % (cmd, "' '".join(args))
return cmd
def __call__(self, *args, **kwargs):
"""
Run the command and return the exit code.
......@@ -377,7 +365,7 @@ class Command(object):
The subprocess output and error stream will be processed through
the output and error handler, respectively defined through the
`out_handler` and `err_handler` arguments. If not provided every line
will be sent to teh log respectively at INFO and WARNING level.
will be sent to the log respectively at INFO and WARNING level.
If the `close_fds` argument is True, all file descriptors
except 0, 1 and 2 will be closed before the child process is executed.
......@@ -458,7 +446,7 @@ class Command(object):
args = self.args + list(args)
# If shell is True, properly quote the command
if self.shell:
cmd = self._cmd_quote(self.cmd, args)
cmd = full_command_quote(self.cmd, args)
else:
cmd = [self.cmd] + args
......@@ -605,7 +593,7 @@ class Rsync(Command):
"""
options = []
if ssh:
options += ['-e', self._cmd_quote(ssh, ssh_options)]
options += ['-e', full_command_quote(ssh, ssh_options)]
if network_compression:
options += ['-z']
# Include patterns must be before the exclude ones, because the exclude
......@@ -936,3 +924,51 @@ class BarmanSubProcess(object):
stdin=devnull, stdout=devnull, stderr=devnull)
_logger.debug("BarmanSubProcess: subprocess started. pid: %s",
proc.pid)
def shell_quote(arg):
"""
Quote a string argument to be safely included in a shell command line.
:param str arg: The script argument
:return: The argument quoted
"""
# This is an excerpt of the Bash manual page, and the same applies for
# every Posix compliant shell:
#
# A non-quoted backslash (\) is the escape character. It preserves
# the literal value of the next character that follows, with the
# exception of <newline>. If a \<newline> pair appears, and the
# backslash is not itself quoted, the \<newline> is treated as a
# line continuation (that is, it is removed from the input
# stream and effectively ignored).
#
# Enclosing characters in single quotes preserves the literal value
# of each character within the quotes. A single quote may not occur
# between single quotes, even when preceded by a backslash.
#
# This means that, as long as the original string doesn't contain any
# apostrophe character, it can be safely included between single quotes.
#
# If a single quote is contained in the string, we must terminate the
# string with a quote, insert an apostrophe character escaping it with
# a backslash, and then start another string using a quote character.
assert arg is not None
return "'%s'" % arg.replace("'", "'\\''")
def full_command_quote(command, args=None):
"""
Produce a command with quoted arguments
:param str command: the command to be executed
:param list[str] args: the command arguments
:rtype: str
"""
if args is not None and len(args) > 0:
return "%s %s" % (
command, ' '.join([shell_quote(arg) for arg in args]))
else:
return command
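# A quick illustration of what the two helpers above produce (doctest-style
# sketch; it assumes they are importable from barman.command_wrappers, the
# module this hunk appears to belong to):
#
#   >>> from barman.command_wrappers import shell_quote, full_command_quote
#   >>> print(shell_quote("it's"))
#   'it'\''s'
#   >>> print(full_command_quote('ssh', ['-o', 'BatchMode=yes', 'pg@backup']))
#   ssh '-o' 'BatchMode=yes' 'pg@backup'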
# Copyright (C) 2011-2016 2ndQuadrant Italia Srl
# Copyright (C) 2011-2017 2ndQuadrant Limited
#
# This file is part of Barman.
#
......
# Copyright (C) 2011-2016 2ndQuadrant Italia Srl
# Copyright (C) 2011-2017 2ndQuadrant Limited
#
# This file is part of Barman.
#
......@@ -142,28 +142,15 @@ class BackupOptions(CsvOption):
# constants containing labels for allowed values
EXCLUSIVE_BACKUP = 'exclusive_backup'
CONCURRENT_BACKUP = 'concurrent_backup'
EXTERNAL_CONFIGURATION = 'external_configuration'
# list holding all the allowed values for the BackupOption class
value_list = [EXCLUSIVE_BACKUP, CONCURRENT_BACKUP]
value_list = [EXCLUSIVE_BACKUP, CONCURRENT_BACKUP, EXTERNAL_CONFIGURATION]
# map holding all the possible conflicts between the allowed values
conflicts = {
EXCLUSIVE_BACKUP: CONCURRENT_BACKUP,
CONCURRENT_BACKUP: EXCLUSIVE_BACKUP, }
def validate(self, key, source):
"""
Validates backup_option values: currently it makes sure
that either exclusive_backup or concurrent_backup are set.
"""
if len(self) == 0:
return
if self.CONCURRENT_BACKUP not in self \
and self.EXCLUSIVE_BACKUP not in self:
raise ValueError("Invalid configuration value for "
"key %s in %s: it must contain either "
"exclusive_backup or concurrent_backup option"
% (key, source))
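# Illustrative effect of the change above (hypothetical barman.conf line):
# with EXTERNAL_CONFIGURATION added to value_list, a setting such as
#   backup_options = concurrent_backup, external_configuration
# is now accepted, while exclusive_backup and concurrent_backup remain
# mutually exclusive through the conflicts map.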
class RecoveryOptions(CsvOption):
"""
......@@ -287,8 +274,10 @@ class ServerConfig(object):
'immediate_checkpoint',
'incoming_wals_directory',
'last_backup_maximum_age',
'max_incoming_wals_queue',
'minimum_redundancy',
'network_compression',
'parallel_jobs',
'path_prefix',
'post_archive_retry_script',
'post_archive_script',
......@@ -330,8 +319,10 @@ class ServerConfig(object):
'custom_decompression_filter',
'immediate_checkpoint',
'last_backup_maximum_age',
'max_incoming_wals_queue',
'minimum_redundancy',
'network_compression',
'parallel_jobs',
'path_prefix',
'post_archive_retry_script',
'post_archive_script',
......@@ -371,6 +362,7 @@ class ServerConfig(object):
'incoming_wals_directory': '%(backup_directory)s/incoming',
'minimum_redundancy': '0',
'network_compression': 'false',
'parallel_jobs': '1',
'recovery_options': '',
'retention_policy_mode': 'auto',
'streaming_archiver': 'off',
......@@ -399,7 +391,9 @@ class ServerConfig(object):
'disabled': parse_boolean,
'immediate_checkpoint': parse_boolean,
'last_backup_maximum_age': parse_time_interval,
'max_incoming_wals_queue': int,
'network_compression': parse_boolean,
'parallel_jobs': int,
'recovery_options': RecoveryOptions,
'reuse_backup': parse_reuse_backup,
'streaming_archiver': parse_boolean,
......
# Copyright (C) 2011-2016 2ndQuadrant Italia Srl
# Copyright (C) 2011-2017 2ndQuadrant Limited
#
# This file is part of Barman.
#
......
# Copyright (C) 2011-2016 2ndQuadrant Italia Srl
# Copyright (C) 2011-2017 2ndQuadrant Limited
#
# This file is part of Barman.
#
......
# Copyright (C) 2011-2016 2ndQuadrant Italia Srl
# Copyright (C) 2011-2017 2ndQuadrant Limited
#
# This file is part of Barman.
#
......
# Copyright (C) 2013-2016 2ndQuadrant Italia Srl
# Copyright (C) 2013-2017 2ndQuadrant Limited
#
# This file is part of Barman.
#
......@@ -443,6 +443,9 @@ class BackupInfo(FieldListFile):
included_files = Field('included_files',
load=ast.literal_eval, dump=null_repr)
backup_label = Field('backup_label', load=ast.literal_eval, dump=null_repr)
copy_stats = Field('copy_stats', load=ast.literal_eval, dump=null_repr)
xlog_segment_size = Field('xlog_segment_size', load=int,
default=xlog.DEFAULT_XLOG_SEG_SIZE)
__slots__ = ('server', 'config', 'backup_manager',
'backup_id', 'backup_version')
......@@ -503,8 +506,10 @@ class BackupInfo(FieldListFile):
"""
Get the list of required WAL segments for the current backup
"""
return xlog.generate_segment_names(self.begin_wal, self.end_wal,
self.version)
return xlog.generate_segment_names(
self.begin_wal, self.end_wal,
self.version,
self.xlog_segment_size)
def get_list_of_files(self, target):
"""
......@@ -639,7 +644,9 @@ class BackupInfo(FieldListFile):
"""
result = dict(self.items())
result.update(backup_id=self.backup_id, server_name=self.server_name,
mode=self.mode, tablespaces=self.tablespaces)
mode=self.mode, tablespaces=self.tablespaces,
included_files=self.included_files,
copy_stats=self.copy_stats)
return result
def to_json(self):
......
# Copyright (C) 2011-2016 2ndQuadrant Italia Srl
# Copyright (C) 2011-2017 2ndQuadrant Limited
#
# This file is part of Barman.
#
......
# Copyright (C) 2013-2016 2ndQuadrant Italia Srl
# Copyright (C) 2013-2017 2ndQuadrant Limited
#
# This file is part of Barman.
#
......@@ -21,12 +21,13 @@ This module control how the output of Barman will be rendered
from __future__ import print_function
import datetime
import inspect
import logging
import sys
from barman.infofile import BackupInfo
from barman.utils import pretty_size
from barman.utils import human_readable_timedelta, pretty_size
from barman.xlog import diff_lsn
__all__ = [
......@@ -611,6 +612,25 @@ class ConsoleOutputWriter(object):
self.info(" Begin time : %s",
data['begin_time'])
self.info(" End time : %s", data['end_time'])
# If copy statistics are available print a summary
copy_stats = data.get('copy_stats')
if copy_stats:
copy_time = copy_stats.get('copy_time')
if copy_time:
value = human_readable_timedelta(
datetime.timedelta(seconds=copy_time))
# Show analysis time if it is more than a second
analysis_time = copy_stats.get('analysis_time')
if analysis_time is not None and analysis_time >= 1:
value += " + %s startup" % (human_readable_timedelta(
datetime.timedelta(seconds=analysis_time)))
self.info(" Copy time : %s", value)
size = data['deduplicated_size'] or data['size']
value = "%s/s" % pretty_size(size/copy_time)
number_of_workers = copy_stats.get('number_of_workers', 1)
if number_of_workers > 1:
value += " (%s jobs)" % number_of_workers
self.info(" Estimated throughput : %s", value)
self.info(" Begin Offset : %s",
data['begin_offset'])
self.info(" End Offset : %s",
......@@ -651,17 +671,16 @@ class ConsoleOutputWriter(object):
self.info(" Next Backup : %s",
data.setdefault('next_backup_id', 'not available') or
'- (this is the latest base backup)')
if data['children_timelines']:
self.info("")
self.info(
"WARNING: WAL information is inaccurate due to "
"multiple timelines interacting with this backup")
else:
if data['error']:
self.info(" Error: : %s",
data['error'])
if data['children_timelines']:
self.info("")
self.info(
"WARNING: WAL information is inaccurate due to "
"multiple timelines interacting with this backup")
def init_status(self, server_name):
"""
Init the status command
......
# Copyright (C) 2011-2016 2ndQuadrant Italia Srl
# Copyright (C) 2011-2017 2ndQuadrant Limited
#
# This file is part of Barman.
#
......@@ -42,6 +42,7 @@ from barman.exceptions import (ConninfoException, PostgresAppNameError,
from barman.infofile import Tablespace
from barman.remote_status import RemoteStatusMixin
from barman.utils import simplify_version, with_metaclass
from barman.xlog import DEFAULT_XLOG_SEG_SIZE
# This is necessary because the CONFIGURATION_LIMIT_EXCEEDED constant
# has been added in psycopg2 2.5, but Barman supports version 2.4.2+ so
......@@ -79,6 +80,8 @@ class PostgreSQL(with_metaclass(ABCMeta, RemoteStatusMixin)):
This abstract class represents a generic interface to a PostgreSQL server.
"""
CHECK_QUERY = 'SELECT 1'
def __init__(self, config, conninfo):
"""
Abstract base class constructor for PostgreSQL interface.
......@@ -91,6 +94,7 @@ class PostgreSQL(with_metaclass(ABCMeta, RemoteStatusMixin)):
self.config = config
self.conninfo = conninfo
self._conn = None
self.allow_reconnect = True
# Build a dictionary with connection info parameters
# This is mainly used to speed up search in conninfo
try:
......@@ -144,27 +148,59 @@ class PostgreSQL(with_metaclass(ABCMeta, RemoteStatusMixin)):
"""
Generic function for Postgres connection (using psycopg2)
"""
if not self._conn:
if not self._check_connection():
try:
self._conn = psycopg2.connect(self.conninfo)
# If psycopg2 fails to connect to the host,
# raise the appropriate exception
except psycopg2.DatabaseError as e:
raise PostgresConnectionError(str(e).strip())
# Register the connection to the live connections list
# Register the connection to the list of live connections
_live_connections.append(self)
return self._conn
def _check_connection(self):
"""
Return false if the connection is broken
:rtype: bool
"""
# If the connection is not present return False
if not self._conn:
return False
# Check if the connection works by running 'SELECT 1'
cursor = None
try:
cursor = self._conn.cursor()
cursor.execute(self.CHECK_QUERY)
except psycopg2.DatabaseError:
# Connection is broken, so we need to reconnect
self.close()
# Raise an error if reconnect is not allowed
if not self.allow_reconnect:
raise PostgresConnectionError(
"Connection lost, reconnection not allowed")
return False
finally:
if cursor:
cursor.close()
return True
def close(self):
"""
Close the connection to PostgreSQL
"""
if self._conn:
if self._conn.status == STATUS_IN_TRANSACTION:
self._conn.rollback()
self._conn.close()
self._conn = None
# If the connection is still alive, rollback and close it
if not self._conn.closed:
if self._conn.status == STATUS_IN_TRANSACTION:
self._conn.rollback()
self._conn.close()
# Remove the connection from the live connections list
self._conn = None
_live_connections.remove(self)
def _cursor(self, *args, **kwargs):
......@@ -224,6 +260,8 @@ class StreamingConnection(PostgreSQL):
This class represents a streaming connection to a PostgreSQL server.
"""
CHECK_QUERY = 'IDENTIFY_SYSTEM'
def __init__(self, config):
"""
Streaming connection constructor
......@@ -252,10 +290,12 @@ class StreamingConnection(PostgreSQL):
:returns: the connection to the server
"""
if not self._conn:
# Build a connection and set autocommit