Source code for nestly.scripts.nestrun

"""
nestrun.py - run commands based on control dictionaries.
"""
import argparse
import collections
import csv
import datetime
import errno
import functools
import json
import logging
import os
import os.path
import shlex
import shutil
import signal
import subprocess
import sys

from nestly.core import control_iter

# Constants used as defaults.
MAX_PROCS = 2      # Default maximum number of child processes to spawn.
DRY_RUN = False    # Whether to run in dry-run mode by default.
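
# Typical invocation (a sketch; the paths and template text are hypothetical,
# but the flags are defined in parse_arguments below):
#
#     nestrun --template 'my_tool --input {infile}' -j 4 \
#         runs/sample1/control.json runs/sample2/control.json
#
# Each control dictionary's values are substituted into the template, and the
# resulting command runs in that control file's directory.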


def _terminate_procs(procs):
    """
    Terminate all processes in the process dictionary
    """
    logging.warn("Stopping all remaining processes")
    for proc, g in procs.values():
        logging.debug("[%s] SIGTERM", proc.pid)
        try:
            proc.terminate()
        except OSError as e:
            # we don't care if the process we tried to kill didn't exist.
            if e.errno != errno.ESRCH:
                raise
    sys.exit(1)


def sigterm_handler(nlocal, signum, frame):
    logging.warning('SIGTERM received; no longer spawning jobs')
    nlocal['spawn_jobs'] = False


def sigusr1_handler(running_procs, signum, frame):
    for pid, (proc, _) in running_procs.iteritems():
        sys.stderr.write('%5d - in %s\n' % (pid, proc.working_dir))
        sys.stderr.flush()  # just in case it's being buffered by something


def sigint_handler(nlocal, write_this_summary, running_procs, signum, frame):
    if nlocal['received_SIGINT']:
        logging.warning('SIGINT received; terminating')
        _terminate_procs(running_procs)
        write_this_summary()
        sys.exit(0)
    else:
        logging.warning('SIGINT received; send again to terminate')
        nlocal['received_SIGINT'] = True


def invoke(max_procs, data, json_files):
    nlocal = {'spawn_jobs': True, 'received_SIGINT': False}
    running_procs = {}
    all_procs = []

    def write_this_summary():
        write_summary(all_procs, data['summary_file'])

    signal.signal(signal.SIGTERM, functools.partial(sigterm_handler, nlocal))
    signal.signal(signal.SIGUSR1,
                  functools.partial(sigusr1_handler, running_procs))
    signal.signal(signal.SIGINT,
                  functools.partial(sigint_handler, nlocal,
                                    write_this_summary, running_procs))

    files = iter(json_files)
    try:
        while True:
            while nlocal['spawn_jobs'] and len(running_procs) < max_procs:
                try:
                    json_file = files.next()
                except StopIteration:
                    # no more files; allow other processes to finish.
                    break
                g = worker(data, json_file)
                try:
                    proc = g.next()
                except StopIteration:
                    continue
                except OSError:
                    # OSError thrown when the command couldn't be started
                    logging.exception("Exception starting %s", json_file)
                    if data['stop_on_error']:
                        _terminate_procs(running_procs)
                        return
                else:
                    all_procs.append(proc)
                    running_procs[proc.pid] = proc, g

            try:
                pid, status = os.wait()
            except OSError as e:
                # wait(2) raising ECHILD means there are no child processes
                # left to wait for, so we're done.
                if e.errno == errno.ECHILD:
                    return
                elif e.errno == errno.EINTR:
                    continue
                else:
                    raise

            # Pull the actual exit status - high byte of the 16-bit number
            exit_status = os.WEXITSTATUS(status)
            proc, g = running_procs.pop(pid)
            proc.complete(exit_status)
            try:
                g.next()
            except StopIteration:
                pass
            else:
                raise ValueError('worker generators should only yield once')

            # Check exit status; cancel remaining jobs if stop_on_error was
            # specified and the status is non-zero.
            if exit_status:
                logging.warn('[%s] %s Finished with non-zero exit status '
                             '%s\n%s', pid, proc.working_dir, exit_status,
                             proc.log_tail())
                if data['stop_on_error']:
                    _terminate_procs(running_procs)
                    return
            else:
                logging.info("[%s] %s Finished with %s", pid,
                             proc.working_dir, exit_status)
    finally:
        write_this_summary()


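# Note on the worker/invoke protocol: worker() is a generator that yields
# exactly once, immediately after spawning its child; invoke() resumes it
# after os.wait() reports the child's exit, and treats a second yield as an
# error. A minimal sketch of the same pattern (names hypothetical):
#
#     def job():
#         proc = start()   # may raise before yielding
#         yield proc       # caller waits for proc to exit...
#         cleanup()        # ...then resumes us exactly once

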
def write_summary(all_procs, summary_file):
    """
    Write a summary of all run processes to summary_file in tab-delimited
    format.
    """
    if not summary_file:
        return
    with summary_file:
        writer = csv.writer(summary_file, delimiter='\t',
                            lineterminator='\n')
        writer.writerow(('directory', 'command', 'start_time', 'end_time',
                         'run_time', 'exit_status', 'result'))
        rows = ((p.working_dir, ' '.join(p.command), p.start_time,
                 p.end_time, p.running_time, p.return_code, p.status)
                for p in all_procs)
        writer.writerows(rows)


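# The resulting summary is tab-delimited: a header row, then one row per
# process with its directory, command, start/end times, running time, exit
# status, and final status (COMPLETE, FAILED, or TERMINATED).

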
def template_subs_file(in_file, out_fobj, d):
    """
    Substitute template arguments in in_file from variables in d, writing
    the result to out_fobj.
    """
    with open(in_file, 'r') as in_fobj:
        for line in in_fobj:
            out_fobj.write(line.format(**d))


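# The substitution above is plain str.format: each template line is formatted
# with the control dictionary. With hypothetical values,
#
#     'run.sh --n {n} {infile}\n'.format(**{'n': 10, 'infile': 'a.fq'})
#
# yields 'run.sh --n 10 a.fq\n'; literal braces in a template must be doubled
# ('{{' and '}}') to survive formatting.

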
class NestlyProcess(object):
    """
    Metadata about a process run
    """

    def __init__(self, command, working_dir, popen, log_name='log.txt'):
        self.command = command
        self.working_dir = working_dir
        self.log_name = log_name
        self.popen = popen
        self.pid = popen.pid
        self.return_code = None
        self.start_time = datetime.datetime.now()
        self.end_time = None
        self.status = 'RUNNING'

    def terminate(self):
        self.popen.terminate()
        self.end_time = datetime.datetime.now()
        self.status = 'TERMINATED'

    def complete(self, return_code):
        """
        Mark the process as complete with the provided return_code
        """
        self.return_code = return_code
        self.status = 'COMPLETE' if not return_code else 'FAILED'
        self.end_time = datetime.datetime.now()

    @property
    def running_time(self):
        if self.end_time is None:
            return None
        return self.end_time - self.start_time

    def log_tail(self, nlines=10):
        """
        Return the last ``nlines`` lines of the log file
        """
        log_path = os.path.join(self.working_dir, self.log_name)
        with open(log_path) as fp:
            d = collections.deque(maxlen=nlines)
            d.extend(fp)
        return ''.join(d)


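# Example use of NestlyProcess outside invoke() (a sketch; the command is
# hypothetical):
#
#     pr = subprocess.Popen(['sleep', '1'], cwd='.')
#     np = NestlyProcess(['sleep', '1'], '.', pr)
#     pr.wait()
#     np.complete(pr.returncode)
#     # np.status == 'COMPLETE'; np.running_time is a datetime.timedelta

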
def worker(data, json_file):
    """
    Handle parameter substitution and execute the command as a child process.
    """
    # PERHAPS TODO: Support either full or relative paths.
    with open(json_file) as fp:
        d = json.load(fp)
    json_directory = os.path.dirname(json_file)

    def p(*parts):
        return os.path.join(json_directory, *parts)

    # STDOUT and STDERR will be written to a log file in each job directory.
    log_file = data['log_file']
    # A template file will be written in each job directory, including the
    # substitution that was performed.
    savecmd_file = data['savecmd_file']
    # If a template file is being used, write the substituted version out to
    # the job directory.
    template_file = data['template_file']
    if template_file:
        output_template = p(os.path.basename(template_file))
        with open(output_template, 'w') as out_fobj:
            template_subs_file(template_file, out_fobj, d)
        # Copy permissions to the destination
        try:
            shutil.copymode(template_file, output_template)
        except OSError as e:
            if e.errno == errno.EPERM:
                logging.warn("Couldn't set permissions on %s. "
                             "Continuing with existing permissions",
                             output_template)
            else:
                raise

    work = data['template'].format(**d)
    if savecmd_file:
        with open(p(savecmd_file), 'w') as command_file:
            command_file.write(work + "\n")

    # In dry_run mode, just log the action that would be taken.
    if data['dry_run']:
        logging.info("%s - Dry run of %s\n", p(), work)
    else:
        try:
            with open(p(log_file), 'w') as log:
                cmd = shlex.split(work)
                while True:
                    try:
                        pr = subprocess.Popen(cmd, stdout=log, stderr=log,
                                              cwd=p())
                    except OSError as e:
                        # Retry if Popen was interrupted by a signal
                        if e.errno != errno.EINTR:
                            raise
                        continue
                    else:
                        break
                logging.info('[%d] Started %s in %s', pr.pid, work, p())
                nestproc = NestlyProcess(cmd, p(), pr)
                yield nestproc
        except Exception as e:
            # Seems useful to print the command that failed to make the
            # traceback more meaningful. Note that error output could get
            # mixed up if two processes encounter errors at the same instant.
            logging.error("%s - Error executing %s - %s", p(), work, e)
            raise


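# For reference, a control dictionary is plain JSON. A hypothetical
# runs/a/b/control.json consumed by worker() might contain
#
#     {"infile": "/data/sample1.fq", "n": "10"}
#
# so that --template 'my_tool --input {infile} --n {n}' runs
# "my_tool --input /data/sample1.fq --n 10" in runs/a/b, with stdout and
# stderr captured in runs/a/b/log.txt (the default log file name).

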
def extant_file(x):
    """
    'Type' for argparse - checks that the file exists but does not open it.
    """
    if not os.path.exists(x):
        raise argparse.ArgumentTypeError("{0} does not exist".format(x))
    return x


def parse_arguments():
    """
    Grab options and json files.
    """
    max_procs = MAX_PROCS

    logging.basicConfig(level=logging.INFO, stream=sys.stdout,
                        format='%(asctime)s * %(levelname)s * %(message)s')

    parser = argparse.ArgumentParser(
        description="""nestrun - substitute values into a template and run
        commands in parallel.""")
    parser.add_argument('-j', '--processes', '--local', dest='local_procs',
                        type=int, metavar='N', default=MAX_PROCS,
                        help="""Run a maximum of N processes in parallel
                        locally (default: %(default)s)""")
    parser.add_argument('--template', dest='template',
                        metavar="'template text'",
                        help="""Command-execution template, e.g. bash
                        {infile}. By default, nestrun executes the template
                        file.""")
    parser.add_argument('--stop-on-error', action='store_true',
                        default=False,
                        help="""Terminate remaining processes if any process
                        returns non-zero exit status (default:
                        %(default)s)""")
    parser.add_argument('--template-file', dest='template_file',
                        metavar='FILE',
                        help='Command-execution template file path.')
    parser.add_argument('--save-cmd-file', dest='savecmd_file',
                        help="""Name of the file that will contain the
                        command that was executed.""")
    log_group = parser.add_mutually_exclusive_group()
    log_group.add_argument('--log-file', dest='log_file', default='log.txt',
                           help="""Name of the file that will contain output
                           of the executed command.""")
    log_group.add_argument('--no-log', dest='log_file',
                           action='store_const', default='log.txt',
                           const=os.devnull,
                           help="""Don't create a log file""")
    parser.add_argument('--dry-run', action='store_true', default=False,
                        help="""Dry run mode; do not execute commands.""")
    parser.add_argument('--summary-file', type=argparse.FileType('w'),
                        help="""Write a summary of the run to the specified
                        file""")

    ctrl_group = parser.add_argument_group('Control files')
    ctrl_group.add_argument('json_files', metavar='control_files',
                            type=extant_file, nargs='*',
                            help="""Nestly control dictionaries""")
    ctrl_group.add_argument('-d', '--directory', metavar='DIR',
                            help="""Run on all control files under
                            %(metavar)s. May be used in place of specifying
                            control files.""")

    arguments = parser.parse_args()

    # Load controls
    if bool(arguments.directory) == bool(arguments.json_files):
        parser.error('Exactly one of `-d` and control_files must be '
                     'specified.')
    elif arguments.directory:
        arguments.json_files.extend(control_iter(arguments.directory))

    template = arguments.template
    # Make sure that either a template or a template file was given
    if arguments.template_file:
        # If given a template file, the default is to run the input.
        if not arguments.template:
            template = os.path.join(
                '.', os.path.basename(arguments.template_file))
            # When using this default, the template file must be executable:
            if (not os.access(arguments.template_file, os.X_OK) and
                    not arguments.dry_run):
                parser.error(
                    "{0} is not executable. Specify a template.".format(
                        arguments.template_file))
    if not (arguments.template or arguments.template_file):
        parser.exit("Error: Please specify either a template "
                    "or a template file")

    logging.info('Template: %s', template)

    if arguments.local_procs is not None:
        max_procs = arguments.local_procs

    # Create a dictionary that will be shared amongst all forked processes.
    data = {}
    data['dry_run'] = arguments.dry_run
    data['start_directory'] = os.getcwd()
    data['template'] = template
    data['template_file'] = arguments.template_file
    data['savecmd_file'] = arguments.savecmd_file
    data['log_file'] = arguments.log_file
    data['stop_on_error'] = arguments.stop_on_error
    data['summary_file'] = arguments.summary_file

    return data, max_procs, arguments.json_files


def main():
    data, max_procs, json_files = parse_arguments()
    invoke(max_procs, data, json_files)