# Copyright (C) 2009 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


'''Manage heavy background processing tasks.

bgjobs is a package that wraps around the multiprocessing module in the
standard Python library. It gives a higher level abstraction for running
CPU intensive background jobs in other processes. Each job is abstracted
into a class with a specific interface, and bgjobs takes care of running
jobs as CPUs become available, and returning the results.

'''


import multiprocessing

try:
    import Queue  # Python 2
except ImportError:
    # Python 3 renamed the module; only Queue.Empty is used below,
    # which exists under both names.
    import queue as Queue


class BackgroundJob(object):

    '''A job to be run in the background, via multiprocessing.

    This is a base class. Subclasses MUST implement the run method,
    which does the actual job of the task. The run method should return
    the value that is to be passed to the main process.

    '''

    def send_status(self, status):
        '''Send status info to the foreground process.

        The job may use this to, for example, inform a foreground UI
        about progress with the job.

        Note that the background runner sets the status_queue attribute
        and this is necessary for this to work. This is the same queue
        where results go; the caller is responsible for making sure
        that the values can be differentiated from the return values
        of the run method.

        '''
        self.status_queue.put(status)

    def run(self):
        '''Do the actual work; subclasses must override this.'''
        # NotImplementedError is the conventional marker for a method a
        # subclass must override. It is a subclass of Exception, so any
        # caller that caught the old generic Exception still works.
        raise NotImplementedError('Unimplemented run method.')


def worker_process(jobs, results, status):
    '''Run jobs and return results.

    This is the body of each background (child) process. It loops
    forever: take a job from the jobs queue, run it, and put its return
    value into the results queue. If the job raises, the exception
    object itself is put into the results queue instead. After every
    job a None token is put into the status queue so the manager can
    count finished jobs.

    The loop never exits on its own; the manager kills the process in
    stop_jobs with terminate().

    '''
    while True:
        job = jobs.get()
        # Jobs report status into the same queue as results; see
        # BackgroundJob.send_status for the caller's obligations.
        job.status_queue = results
        try:
            result = job.run()
        except BaseException as e:
            # Deliberately catch everything (including SystemExit and
            # KeyboardInterrupt): the worker must survive any job, and
            # the foreground gets the exception object as the result.
            result = e
        results.put(result)
        status.put(None)


class BackgroundManager(object):

    '''A manager for background jobs.

    This starts and stops background processes, and gives jobs to them
    as they become available. Call add_job() to add a new job (subclass
    of BackgroundJob) to the job queue, and start_jobs() to actually
    start executing jobs. After start_jobs() has been called,
    background processes will continue to wait for new jobs, and to
    execute them; just add them with add_job().

    The caller MUST query the running property occasionally, and call
    stop_jobs() when shutting down. It is not necessary to call
    stop_jobs() until the caller application is shutting down, but it
    can be called at any time, even when jobs are still being executed
    (all queued and running jobs are terminated and forgotten, as are
    all existing results).

    Typical use:

        manager = BackgroundManager()
        manager.start_jobs()
        while main_loop_needs_to_run:
            if there is a new job:
                manager.add_job()
            do something else
            if manager.running:
                try:
                    result = manager.results.get(block=False)
                except Queue.Empty:
                    pass
                else:
                    do something with result

    If you don't wish to poll results non-blockingly, just do this:

        manager = BackgroundManager()
        manager.start_jobs()
        for job in jobs:
            manager.add_job(job)
        while manager.running:
            result = manager.results.get(block=False)
            do something with result

    '''

    # We start some child processes to run the jobs. We have a queue
    # for unprocessed jobs (attribute jobs), and another for results.
    # Each child process gets a job from the job queue, runs it, puts
    # the result in the result queue.
    #
    # An additional queue is used for status reports, specifically the
    # child processes send a token to the manager when they've finished
    # running the job. This is used by the manager to keep track of
    # when jobs have been finished. The caller needs to know this to be
    # able to do things like report background processes in the user
    # interface.

    def __init__(self):
        self.init()

    def init(self):
        '''Initialize things.

        The user must not call this method.

        '''
        self.jobs = multiprocessing.Queue()
        # Number of jobs added but not yet reported finished via the
        # status queue (see the running property).
        self.jobs_counter = 0
        self.status = multiprocessing.Queue()
        self.results = multiprocessing.Queue()
        self.processes = []

    @property
    def running(self):
        '''Are any child processes running jobs now?'''
        if not self.processes:
            # No workers exist: "running" degenerates to "are there
            # unread results". Queue.empty() is used rather than
            # qsize(), because multiprocessing's qsize() raises
            # NotImplementedError on platforms without sem_getvalue
            # (e.g. Mac OS X).
            return not self.results.empty()
        # Drain the status queue: each token means one finished job.
        while True:
            try:
                self.status.get(block=False)
            except Queue.Empty:
                break
            self.jobs_counter -= 1
        return self.jobs_counter > 0 or not self.results.empty()

    def add_job(self, job):
        '''Add a job to the queue.

        The job will be executed when there is a free CPU to do it.

        '''
        self.jobs_counter += 1
        self.jobs.put(job)

    def start_jobs(self, maxproc=None):
        '''Start executing jobs.

        This starts the background processes. It must not be called if
        there are any already running.

        A background process is started for each CPU, unless maxproc is
        set, in which case that many background processes are started.

        '''
        assert self.processes == []
        if maxproc is None:
            maxproc = multiprocessing.cpu_count()
        for i in range(maxproc):
            p = multiprocessing.Process(target=worker_process,
                                        args=(self.jobs, self.results,
                                              self.status))
            self.processes.append(p)
            p.start()

    def stop_jobs(self):
        '''Stop processing jobs.

        The queue of jobs will be emptied. Currently running jobs will
        be killed mercilessly, and will not produce results. This call
        will block until all background processes have terminated.

        '''
        # Close pipes. This will shut down background threads so that
        # things go away nicely. Not doing this will occasionally cause
        # the background threads to throw exceptions.
        self.jobs.close()
        self.results.close()
        self.status.close()
        self.jobs.join_thread()
        self.results.join_thread()
        self.status.join_thread()

        # Kill all processes.
        for p in self.processes:
            p.terminate()

        # Wait for them to die.
        for p in self.processes:
            p.join()

        # Start over.
        self.init()