summaryrefslogtreecommitdiff
path: root/trunk/dimbola/bgjobs.py
blob: 2b37143a8124cfa0dc471485cc1a1d502e1a209b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# Copyright (C) 2009  Lars Wirzenius <liw@liw.fi>
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


'''Manage heavy background processing tasks.

bgjobs is a package that wraps around the multiprocessing module in
the standard Python library. It gives a higher level abstraction for
running CPU intensive background jobs in other processes. Each job is
abstracted into a class with a specific interface, and bgjobs
takes care of running jobs as CPUs become available, and returning
the results.

'''


import multiprocessing
import Queue


class BackgroundJob(object):

    '''A job to be run in the background, via multiprocessing.
    
    This is a base class. Subclasses MUST implement the run method, which
    does the actual job of the task. The run method should return the value
    that is to be passed to the main process.
    
    '''

    def send_status(self, status):
        '''Send status info to the foreground process.
        
        The job may use this to, for example, inform a foreground UI
        about progress with the job.
        
        Note that the background runner sets the status_queue attribute
        and this is necessary for this to work. This is the same queue
        where results go; the caller is responsible for making sure
        that the values can be differentiated from the return values
        of the run method.
        
        '''
        self.status_queue.put(status)

    def run(self):
        '''Execute the job and return its result.
        
        Subclasses MUST override this method. The base implementation
        raises NotImplementedError (a subclass of Exception, so existing
        callers that caught the previous generic Exception still work).
        
        '''
        raise NotImplementedError('Unimplemented run method.')


def worker_process(jobs, results, status):
    '''Run jobs from the jobs queue forever, putting results in results.
    
    This is the target function of each background process started by
    BackgroundManager. Arguments:
    
    * jobs -- multiprocessing.Queue of BackgroundJob instances to run
    * results -- multiprocessing.Queue for job return values (and for
      status reports sent via BackgroundJob.send_status)
    * status -- multiprocessing.Queue that gets one token (None) per
      finished job, so the manager can count completions
    
    This function never returns; the manager terminates the process.
    
    '''

    while True:
        job = jobs.get()

        # Give the job access to the results queue so it can report
        # progress via send_status().
        job.status_queue = results
        try:
            result = job.run()
        except BaseException as e:
            # Report any failure (even SystemExit/KeyboardInterrupt) to
            # the foreground as the exception object itself, instead of
            # killing this worker.
            result = e
        
        results.put(result)
        # Tell the manager one more job has finished, success or not.
        status.put(None)


class BackgroundManager(object):

    '''A manager for background jobs.
    
    This starts and stops background processes, and gives jobs to them
    as they become available.
    
    Call add_job() to add a new job (subclass of BackgroundJob) to the
    job queue, and start_jobs() to actually start executing jobs. After
    start_jobs() has been called, background processes will continue to
    wait for new jobs, and to execute them, just add them with add_job().
    
    The caller MUST query the running property occasionally, and call
    stop_jobs() when shutting down. It is not necessary to call stop_jobs()
    until the caller application is shutting down, but it can be called
    at any time, even when jobs are still being executed (all queued
    and running jobs are terminated and forgotten, as are all existing
    results).
    
    Typical use:
    
        manager = BackgroundManager()
        manager.start_jobs()
        
        while main_loop_needs_to_run:
            if there is a new job:
                manager.add_job()
            do something else
            if manager.running:
                try:
                    result = manager.results.get(block=False)
                except Queue.Empty:
                    pass
                else:
                    do something with result

    If you don't wish to poll results non-blockingly, just do this:

        manager = BackgroundManager()
        manager.start_jobs()
        for job in jobs:
            manager.add_job(job)
        
        while manager.running:
            result = manager.results.get()
            do something with result
    
    '''
    
    # We start some child processes to run the jobs. We have a queue for
    # unprocessed jobs (attribute jobs), and another for results. Each
    # child process gets a job from the job queue, runs it, puts the result
    # in the result queue.
    #
    # An additional queue is used for status reports, specifically the
    # child processes send a token to the manager when they've finished
    # running the job. This is used by the manager to keep track of when
    # jobs have been finished. The caller needs to know this to be able
    # to do things like report background processes in the user interface.
    
    def __init__(self):
        self.init()

    def init(self):
        '''Initialize things.
        
        The user must not call this method. It is also invoked by
        stop_jobs() to reset all state after shutdown.
        
        '''
        self.jobs = multiprocessing.Queue()      # jobs waiting to be run
        self.jobs_counter = 0                    # jobs added but not yet finished
        self.status = multiprocessing.Queue()    # one None token per finished job
        self.results = multiprocessing.Queue()   # return values (and status reports)
        self.processes = []                      # running worker Process objects
        
    @property
    def running(self):
        '''Are any child processes running jobs now?'''

        # With no workers started, only already-queued results count.
        # NOTE(review): Queue.qsize() is not implemented on all platforms
        # (e.g. raises NotImplementedError on Mac OS X) -- confirm the
        # target platforms support it.
        if not self.processes:
            return self.results.qsize() > 0

        # Drain the status queue: each token means one job has finished,
        # so decrement the outstanding-jobs counter for each one.
        while True:
            try:
                item = self.status.get(block=False)
            except Queue.Empty:
                break
            self.jobs_counter -= 1
        
        # Still running if jobs are outstanding, or results remain unread.
        return self.jobs_counter > 0 or self.results.qsize() > 0

    def add_job(self, job):
        '''Add a job to the queue.
        
        The job will be executed when there is a free CPU to do it.
        
        job must be a BackgroundJob instance (it has to be picklable so
        it can be sent to a child process).
        
        '''

        self.jobs_counter += 1
        self.jobs.put(job)

    def start_jobs(self, maxproc=None):
        '''Start executing jobs.
        
        This starts the background processes. It must not be called if
        there are any already running. A background process is started
        for each CPU, unless maxproc is set, in which case that many
        background processes are started.
        
        '''

        assert self.processes == []
        if maxproc is None:
            # Default to one worker per CPU, since jobs are CPU intensive.
            maxproc = multiprocessing.cpu_count()
        for i in range(maxproc):
            p = multiprocessing.Process(target=worker_process, 
                                        args=(self.jobs, self.results,
                                              self.status))
            self.processes.append(p)
            p.start()

    def stop_jobs(self):
        '''Stop processing jobs.
        
        The queue of jobs will be emptied. Currently running jobs will
        be killed mercilessly, and will not produce results. This call 
        will block until all background processes have terminated.
        
        '''
        
        # Close pipes. This will shut down background threads so that
        # things go away nicely. Not doing this will occasionally cause
        # the background threads to throw exceptions.
        self.jobs.close()
        self.results.close()
        self.status.close()
        self.jobs.join_thread()
        self.results.join_thread()
        self.status.join_thread()

        # Kill all processes.
        for p in self.processes:
            p.terminate()

        # Wait for them to die.
        for p in self.processes:
            p.join()

        # Start over: recreate queues and clear the process list so the
        # manager can be reused with a fresh start_jobs() call.
        self.init()