Source code for tkp.db.database
import exceptions
import logging
import numpy
import tkp.config
from tkp.utility import substitute_inf
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# The version of the TKP DB schema which is assumed by the current tree.
# Increment whenever the schema changes.
# Database.connect() compares this value against the 'revision' entry of the
# database's version table and refuses the connection on a mismatch.
DB_VERSION = 34
class DBExceptions(object):
    """
    This provides an engine-agnostic wrapper around the exceptions that can
    be thrown by the database layer: we can refer to eg
    DBExceptions(engine).Error rather than <engine specific module>.Error.

    We handle both the PEP-0249 exceptions as provided by the DB engine, and
    add our own as necessary (currently ``RhombusError``).

    :param engine: name of the database engine. ``"monetdb"`` and
        ``"postgresql"`` are recognised; for any other value no backend
        module is loaded and every exception lookup raises
        ``AttributeError``.
    """
    def __init__(self, engine):
        # RhombusError refers to unhandled source layout, See issue 4778:
        # https://support.astron.nl/lofar_issuetracker/issues/4778
        if engine == "monetdb":
            import monetdb.exceptions
            self.exceptions = monetdb.exceptions
            self.RhombusError = self.exceptions.OperationalError
        elif engine == "postgresql":
            import psycopg2
            self.exceptions = psycopg2
            self.RhombusError = self.exceptions.IntegrityError

    def __getattr__(self, attrname):
        """
        Resolve ``attrname`` as an exception class of the backend module.

        Only invoked when normal attribute lookup fails.

        :raises AttributeError: if no backend module was loaded, or if the
            backend attribute is not an exception class.
        """
        # Read the backend straight from the instance dict: going through
        # ``self.exceptions`` would re-enter __getattr__ and recurse without
        # end whenever the attribute was never set (unsupported engine).
        backend = self.__dict__.get("exceptions")
        if backend is None:
            raise AttributeError(attrname)
        obj = getattr(backend, attrname)
        # Weed the cluttered psycopg2 namespace: only return things that
        # really are valid database errors.
        if isinstance(obj, type) and issubclass(obj, exceptions.StandardError):
            return obj
        raise AttributeError(attrname)
class Database(object):
    """
    An object representing a database connection.

    The class is a singleton: every instantiation returns the same object,
    and only the first successful ``__init__`` configures it. The actual
    connection is opened lazily, on the first access to ``connection`` or by
    an explicit call to ``connect()``.

    Configuration is passed as keyword arguments ``engine``, ``database``,
    ``user``, ``password``, ``host`` and ``port`` (all of them must be
    present when any is given). Without keyword arguments the values come
    from ``tkp.config.get_database_config()``.
    """
    _connection = None
    _configured = False

    # this makes this class a singleton
    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = object.__new__(cls)
        return cls._instance

    def __init__(self, **kwargs):
        if self._configured:
            if kwargs:
                logger.warning("Not configuring pre-configured database")
            return
        if not kwargs:
            kwargs = tkp.config.get_database_config()
        self.engine = kwargs['engine']
        self.database = kwargs['database']
        self.user = kwargs['user']
        self.password = kwargs['password']
        self.host = kwargs['host']
        self.port = kwargs['port']
        logger.info("Database config: %s://%s@%s:%s/%s",
                    self.engine, self.user, self.host, self.port,
                    self.database)

        # Provide placeholders for engine-specific Exception classes
        self.exceptions = DBExceptions(self.engine)

        # Only flag the singleton as configured once every attribute is in
        # place; otherwise a failure above would leave a half-configured
        # instance that later instantiations silently reuse.
        self._configured = True

    def connect(self):
        """
        connect to the configured database

        Opens a connection with autocommit switched off, verifies that the
        schema revision stored in the database equals ``DB_VERSION`` and
        stores a fresh cursor in ``self.cursor``.

        :raises NotImplementedError: if the configured engine is neither
            ``monetdb`` nor ``postgresql``.
        :raises Exception: if the schema revision is missing or differs from
            ``DB_VERSION``. Errors raised by the driver while running the
            check propagate unchanged. In all these cases the connection is
            closed and discarded first.
        """
        logger.info("connecting to database...")

        # Only pass on the options that were actually configured.
        kwargs = {}
        for option in ('user', 'host', 'database', 'password'):
            value = getattr(self, option)
            if value:
                kwargs[option] = value
        if self.port:
            kwargs['port'] = int(self.port)

        # During pipeline operation, we force autocommit to off (which should
        # be the default according to the DB-API specs). See #4885.
        if self.engine == 'monetdb':
            import monetdb.sql
            kwargs['autocommit'] = False
            self._connection = monetdb.sql.connect(**kwargs)
        elif self.engine == 'postgresql':
            import psycopg2
            self._connection = psycopg2.connect(**kwargs)
            self._connection.autocommit = False
        else:
            msg = "engine %s not supported " % self.engine
            logger.error(msg)
            raise NotImplementedError(msg)

        # Check that our database revision matches that expected by the
        # codebase. Whatever goes wrong, do not keep an unverified connection
        # around: a later access to ``connection`` would hand it out as is.
        try:
            self._check_schema_version()
        except Exception:
            self._connection.close()
            self._connection = None
            raise

        # I don't like this but it is used in some parts of TKP
        self.cursor = self._connection.cursor()

        logger.info("connected to: %s://%s@%s:%s/%s",
                    self.engine, self.user, self.host, self.port,
                    self.database)

    def _check_schema_version(self):
        # Raise unless the open connection reports schema revision DB_VERSION.
        # Uses self._connection directly: going through the ``connection``
        # property would replace self.cursor as a side effect.
        cursor = self._connection.cursor()
        cursor.execute("SELECT value FROM version WHERE name='revision'")
        row = cursor.fetchone()
        if row is None:
            error = ("Database version incompatibility (needed %d, got no "
                     "revision entry)" % DB_VERSION)
        elif row[0] != DB_VERSION:
            error = ("Database version incompatibility (needed %d, got %d)" %
                     (DB_VERSION, row[0]))
        else:
            return
        logger.error(error)
        raise Exception(error)

    @property
    def connection(self):
        """
        The database connection, will be created if it doesn't exist.

        This is a property to be backwards compatible with the rest of TKP.
        Every access also stores a new cursor in ``self.cursor``.

        :return: a database connection
        """
        if not self._connection:
            self.connect()

        # I don't like this but it is used in some parts of TKP
        self.cursor = self._connection.cursor()

        return self._connection

    def close(self):
        """
        close the connection if open

        Calling this without an open connection is a no-op.
        """
        if self._connection:
            self._connection.close()
            self._connection = None