#!/usr/bin/python
# Copyright 2013  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# =*= License: GPL-3+ =*=


import cliapp
import logging
import os
import time
import sys
import ttystatus
import yaml

import cronish


class DesktopCronish(cliapp.Application):

    """Run shell commands described in YAML job files.

    Each job is a mapping with a 'command' and one trigger: either
    'interval' (seconds between runs) or 'trigger-file' (optionally
    with 'trigger-age'). A job may also have a 'timeout', in which case
    the command is run under timeout(1).

    """

    def add_settings(self):
        """Declare the command line settings of the application."""
        # NOTE(review): the help text says "at least N", but
        # execute_jobs stops after N loop iterations, so at most N jobs
        # are run. The text is left as is; confirm the intended wording.
        self.settings.integer(
            ['max-jobs'],
            'run at least N jobs, then quit (use 0 for infinite)',
            metavar='N',
            default=0)

        self.settings.boolean(
            ['quiet', 'q'],
            'no status messaging to terminal')

        self.settings.integer(
            ['sleep'],
            'if there is no known job, sleep for SECONDS',
            default=1)

    def process_args(self, args):
        """Load the job files named in args, then run the job loop."""
        self.ts = ttystatus.TerminalStatus(period=0.1)
        self.ts.format('%String(timestamp) %String(msg)')
        if self.settings['quiet']:
            self.ts.disable()

        # Job name -> job description, as loaded from the input files.
        self.jobs = {}
        # Job name -> time (seconds since the epoch) the job last finished.
        self.previously = {}

        self.process_inputs(args)
        self.execute_jobs()
        self.ts.finish()

    def status(self, msg):
        """Log msg and show it, on a single line, on the terminal."""
        logging.info(msg)
        self.ts['timestamp'] = time.strftime('%H:%M:%S')
        # The status display is one line: fold newlines into spaces.
        self.ts['msg'] = ' '.join(msg.split('\n'))
        self.ts.flush()

    def process_input(self, filename):
        """Merge the jobs defined in one YAML file into self.jobs.

        An empty file defines no jobs. Raises cliapp.AppException if
        the top level of the file is not a mapping.

        """
        self.status('Loading jobs from %s' % filename)
        with self.open_input(filename, 'r') as f:
            job_dict = yaml.safe_load(f)

        # safe_load returns None for an empty document; dict.update
        # would raise TypeError on that.
        if job_dict is None:
            return
        if not isinstance(job_dict, dict):
            raise cliapp.AppException(
                'Job file %s does not contain a mapping of jobs' % filename)
        self.jobs.update(job_dict)

    def execute_jobs(self):
        """Repeatedly pick the next job and run it.

        Stops after 'max-jobs' iterations, or never if that is 0.

        """
        n = 0
        max_jobs = self.settings['max-jobs']
        while max_jobs == 0 or n < max_jobs:
            job_name, when = self.choose_job()
            if job_name is not None:
                self.wait_until(when, job_name)
                self.execute_job(job_name)
            else:
                self.status(
                    'No idea what to do, sleeping for %d seconds' %
                    self.settings['sleep'])
                time.sleep(self.settings['sleep'])
            # Every iteration counts, including those that only slept.
            n += 1
        self.status('Stopped executing after %d jobs' % n)

    def choose_job(self):
        """Return (name, time) of the job that should run next.

        Returns (None, 0) if no job currently has a known run time.
        Raises cliapp.AppException for a job with neither an
        'interval' nor a 'trigger-file'.

        """
        next_job_name = None
        next_when = 0
        for job_name, job in self.jobs.items():
            if 'interval' in job:
                job_when = self.when_interval_job(job_name, job)
            elif 'trigger-file' in job:
                job_when = self.when_trigger_file_job(job_name, job)
            else:
                raise cliapp.AppException(
                    'Unknown job trigger for %s' % job_name)

            if job_when is None:
                continue
            # On a tie, the job seen later replaces the earlier one.
            if next_job_name is None or job_when <= next_when:
                next_job_name = job_name
                next_when = job_when

        return next_job_name, next_when

    def when_interval_job(self, job_name, job):
        """Return when an interval job should run next.

        A job that has not run yet is treated as last run at time 0.

        """
        return self.previously.get(job_name, 0) + job['interval']

    def when_trigger_file_job(self, job_name, job):
        """Return when a trigger-file job should run, or None.

        The job triggers now if the file is missing, or if the job has
        a 'trigger-age' and the file is at least that many seconds old.

        """
        filename = job['trigger-file']

        # If file doesn't exist, trigger now. Asking for the mtime
        # directly, instead of checking for existence first, avoids
        # crashing if the file vanishes between the two calls.
        try:
            mtime = os.path.getmtime(filename)
        except OSError:
            return self.now()

        # If there is no max age for file, but it exists, never trigger.
        if 'trigger-age' not in job:
            return None

        # If the file exists and is too old, trigger now.
        if mtime + job['trigger-age'] <= self.now():
            return self.now()

        # Do not trigger now. We can't compute the next time to trigger:
        # the file might go missing, or get updated, so we need to test
        # it every iteration.
        return None

    def wait_until(self, when, for_what):
        """Sleep until the time when, reporting for_what as the reason."""
        while True:
            # Read the clock once per iteration, so the value tested
            # is the value slept for, and is never negative.
            seconds = when - self.now()
            if seconds <= 0:
                break
            t = time.strftime('%H:%M:%S', time.localtime(when))
            self.status('Sleeping until %s for %s' % (t, for_what))
            time.sleep(seconds)

    def execute_job(self, job_name):
        """Run the command of a job via sh and report its output.

        The finish time is recorded in self.previously whether or not
        the command succeeded.

        """
        job = self.jobs[job_name]
        self.ts.notify(
            '%s Executing job %s' % (time.strftime('%H:%M:%S'), job_name))
        self.status('Started command: %s' % job['command'])

        argv = ['sh', '-c', job['command']]
        if 'timeout' in job:
            argv = ['timeout', str(job['timeout'])] + argv

        exit_code, out, err = cliapp.runcmd_unchecked(argv)

        # Drop one trailing newline from the output before showing it.
        if out.endswith('\n'):
            out = out[:-1]
        if out:
            self.ts.notify(out)

        if exit_code != 0:
            self.ts.error(
                'ERROR: Job %s: Command failed: %s' %
                (job_name, job['command']))
            if err:
                self.ts.error(err)

        self.previously[job_name] = self.now()

    def now(self):
        """Return the current time as seconds since the epoch."""
        return time.time()


DesktopCronish(version=cronish.__version__).run()