Source code for tkp.steps.misc

"""
Various subroutines used in the main pipeline flow.

We keep them separately to make the pipeline logic easier to read at a glance.
"""

import copy
import datetime
import ConfigParser
import logging
import os
from pprint import pprint

from collections import defaultdict

from tkp.config import parse_to_dict
from tkp.db.dump import dump_db

import colorlog

logger = logging.getLogger(__name__)


def load_job_config(pipe_config):
    """
    Locates the job_params.cfg in 'job_directory' and loads via ConfigParser.
    """
    job_directory = pipe_config['DEFAULT']['job_directory']
    job_config = ConfigParser.SafeConfigParser()
    job_config.read(os.path.join(job_directory, 'job_params.cfg'))
    return parse_to_dict(job_config)
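
# Illustrative job_params.cfg fragment (a sketch only; apart from the
# 'persistence' section and 'dataset_id' key used elsewhere in this module,
# section and key names vary per job and are not a complete reference):
#
#   [persistence]
#   dataset_id = -1
#   description = "my trap run"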

def dump_configs_to_logdir(log_dir, job_config, pipe_config):
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir)
    with open(os.path.join(log_dir, 'job_params.cfg'), 'w') as f:
        pprint(job_config, stream=f)
    with open(os.path.join(log_dir, 'pipeline.cfg'), 'w') as f:
        pprint(pipe_config, stream=f)
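
# Note: the dumped files are pretty-printed Python dicts kept as a record of
# the run; despite the .cfg extension they are not valid ConfigParser input.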

def check_job_configs_match(job_config_1, job_config_2):
    """
    Check if job configs match, except dataset_id which we expect to change.
    """
    # Use deepcopy: dict.copy() is shallow, so deleting from the nested
    # 'persistence' dict would mutate the caller's configs.
    jc_from_file = copy.deepcopy(job_config_1)
    jc_from_db = copy.deepcopy(job_config_2)
    del jc_from_file['persistence']['dataset_id']
    del jc_from_db['persistence']['dataset_id']
    return jc_from_file == jc_from_db
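
# Example (illustrative doctest-style sketch; these dicts are minimal
# stand-ins for real parsed job configs):
#
# >>> a = {'persistence': {'dataset_id': 1, 'description': 'run'}}
# >>> b = {'persistence': {'dataset_id': 2, 'description': 'run'}}
# >>> check_job_configs_match(a, b)
# True
# >>> a['persistence']['dataset_id']    # inputs are left untouched
# 1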

def setup_logging(log_dir, debug, use_colorlog, basename='trap'):
    """
    Sets up logging to stdout, + info/debug level logfiles in log_dir.

    Args:
        log_dir (string): log directory
        debug (bool): do we want debug level logging on stdout?
        use_colorlog (bool): use colorized output for the stdout handler?
        basename (string): basename of the log file
    """
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir)
    info_log_file = os.path.join(log_dir, basename + '.log')
    debug_log_file = os.path.join(log_dir, basename + '.debug.log')

    long_formatter = logging.Formatter(
        '%(asctime)s %(levelname)s %(name)s: %(message)s',
        datefmt="%Y-%m-%d %H:%M:%S"
    )
    short_formatter = logging.Formatter(
        '%(asctime)s %(levelname)s %(name)s: %(message)s',
        datefmt="%H:%M:%S"
    )

    info_hdlr = logging.FileHandler(info_log_file)
    info_hdlr.setLevel(logging.INFO)
    info_hdlr.setFormatter(long_formatter)

    debug_hdlr = logging.FileHandler(debug_log_file)
    debug_hdlr.setLevel(logging.DEBUG)
    debug_hdlr.setFormatter(long_formatter)

    stdout_handler = logging.StreamHandler()
    color_fmt = colorlog.ColoredFormatter(
        "%(log_color)s%(asctime)s:%(name)s:%(levelname)s%(reset)s "
        "%(blue)s%(message)s",
        datefmt="%H:%M:%S",
        reset=True,
        log_colors={
            'DEBUG': 'cyan',
            'INFO': 'green',
            'WARNING': 'yellow',
            'ERROR': 'red',
            'CRITICAL': 'red',
        }
    )
    if use_colorlog:
        stdout_handler.setFormatter(color_fmt)
    else:
        stdout_handler.setFormatter(short_formatter)

    if debug:
        stdout_handler.setLevel(logging.DEBUG)
    else:
        stdout_handler.setLevel(logging.INFO)

    root_logger = logging.getLogger()
    # We set level to debug, and handle output via handler-levels
    root_logger.setLevel(logging.DEBUG)
    # Trash any preset handlers and start fresh
    root_logger.handlers = []
    root_logger.addHandler(stdout_handler)
    root_logger.addHandler(info_hdlr)
    root_logger.addHandler(debug_hdlr)
    logger.info("logging to %s", log_dir)

    # Suppress noisy streams:
    logging.getLogger('tkp.sourcefinder.image.sigmaclip').setLevel(logging.INFO)
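
# Typical invocation (illustrative; the path and flags are examples):
#
# >>> setup_logging('./logs', debug=False, use_colorlog=True)
# >>> logging.getLogger('tkp.steps').info("pipeline step started")
#
# With debug=False the message appears on stdout at INFO level and is also
# written to both ./logs/trap.log and ./logs/trap.debug.log.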

def dump_database_backup(db_config, job_dir):
    if db_config.get('dump_backup_copy'):
        output_name = os.path.join(
            job_dir, "%s_%s_%s.dump" % (
                db_config['host'], db_config['database'],
                datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
            )
        )
        dump_db(
            db_config['engine'], db_config['host'], str(db_config['port']),
            db_config['database'], db_config['user'], db_config['password'],
            output_name
        )
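
# Shape of the db_config dict assumed above (keys mirror those accessed in
# the function; the values shown are placeholders, not defaults):
#
# db_config = {
#     'dump_backup_copy': True,
#     'engine': 'postgresql',
#     'host': 'localhost',
#     'port': 5432,
#     'database': 'trap',
#     'user': 'trap',
#     'password': 'secret',
# }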

def group_per_timestep(images):
    """
    Groups a list of TRAP images per time step.

    Per time step the images are ordered by frequency and then by stokes.
    The eventual order is:

        (t1, f1, s1), (t1, f1, s2), (t1, f2, s1), (t1, f2, s2),
        (t2, f1, s1), ...

    where:

    * t is time, sorted from old to new
    * f is frequency, sorted from low to high
    * s is stokes, sorted by ID as defined in the database schema

    Args:
        images (list): Images to group.

    Returns:
        list: List of tuples. The list is sorted by timestamp. Each tuple
            has the timestamp as a first element, and a list of images
            sorted by frequency and then stokes as the second element.
    """
    timestamp_to_images_map = defaultdict(list)
    for image in images:
        timestamp_to_images_map[image.taustart_ts].append(image)

    # List of (timestamp, [images_at_timestamp]) tuples:
    grouped_images = timestamp_to_images_map.items()

    # Sort the tuples by first element (timestamp)
    grouped_images.sort()

    # ... and then sort the nested items per freq and stokes
    for _, images_at_timestamp in grouped_images:
        images_at_timestamp.sort(key=lambda x: (x.freq_eff, x.stokes))
    return grouped_images
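
# Worked example (illustrative sketch; FakeImage is a stand-in exposing only
# the three attributes this function reads, not a real TRAP image class):
#
# >>> from collections import namedtuple
# >>> FakeImage = namedtuple('FakeImage', 'taustart_ts freq_eff stokes')
# >>> imgs = [FakeImage(2, 150e6, 1), FakeImage(1, 160e6, 1),
# ...         FakeImage(1, 150e6, 1)]
# >>> [(ts, [(i.freq_eff, i.stokes) for i in grp])
# ...  for ts, grp in group_per_timestep(imgs)]
# [(1, [(150000000.0, 1), (160000000.0, 1)]), (2, [(150000000.0, 1)])]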