# Source code for tkp.db.database
import exceptions
import logging
import numpy
import tkp.config
from tkp.utility import substitute_inf
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# The version of the TKP DB schema which is assumed by the current tree.
# Increment whenever the schema changes.
# Database.connect() compares this number with the value stored under
# name='revision' in the database's ``version`` table and raises when the
# two differ.
DB_VERSION = 34
class DBExceptions(object):
    """
    Engine-agnostic wrapper around the exceptions that can be thrown by the
    database layer.

    Callers can refer to e.g. ``DBExceptions(engine).Error`` rather than
    ``<engine specific module>.Error``. We handle both the PEP-0249
    exceptions as provided by the DB engine, and add our own as necessary.

    For an engine other than "monetdb" or "postgresql" no exception module
    is bound; every exception lookup on such an instance raises
    ``AttributeError``.
    """

    def __init__(self, engine):
        """
        Bind the exception namespace belonging to ``engine``.

        :param engine: name of the database engine; "monetdb" and
            "postgresql" are recognised, anything else binds nothing.
        """
        # RhombusError refers to unhandled source layout, See issue 4778:
        # https://support.astron.nl/lofar_issuetracker/issues/4778
        if engine == "monetdb":
            import monetdb.exceptions
            self.exceptions = monetdb.exceptions
            self.RhombusError = self.exceptions.OperationalError
        elif engine == "postgresql":
            import psycopg2
            self.exceptions = psycopg2
            self.RhombusError = self.exceptions.IntegrityError

    def __getattr__(self, attrname):
        """
        Look up ``attrname`` in the engine's exception namespace.

        Only called when normal attribute lookup fails.

        :param attrname: name of the exception class to fetch.
        :return: the exception class of that name.
        :raises AttributeError: if no namespace is bound, or the name does
            not refer to a StandardError subclass.
        """
        # Read the namespace straight from the instance dict: going through
        # ``self.exceptions`` would re-enter __getattr__ when the attribute
        # was never set (unsupported engine) and recurse without end.
        try:
            namespace = self.__dict__["exceptions"]
        except KeyError:
            raise AttributeError(attrname)
        obj = getattr(namespace, attrname)
        # Weed the cluttered psycopg2 namespace: only return things that
        # really are valid database errors.
        if isinstance(obj, type) and issubclass(obj, exceptions.StandardError):
            return obj
        raise AttributeError(attrname)
class Database(object):
    """
    An object representing a database connection.

    The class is a singleton: every instantiation returns the same object,
    and only the first instantiation configures it.
    """
    _connection = None
    _configured = False

    # this makes this class a singleton
    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = object.__new__(cls)
        return cls._instance

    def __init__(self, **kwargs):
        """
        Configure the database settings, unless that already happened.

        When no keyword arguments are given the settings are taken from
        ``tkp.config.get_database_config()``. Otherwise the keys 'engine',
        'database', 'user', 'password', 'host' and 'port' are all required
        (a missing key raises KeyError).
        """
        if self._configured:
            if kwargs:
                logger.warning("Not configuring pre-configured database")
            return
        elif not kwargs:
            kwargs = tkp.config.get_database_config()
        self.engine = kwargs['engine']
        self.database = kwargs['database']
        self.user = kwargs['user']
        self.password = kwargs['password']
        self.host = kwargs['host']
        self.port = kwargs['port']
        # The password is deliberately left out of the log line.
        logger.info("Database config: %s://%s@%s:%s/%s",
                    self.engine, self.user, self.host, self.port,
                    self.database)
        self._configured = True

        # Provide placeholders for engine-specific Exception classes
        self.exceptions = DBExceptions(self.engine)

    def connect(self):
        """
        connect to the configured database

        :raises NotImplementedError: if the configured engine is neither
            'monetdb' nor 'postgresql'.
        :raises Exception: if the schema revision stored in the database
            does not match DB_VERSION. Whenever the revision check fails,
            for any reason, the connection is closed again.
        """
        logger.info("connecting to database...")

        # Only pass on the settings that actually have a value.
        kwargs = {}
        if self.user:
            kwargs['user'] = self.user
        if self.host:
            kwargs['host'] = self.host
        if self.database:
            kwargs['database'] = self.database
        if self.password:
            kwargs['password'] = self.password
        if self.port:
            kwargs['port'] = int(self.port)

        # During pipeline operation, we force autocommit to off (which should
        # be the default according to the DB-API specs). See #4885.
        if self.engine == 'monetdb':
            import monetdb.sql
            kwargs['autocommit'] = False
            self._connection = monetdb.sql.connect(**kwargs)
        elif self.engine == 'postgresql':
            import psycopg2
            self._connection = psycopg2.connect(**kwargs)
            self._connection.autocommit = False
        else:
            msg = "engine %s not supported " % self.engine
            logger.error(msg)
            raise NotImplementedError(msg)

        try:
            self._check_schema_version()
        except Exception:
            # Never keep a connection whose schema could not be verified:
            # otherwise a later access to ``connection`` would hand it out
            # without the check ever being repeated.
            self._connection.close()
            self._connection = None
            raise

        # I don't like this but it is used in some parts of TKP
        self.cursor = self._connection.cursor()

        logger.info("connected to: %s://%s@%s:%s/%s",
                    self.engine, self.user, self.host, self.port,
                    self.database)

    def _check_schema_version(self):
        """
        Check that the database revision matches the one expected by the
        codebase; raise Exception if it is missing or different.
        """
        cursor = self._connection.cursor()
        cursor.execute("SELECT value FROM version WHERE name='revision'")
        row = cursor.fetchone()
        if row is None:
            error = ("Database version incompatibility "
                     "(needed %d, no revision found)" % DB_VERSION)
            logger.error(error)
            raise Exception(error)
        schema_version = row[0]
        if schema_version != DB_VERSION:
            error = ("Database version incompatibility (needed %d, got %d)" %
                     (DB_VERSION, schema_version))
            logger.error(error)
            raise Exception(error)

    @property
    def connection(self):
        """
        The database connection, will be created if it doesn't exist.

        This is a property to be backwards compatible with the rest of TKP.
        Note that every access also replaces ``self.cursor`` with a new
        cursor.

        :return: a database connection
        """
        if not self._connection:
            self.connect()

        # I don't like this but it is used in some parts of TKP
        self.cursor = self._connection.cursor()

        return self._connection

    def close(self):
        """
        close the connection if open
        """
        if self._connection:
            self._connection.close()
        self._connection = None

    def vacuum(self, table):
        """
        Force a vacuum on a table, which removes dead rows. (Postgres only)

        Normally the auto vacuum process does this for you, but in some cases
        (for example when the table receives many insert and deletes) manual
        vacuuming is necessary for performance reasons.

        For any engine other than "postgresql" this does nothing.

        args:
            table: name of the table in the database you want to vacuum.
                The name is interpolated into the SQL statement as is, so
                it must come from trusted code, never from user input.
        """
        if self.engine != "postgresql":
            return

        from psycopg2.extensions import (ISOLATION_LEVEL_AUTOCOMMIT,
                                         ISOLATION_LEVEL_READ_COMMITTED)

        connection = self.connection
        # enable autocommit since we can't vacuum inside a transaction
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        try:
            cursor = connection.cursor()
            cursor.execute("VACUUM ANALYZE %s" % table)
        finally:
            # reset settings, also when the vacuum itself failed, so the
            # shared connection is never left behind in autocommit mode
            connection.set_isolation_level(ISOLATION_LEVEL_READ_COMMITTED)