Source code for tkp.main

"""
Main pipeline logic is defined here.

The science logic is a bit entwined with celery-specific functionality.
This is somewhat unavoidable, since how a task is parallelised (or not) has
implications for the resulting logic.

In general, we try to keep functions elsewhere so this file is succinct.
The exceptions are a couple of celery-specific subroutines.
"""
import imp
import logging
import os
from tkp import steps
from tkp.config import initialize_pipeline_config, get_database_config
from tkp.db import consistency as dbconsistency
from tkp.db import Image
from tkp.db import general as dbgen
from tkp.db import associations as dbass
from tkp.distribute import Runner
from tkp.steps.misc import (load_job_config, dump_configs_to_logdir,
                                   check_job_configs_match,
                                   setup_log_file, dump_database_backup,
                                group_per_timestep
                            )
from tkp.db.configstore import store_config, fetch_config
from tkp.steps.persistence import create_dataset, store_images
import tkp.steps.forced_fitting as steps_ff


logger = logging.getLogger(__name__)


[docs]def run(job_name, supplied_mon_coords=[]): pipe_config = initialize_pipeline_config( os.path.join(os.getcwd(), "pipeline.cfg"), job_name) # get parallelise props. Defaults to multiproc with autodetect num cores parallelise = pipe_config.get('parallelise', {}) distributor = os.environ.get('TKP_PARALLELISE', parallelise.get('method', 'multiproc')) runner = Runner(distributor=distributor, cores=parallelise.get('cores', 0)) debug = pipe_config.logging.debug #Setup logfile before we do anything else log_dir = pipe_config.logging.log_dir setup_log_file(log_dir, debug) job_dir = pipe_config.DEFAULT.job_directory if not os.access(job_dir, os.X_OK): msg = "can't access job folder %s" % job_dir logger.error(msg) raise IOError(msg) logger.info("Job dir: %s", job_dir) db_config = get_database_config(pipe_config.database, apply=True) dump_database_backup(db_config, job_dir) job_config = load_job_config(pipe_config) se_parset = job_config.source_extraction deruiter_radius = job_config.association.deruiter_radius beamwidths_limit = job_config.association.beamwidths_limit new_src_sigma = job_config.transient_search.new_source_sigma_margin all_images = imp.load_source('images_to_process', os.path.join(job_dir, 'images_to_process.py')).images logger.info("dataset %s contains %s images" % (job_name, len(all_images))) logger.info("performing database consistency check") if not dbconsistency.check(): logger.error("Inconsistent database found; aborting") return 1 dataset_id = create_dataset(job_config.persistence.dataset_id, job_config.persistence.description) if job_config.persistence.dataset_id == -1: store_config(job_config, dataset_id) # new data set if supplied_mon_coords: dbgen.insert_monitor_positions(dataset_id,supplied_mon_coords) else: job_config_from_db = fetch_config(dataset_id) # existing data set if check_job_configs_match(job_config, job_config_from_db): logger.debug("Job configs from file / database match OK.") else: logger.warn("Job config file has changed since dataset was " "first loaded into database. ") logger.warn("Using job config settings loaded from database, see " "log dir for details") job_config = job_config_from_db if supplied_mon_coords: logger.warn("Monitor positions supplied will be ignored. " "(Previous dataset specified)") dump_configs_to_logdir(log_dir, job_config, pipe_config) logger.info("performing persistence step") image_cache_params = pipe_config.image_cache imgs = [[img] for img in all_images] rms_est_sigma = job_config.persistence.rms_est_sigma rms_est_fraction = job_config.persistence.rms_est_fraction metadatas = runner.map("persistence_node_step", imgs, [image_cache_params, rms_est_sigma, rms_est_fraction]) metadatas = [m[0] for m in metadatas if m] logger.info("Storing images") image_ids = store_images(metadatas, job_config.source_extraction.extraction_radius_pix, dataset_id) db_images = [Image(id=image_id) for image_id in image_ids] logger.info("performing quality check") urls = [img.url for img in db_images] arguments = [job_config] rejecteds = runner.map("quality_reject_check", urls, arguments) good_images = [] for image, rejected in zip(db_images, rejecteds): if rejected: reason, comment = rejected steps.quality.reject_image(image.id, reason, comment) else: good_images.append(image) if not good_images: logger.warn("No good images under these quality checking criteria") return grouped_images = group_per_timestep(good_images) timestep_num = len(grouped_images) for n, (timestep, images) in enumerate(grouped_images): msg = "processing %s images in timestep %s (%s/%s)" logger.info(msg % (len(images), timestep, n+1, timestep_num)) logger.info("performing source extraction") urls = [img.url for img in images] arguments = [se_parset] extraction_results = runner.map("extract_sources", urls, arguments) logger.info("storing extracted sources to database") # we also set the image max,min RMS values which calculated during # source extraction for image, results in zip(images, extraction_results): image.update(rms_min=results.rms_min, rms_max=results.rms_max, detection_thresh=se_parset['detection_threshold'], analysis_thresh=se_parset['analysis_threshold']) dbgen.insert_extracted_sources(image.id, results.sources, 'blind') logger.info("performing database operations") for image in images: logger.info("performing DB operations for image %s" % image.id) logger.info("performing source association") dbass.associate_extracted_sources(image.id, deRuiter_r=deruiter_radius, new_source_sigma_margin=new_src_sigma) all_fit_posns, all_fit_ids = steps_ff.get_forced_fit_requests(image) if all_fit_posns: successful_fits, successful_ids = steps_ff.perform_forced_fits( all_fit_posns, all_fit_ids, image.url, se_parset) steps_ff.insert_and_associate_forced_fits(image.id,successful_fits, successful_ids) dbgen.update_dataset_process_end_ts(dataset_id)