Source code for tkp.main

"""
The main pipeline logic, from where all other components are called.
"""
import imp
import logging
import os
from tkp import steps
from tkp.config import initialize_pipeline_config, get_database_config
import tkp.db
from tkp.db import consistency as dbconsistency
from tkp.db import Image
from tkp.db import general as dbgen
from tkp.db import associations as dbass
from tkp.db.quality import sync_rejectreasons
from tkp.distribute import Runner
from tkp.steps.misc import (load_job_config, dump_configs_to_logdir,
                            check_job_configs_match,
                            setup_logging, dump_database_backup,
                            group_per_timestep
                            )
from tkp.db.configstore import store_config, fetch_config
from tkp.steps.persistence import create_dataset, store_images
import tkp.steps.forced_fitting as steps_ff
from tkp.steps.varmetric import execute_store_varmetric


logger = logging.getLogger(__name__)


def run(job_name, supplied_mon_coords=[]):
    pipe_config = initialize_pipeline_config(
        os.path.join(os.getcwd(), "pipeline.cfg"),
        job_name)

    # Get parallelise properties. Defaults to multiproc with autodetected
    # number of cores.
    parallelise = pipe_config.get('parallelise', {})
    distributor = os.environ.get('TKP_PARALLELISE',
                                 parallelise.get('method', 'multiproc'))
    runner = Runner(distributor=distributor,
                    cores=parallelise.get('cores', 0))

    # Set up the logfile before we do anything else.
    log_dir = pipe_config.logging.log_dir
    setup_logging(log_dir,
                  debug=pipe_config.logging.debug,
                  use_colorlog=pipe_config.logging.colorlog)

    job_dir = pipe_config.DEFAULT.job_directory
    if not os.access(job_dir, os.X_OK):
        msg = "can't access job folder %s" % job_dir
        logger.error(msg)
        raise IOError(msg)
    logger.info("Job dir: %s", job_dir)

    db_config = get_database_config(pipe_config.database, apply=True)
    dump_database_backup(db_config, job_dir)

    job_config = load_job_config(pipe_config)
    se_parset = job_config.source_extraction
    deruiter_radius = job_config.association.deruiter_radius
    beamwidths_limit = job_config.association.beamwidths_limit
    new_src_sigma = job_config.transient_search.new_source_sigma_margin

    all_images = imp.load_source(
        'images_to_process',
        os.path.join(job_dir, 'images_to_process.py')).images
    logger.info("dataset %s contains %s images" % (job_name, len(all_images)))

    logger.info("performing database consistency check")
    if not dbconsistency.check():
        logger.error("Inconsistent database found; aborting")
        return 1

    sync_rejectreasons(tkp.db.Database().Session())

    dataset_id = create_dataset(job_config.persistence.dataset_id,
                                job_config.persistence.description)

    if job_config.persistence.dataset_id == -1:
        # New dataset: store the job config and any monitoring positions.
        store_config(job_config, dataset_id)
        if supplied_mon_coords:
            dbgen.insert_monitor_positions(dataset_id, supplied_mon_coords)
    else:
        # Existing dataset: the config stored in the database takes precedence.
        job_config_from_db = fetch_config(dataset_id)
        if check_job_configs_match(job_config, job_config_from_db):
            logger.debug("Job configs from file / database match OK.")
        else:
            logger.warn("Job config file has changed since dataset was "
                        "first loaded into database.")
            logger.warn("Using job config settings loaded from database, see "
                        "log dir for details")
        job_config = job_config_from_db
        if supplied_mon_coords:
            logger.warn("Monitor positions supplied will be ignored. "
                        "(Previous dataset specified)")

    dump_configs_to_logdir(log_dir, job_config, pipe_config)

    logger.info("performing persistence step")
    image_cache_params = pipe_config.image_cache
    imgs = [[img] for img in all_images]
    rms_est_sigma = job_config.persistence.rms_est_sigma
    rms_est_fraction = job_config.persistence.rms_est_fraction
    metadatas = runner.map("persistence_node_step", imgs,
                           [image_cache_params, rms_est_sigma,
                            rms_est_fraction])
    metadatas = [m[0] for m in metadatas if m]

    logger.info("Storing images")
    image_ids = store_images(metadatas,
                             job_config.source_extraction.extraction_radius_pix,
                             dataset_id)
    db_images = [Image(id=image_id) for image_id in image_ids]

    logger.info("performing quality check")
    urls = [img.url for img in db_images]
    arguments = [job_config]
    rejecteds = runner.map("quality_reject_check", urls, arguments)

    good_images = []
    for image, rejected in zip(db_images, rejecteds):
        if rejected:
            reason, comment = rejected
            steps.quality.reject_image(image.id, reason, comment)
        else:
            good_images.append(image)

    if not good_images:
        logger.warn("No good images under these quality checking criteria")
        return

    grouped_images = group_per_timestep(good_images)
    timestep_num = len(grouped_images)
    for n, (timestep, images) in enumerate(grouped_images):
        msg = "processing %s images in timestep %s (%s/%s)"
        logger.info(msg % (len(images), timestep, n + 1, timestep_num))

        logger.debug("performing source extraction")
        urls = [img.url for img in images]
        arguments = [se_parset]
        extraction_results = runner.map("extract_sources", urls, arguments)

        logger.debug("storing extracted sources to database")
        # We also store each image's min/max RMS values, which were
        # calculated during source extraction.
        for image, results in zip(images, extraction_results):
            image.update(rms_min=results.rms_min,
                         rms_max=results.rms_max,
                         detection_thresh=se_parset['detection_threshold'],
                         analysis_thresh=se_parset['analysis_threshold'])
            dbgen.insert_extracted_sources(image.id, results.sources, 'blind')

        for image in images:
            logger.info("performing DB operations for image {} ({})".format(
                image.id, image.url))

            dbass.associate_extracted_sources(
                image.id,
                deRuiter_r=deruiter_radius,
                new_source_sigma_margin=new_src_sigma)

            # Perform any forced fits requested for this image and store
            # the successful results.
            expiration = job_config.source_extraction.expiration
            all_fit_posns, all_fit_ids = steps_ff.get_forced_fit_requests(
                image, expiration)
            if all_fit_posns:
                successful_fits, successful_ids = steps_ff.perform_forced_fits(
                    all_fit_posns, all_fit_ids, image.url, se_parset)
                steps_ff.insert_and_associate_forced_fits(
                    image.id, successful_fits, successful_ids)

    dbgen.update_dataset_process_end_ts(dataset_id)

    logger.info("calculating variability metrics")
    execute_store_varmetric(dataset_id)