diff options
Diffstat (limited to 'trunk/dimbola/bgjobs.py')
-rw-r--r-- | trunk/dimbola/bgjobs.py | 231 |
1 files changed, 231 insertions, 0 deletions
# Copyright (C) 2009 Lars Wirzenius <liw@liw.fi>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


'''Manage heavy background processing tasks.

bgjobs is a package that wraps around the multiprocessing module in
the standard Python library. It gives a higher level abstraction for
running CPU intensive background jobs in other processes. Each job is
abstracted into a class with a specific interface, and bgjobs
takes care of running jobs as CPUs become available, and returning
the results.

'''


import multiprocessing

# The Queue module was renamed to queue in Python 3. Import it under
# the old name so the module stays importable on either major version;
# only Queue.Empty is used here.
try:
    import Queue
except ImportError:
    import queue as Queue


class BackgroundJob(object):

    '''A job to be run in the background, via multiprocessing.

    This is a base class. Subclasses MUST implement the run method, which
    does the actual job of the task. The run method should return the value
    that is to be passed to the main process.

    '''

    def send_status(self, status):
        '''Send status info to the foreground process.

        The job may use this to, for example, inform a foreground UI
        about progress with the job.

        Note that the background runner sets the status_queue attribute
        and this is necessary for this to work. This is the same queue
        where results go; the caller is responsible for making sure
        that the values can be differentiated from the return values
        of the run method.

        '''
        self.status_queue.put(status)

    def run(self):
        '''Do the actual work of the job; subclasses MUST override this.'''
        # NotImplementedError is the conventional marker for an abstract
        # method. It is a subclass of Exception, so any existing caller
        # that caught the old generic Exception still works.
        raise NotImplementedError('Unimplemented run method.')


def worker_process(jobs, results, status):
    '''Run jobs and return results.

    This is the body of each background process: loop forever, taking
    jobs from the jobs queue and putting their results in the results
    queue. After each job, a None token is put in the status queue so
    that the manager can count finished jobs.

    The loop never exits on its own; the process is killed by
    BackgroundManager.stop_jobs. (The original code called close() on
    the queues after the loop, but that code was unreachable and has
    been removed.)

    '''

    while True:
        job = jobs.get()

        # Status reports from the job go to the same queue as results;
        # see BackgroundJob.send_status.
        job.status_queue = results
        try:
            result = job.run()
        except BaseException as e:
            # Deliberately broad: any failure inside the job is
            # forwarded to the foreground as the job's result, instead
            # of silently killing the worker process.
            result = e

        results.put(result)
        status.put(None)


class BackgroundManager(object):

    '''A manager for background jobs.

    This starts and stops background processes, and gives jobs to them
    as they become available.

    Call add_job() to add a new job (subclass of BackgroundJob) to the
    job queue, and start_jobs() to actually start executing jobs. After
    start_jobs() has been called, background processes will continue to
    wait for new jobs, and to execute them, just add them with add_job().

    The caller MUST query the running property occasionally, and call
    stop_jobs() when shutting down. It is not necessary to call stop_jobs()
    until the caller application is shutting down, but it can be called
    at any time, even when jobs are still being executed (all queued
    and running jobs are terminated and forgotten, as are all existing
    results).

    Typical use:

        manager = BackgroundManager()
        manager.start_jobs()

        while main_loop_needs_to_run:
            if there is a new job:
                manager.add_job(job)
            do something else
            if manager.running:
                try:
                    result = manager.results.get(block=False)
                except Queue.Empty:
                    pass
                else:
                    do something with result

    If you don't wish to poll results non-blockingly, just do this:

        manager = BackgroundManager()
        manager.start_jobs()
        for job in jobs:
            manager.add_job(job)

        while manager.running:
            result = manager.results.get()
            do something with result

    '''

    # We start some child processes to run the jobs. We have a queue for
    # unprocessed jobs (attribute jobs), and another for results. Each
    # child process gets a job from the job queue, runs it, puts the result
    # in the result queue.
    #
    # An additional queue is used for status reports, specifically the
    # child processes send a token to the manager when they've finished
    # running the job. This is used by the manager to keep track of when
    # jobs have been finished. The caller needs to know this to be able
    # to do things like report background processes in the user interface.

    def __init__(self):
        self.init()

    def init(self):
        '''Initialize things.

        The user must not call this method.

        '''
        # jobs: not-yet-run jobs; results: return values (or exceptions)
        # of finished jobs; status: one None token per finished job,
        # drained by the running property to keep jobs_counter accurate.
        self.jobs = multiprocessing.Queue()
        self.jobs_counter = 0
        self.status = multiprocessing.Queue()
        self.results = multiprocessing.Queue()
        self.processes = []

    @property
    def running(self):
        '''Are any child processes running jobs now?'''

        if not self.processes:
            # No workers have been started: the only thing of interest
            # is leftover results from an earlier round.
            return self.results.qsize() > 0

        # Drain the status queue; each token means one job finished.
        while True:
            try:
                self.status.get(block=False)
            except Queue.Empty:
                break
            self.jobs_counter -= 1

        return self.jobs_counter > 0 or self.results.qsize() > 0

    def add_job(self, job):
        '''Add a job to the queue.

        The job will be executed when there is a free CPU to do it.

        '''

        self.jobs_counter += 1
        self.jobs.put(job)

    def start_jobs(self, maxproc=None):
        '''Start executing jobs.

        This starts the background processes. It must not be called if
        there are any already running. One background process is started
        per CPU, unless maxproc is set, in which case that many
        background processes are started.

        '''

        assert self.processes == []
        if maxproc is None:
            maxproc = multiprocessing.cpu_count()
        for i in range(maxproc):
            p = multiprocessing.Process(target=worker_process,
                                        args=(self.jobs, self.results,
                                              self.status))
            self.processes.append(p)
            p.start()

    def stop_jobs(self):
        '''Stop processing jobs.

        The queue of jobs will be emptied. Currently running jobs will
        be killed mercilessly, and will not produce results. This call
        will block until all background processes have terminated.

        '''

        # Close pipes. This will shut down background threads so that
        # things go away nicely. Not doing this will occasionally cause
        # the background threads to throw exceptions.
        self.jobs.close()
        self.results.close()
        self.status.close()
        self.jobs.join_thread()
        self.results.join_thread()
        self.status.join_thread()

        # Kill all processes.
        for p in self.processes:
            p.terminate()

        # Wait for them to die.
        for p in self.processes:
            p.join()

        # Start over.
        self.init()