summaryrefslogtreecommitdiff
path: root/trunk/dimbola/bgjobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/dimbola/bgjobs.py')
-rw-r--r--trunk/dimbola/bgjobs.py231
1 files changed, 231 insertions, 0 deletions
diff --git a/trunk/dimbola/bgjobs.py b/trunk/dimbola/bgjobs.py
new file mode 100644
index 0000000..2b37143
--- /dev/null
+++ b/trunk/dimbola/bgjobs.py
@@ -0,0 +1,231 @@
+# Copyright (C) 2009 Lars Wirzenius <liw@liw.fi>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+'''Manage heavy background processing tasks.
+
+bgjobs is a package that wraps around the multiprocessing module in
+the standard Python library. It gives a higher level abstraction for
+running CPU intensive background jobs in other processes. Each job is
+abstracted into a class with a specific interface, and bgjobs
+takes care of running jobs as CPUs become available, and returning
+the results.
+
+'''
+
+
+import multiprocessing
+import Queue
+
+
class BackgroundJob(object):

    '''A job to be run in the background, via multiprocessing.

    This is a base class. Subclasses MUST implement the run method, which
    does the actual job of the task. The run method should return the value
    that is to be passed to the main process.

    '''

    def send_status(self, status):
        '''Send status info to the foreground process.

        The job may use this to, for example, inform a foreground UI
        about progress with the job.

        Note that the background runner sets the status_queue attribute
        and this is necessary for this to work. This is the same queue
        where results go; the caller is responsible for making sure
        that the values can be differentiated from the return values
        of the run method.

        '''
        self.status_queue.put(status)

    def run(self):
        '''Execute the job and return its result.

        Subclasses MUST override this method.

        '''
        # NotImplementedError is the conventional exception for an
        # unimplemented abstract method; a bare Exception would be
        # indistinguishable from a genuine job failure forwarded by
        # the worker process.
        raise NotImplementedError('Unimplemented run method.')
+
+
def worker_process(jobs, results, status):
    '''Run jobs from the jobs queue forever, putting results in results.

    For each job taken off the jobs queue, the job's status_queue
    attribute is pointed at the results queue (see
    BackgroundJob.send_status), then the job's run method is called.
    The return value -- or the exception the job raised -- is put in
    the results queue, and a None token is put in the status queue so
    the manager can count finished jobs.

    This function never returns on its own; the manager terminates the
    worker process when shutting down.

    '''

    while True:
        job = jobs.get()

        job.status_queue = results
        try:
            result = job.run()
        except BaseException as e:
            # Forward any failure (even SystemExit/KeyboardInterrupt in
            # the child) to the foreground as the job's result, so that
            # one bad job cannot kill the worker.
            result = e

        results.put(result)
        status.put(None)
+
+
class BackgroundManager(object):

    '''A manager for background jobs.

    This starts and stops background processes, and gives jobs to them
    as they become available.

    Call add_job() to add a new job (subclass of BackgroundJob) to the
    job queue, and start_jobs() to actually start executing jobs. After
    start_jobs() has been called, background processes will continue to
    wait for new jobs, and to execute them, just add them with add_job().

    The caller MUST query the running property occasionally, and call
    stop_jobs() when shutting down. It is not necessary to call stop_jobs()
    until the caller application is shutting down, but it can be called
    at any time, even when jobs are still being executed (all queued
    and running jobs are terminated and forgotten, as are all existing
    results).

    Typical use:

        manager = BackgroundManager()
        manager.start_jobs()

        while main_loop_needs_to_run:
            if there is a new job:
                manager.add_job(job)
            do something else
            if manager.running:
                try:
                    result = manager.results.get(block=False)
                except Queue.Empty:
                    pass
                else:
                    do something with result

    If you don't wish to poll results non-blockingly, just do this:

        manager = BackgroundManager()
        manager.start_jobs()
        for job in jobs:
            manager.add_job(job)

        while manager.running:
            result = manager.results.get()
            do something with result

    '''

    # We start some child processes to run the jobs. We have a queue for
    # unprocessed jobs (attribute jobs), and another for results. Each
    # child process gets a job from the job queue, runs it, puts the result
    # in the result queue.
    #
    # An additional queue is used for status reports, specifically the
    # child processes send a token to the manager when they've finished
    # running the job. This is used by the manager to keep track of when
    # jobs have been finished. The caller needs to know this to be able
    # to do things like report background processes in the user interface.

    def __init__(self):
        self.init()

    def init(self):
        '''Initialize things.

        The user must not call this method.

        '''
        # jobs_counter counts jobs that have been queued but whose
        # finished-token has not yet been drained from the status queue
        # by the running property.
        self.jobs = multiprocessing.Queue()
        self.jobs_counter = 0
        self.status = multiprocessing.Queue()
        self.results = multiprocessing.Queue()
        self.processes = []

    @property
    def running(self):
        '''Are any child processes running jobs now?'''

        if not self.processes:
            # No workers started (or they have been stopped); only
            # unretrieved results can remain.
            return self.results.qsize() > 0

        # Drain the finished-job tokens the workers have sent and
        # decrement the count of outstanding jobs accordingly. The
        # token value itself carries no information.
        while True:
            try:
                self.status.get(block=False)
            except Queue.Empty:
                break
            self.jobs_counter -= 1

        return self.jobs_counter > 0 or self.results.qsize() > 0

    def add_job(self, job):
        '''Add a job to the queue.

        The job will be executed when there is a free CPU to do it.

        '''

        self.jobs_counter += 1
        self.jobs.put(job)

    def start_jobs(self, maxproc=None):
        '''Start executing jobs.

        This starts the background processes. It must not be called if
        there are any already running. A background process is started
        for each CPU, unless maxproc is set, in which case that many
        background processes are started.

        '''

        assert self.processes == []
        if maxproc is None:
            maxproc = multiprocessing.cpu_count()
        for i in range(maxproc):
            p = multiprocessing.Process(target=worker_process,
                                        args=(self.jobs, self.results,
                                              self.status))
            self.processes.append(p)
            p.start()

    def stop_jobs(self):
        '''Stop processing jobs.

        The queue of jobs will be emptied. Currently running jobs will
        be killed mercilessly, and will not produce results. This call
        will block until all background processes have terminated.

        '''

        # Close pipes. This will shut down background threads so that
        # things go away nicely. Not doing this will occasionally cause
        # the background threads to throw exceptions.
        self.jobs.close()
        self.results.close()
        self.status.close()
        self.jobs.join_thread()
        self.results.join_thread()
        self.status.join_thread()

        # Kill all processes.
        for p in self.processes:
            p.terminate()

        # Wait for them to die.
        for p in self.processes:
            p.join()

        # Start over.
        self.init()
+