Source code for tkp.steps.misc

"""
Various subroutines used in the main pipeline flow.

We keep them separately to make the pipeline logic easier to read at a glance.
"""

import datetime
import ConfigParser
import logging
import os
from pprint import pprint

from collections import defaultdict

from tkp.config import parse_to_dict
from tkp.db.dump import dump_db

logger = logging.getLogger(__name__)

def load_job_config(pipe_config):
    """
    Locates the job_params.cfg in 'job_directory' and loads via ConfigParser.
    """
    job_directory = pipe_config['DEFAULT']['job_directory']
    job_config = ConfigParser.SafeConfigParser(), 'job_params.cfg'))
    return parse_to_dict(job_config)

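# Illustrative usage (not part of the original module; the path is hypothetical):
# load_job_config expects a parsed pipeline config whose DEFAULT section points
# at the job directory, and returns the job_params.cfg content as a nested dict
# via tkp.config.parse_to_dict:
#
#   pipe_config = {'DEFAULT': {'job_directory': '/path/to/jobs/my_job'}}
#   job_config = load_job_config(pipe_config)
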
def dump_configs_to_logdir(log_dir, job_config, pipe_config):
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir)
    with open(os.path.join(log_dir, 'job_params.cfg'), 'w') as f:
        pprint(job_config, stream=f)
    with open(os.path.join(log_dir, 'pipeline.cfg'), 'w') as f:
        pprint(pipe_config, stream=f)

def check_job_configs_match(job_config_1, job_config_2):
    """
    Check if job configs match, except dataset_id which we expect to change.
    """
    jc_from_file = job_config_1.copy()
    jc_from_db = job_config_2.copy()
    del jc_from_file['persistence']['dataset_id']
    del jc_from_db['persistence']['dataset_id']
    return jc_from_file == jc_from_db

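# Illustrative example (hypothetical values): two job configs that differ only
# in the dataset_id stored under the 'persistence' section compare as equal:
#
#   jc_file = {'persistence': {'dataset_id': -1, 'description': 'my run'}}
#   jc_db = {'persistence': {'dataset_id': 42, 'description': 'my run'}}
#   check_job_configs_match(jc_file, jc_db)  # -> True
#
# Note that dict.copy() is shallow, so the dataset_id entries are also removed
# from the dicts passed in by the caller.
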
def setup_log_file(log_dir, debug=False, basename='trap.log'):
    """
    Sets up a catch-all logging handler which writes to a log file in `log_dir`.

    :param log_dir: directory to place the log file in
    :param debug: do we want debug level logging?
    :param basename: basename of the log file
    """
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir)
    log_file = os.path.join(log_dir, basename)
    global_logger = logging.getLogger()
    hdlr = logging.FileHandler(log_file)
    global_logger.addHandler(hdlr)
    formatter = logging.Formatter(
        '%(asctime)s.%(msecs)03d %(levelname)s %(name)s: %(message)s',
        datefmt="%Y-%m-%d %H:%M:%S"
    )
    hdlr.setFormatter(formatter)"logging to %s" % log_file)
    if debug:
        global_logger.setLevel(logging.DEBUG)
    else:
        global_logger.setLevel(logging.INFO)

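# Typical usage sketch (the path is hypothetical): attach a file handler to the
# root logger before the pipeline starts processing images. Because the handler
# is added to the root logger, records from every tkp submodule end up in the
# single trap.log file.
#
#   setup_log_file('/path/to/jobs/my_job/logs', debug=False)
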
def dump_database_backup(db_config, job_dir):
    if 'dump_backup_copy' in db_config:
        if db_config['dump_backup_copy']:
            output_name = os.path.join(
                job_dir, "%s_%s_%s.dump" % (
                    db_config['host'], db_config['database'],
          "%Y-%m-%dT%H:%M:%S")
                )
            )
            dump_db(
                db_config['engine'], db_config['host'],
                str(db_config['port']), db_config['database'],
                db_config['user'], db_config['password'], output_name
            )

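# Illustrative db_config (all values are hypothetical): a dump is only written
# when 'dump_backup_copy' is present and truthy; the remaining keys mirror the
# arguments forwarded to tkp.db.dump.dump_db above.
#
#   db_config = {
#       'dump_backup_copy': True,
#       'engine': 'postgresql', 'host': 'localhost', 'port': 5432,
#       'database': 'trap', 'user': 'trap', 'password': 'secret',
#   }
#   dump_database_backup(db_config, '/path/to/jobs/my_job')
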
def group_per_timestep(images):
    """
    Groups a list of TRAP images per time step.

    Per time step the images are ordered per frequency and then per stokes.
    The eventual order is:

        (t1, f1, s1), (t1, f1, s2), (t1, f2, s1), (t1, f2, s2), (t2, f1, s1), ...

    where:

    * t is time, sorted from old to new
    * f is frequency, sorted from low to high
    * s is stokes, sorted by ID as defined in the database schema

    Args:
        images (list): Images to group.

    Returns:
        list: List of tuples. The list is sorted by timestamp. Each tuple has
            the timestamp as a first element, and a list of images sorted by
            frequency and then stokes as the second element.
    """
    timestamp_to_images_map = defaultdict(list)
    for image in images:
        timestamp_to_images_map[image.taustart_ts].append(image)

    # List of (timestamp, [images_at_timestamp]) tuples:
    grouped_images = timestamp_to_images_map.items()

    # Sort the tuples by their first element (the timestamp) ...
    grouped_images.sort()

    # ... and then sort the nested image lists per frequency and stokes.
    for _, images_at_timestamp in grouped_images:
        images_at_timestamp.sort(key=lambda x: (x.freq_eff, x.stokes))

    return grouped_images

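# Worked example (uses a hypothetical namedtuple stand-in for the ORM image
# objects; real pipeline images expose the same taustart_ts, freq_eff and
# stokes attributes):
#
#   from collections import namedtuple
#   MockImage = namedtuple('MockImage', 'taustart_ts freq_eff stokes')
#   t1, t2 = datetime.datetime(2015, 1, 1), datetime.datetime(2015, 1, 2)
#   images = [MockImage(t2, 150e6, 1), MockImage(t1, 150e6, 1),
#             MockImage(t1, 120e6, 1)]
#   group_per_timestep(images)
#   # -> [(t1, [MockImage(t1, 120e6, 1), MockImage(t1, 150e6, 1)]),
#   #     (t2, [MockImage(t2, 150e6, 1)])]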