Introduction
Now that the multiprocessing library comes standard in Python 2.6, I thought I’d migrate some of my apps to take full advantage. However, there aren’t many examples out there showing how to write a basic multiprocessing program with a graphical front-end. Here’s a simple wxPython multiprocessing example. The actual calculation is trivial – the hard bit is making sure all queues are emptied and processes terminated properly upon exit.
A more complete example is listed on my website.
Objects Involved
- Processes
- Queues
Process Overview
There are a few key aspects to the design:
- On Windows, it is important to protect the “entry point” of the program by using if __name__ == '__main__': as recommended in the official documentation.
- The worker processes must be independent of any object instance (i.e. defined as a class method or similar) and started prior to wx.App instantiation (i.e. in the __main__ block).
- The task queue should be given tasks only as required to prevent any residual tasks upon the cessation of processing.
Special Concerns
None yet!
Code Sample
1 """
2 Simpler wxPython Multiprocessing Example
3 ----------------------------------------
4
5 This simple example uses a wx.App to control and monitor a pool of workers
6 instructed to carry out a list of tasks.
7
8 The program creates the GUI plus a list of tasks, then starts a pool of workers
9 (processes) implemented with a classmethod. Within the GUI, the user can start
10 and stop processing the tasks at any time.
11
12 Copyright (c) 2010, Roger Stuckey. All rights reserved.
13 """
14
15 import getopt, math, random, sys, time, types, wx
16
17 from multiprocessing import Process, Queue, cpu_count, current_process, freeze_support
18
19
class MyFrame(wx.Frame):
    """
    A simple Frame class.

    The frame shows a "Start" button and a read-only text control. It puts
    tasks on the task queue, reads results back from the done queue and
    prints each result in the text control.
    """
    def __init__(self, parent, id, title, processes, taskqueue, donequeue, tasks):
        """
        Initialise the Frame.

        parent, id, title -- forwarded to wx.Frame.__init__
        processes         -- list of worker Process objects; they are
                             terminated when the window is closed
        taskqueue         -- queue onto which tasks are put
        donequeue         -- queue from which results are read
        tasks             -- indexable collection of tasks to submit
        """
        self.processes = processes
        self.numprocesses = len(processes)
        self.taskQueue = taskqueue
        self.doneQueue = donequeue
        self.Tasks = tasks
        self.numtasks = len(tasks)

        wx.Frame.__init__(self, parent, id, title, wx.Point(700, 500), wx.Size(300, 200))

        # Create the panel, sizer and controls
        self.panel = wx.Panel(self, wx.ID_ANY)
        self.sizer = wx.GridBagSizer(5, 5)

        self.start_bt = wx.Button(self.panel, wx.ID_ANY, "Start")
        self.Bind(wx.EVT_BUTTON, self.OnStart, self.start_bt)
        self.start_bt.SetDefault()
        self.start_bt.SetToolTipString('Start the execution of tasks')
        self.start_bt.ToolTip.Enable(True)

        self.output_tc = wx.TextCtrl(self.panel, wx.ID_ANY, style=wx.TE_MULTILINE|wx.TE_READONLY)

        # Add the controls to the sizer
        self.sizer.Add(self.start_bt, (0, 0), flag=wx.ALIGN_CENTER|wx.LEFT|wx.TOP|wx.RIGHT, border=5)
        self.sizer.Add(self.output_tc, (1, 0), flag=wx.EXPAND|wx.LEFT|wx.RIGHT|wx.BOTTOM, border=5)
        self.sizer.AddGrowableCol(0)
        self.sizer.AddGrowableRow(1)

        self.panel.SetSizer(self.sizer)

        self.Bind(wx.EVT_CLOSE, self.OnClose)

        # Set some program flags
        self.keepgoing = True  # cleared by processTerm to stop new submissions
        self.i = 0             # index of the last task put on the task queue
        self.j = 0             # index of the last result taken off the done queue

        self.output_tc.AppendText('Number of processes = %d\n' % self.numprocesses)

    def OnStart(self, event):
        """
        Start the execution of tasks by the processes.

        The button stays disabled while the tasks run and is re-enabled
        afterwards, unless processing was stopped by processTerm.
        """
        self.start_bt.Enable(False)
        self.output_tc.AppendText('Unordered results...\n')
        # Start processing tasks
        self.processTasks(self.update)
        if self.keepgoing:
            self.start_bt.Enable(True)

    def OnClose(self, event):
        """
        Stop the task queue, terminate processes and close the window.
        """
        if self.j < self.i:
            # Some submitted tasks have not reported a result yet
            self.output_tc.AppendText('Completing queued tasks...\n')
        self.start_bt.Enable(False)
        # Keep a reference so the busy notice stays up until this method ends
        busy = wx.BusyInfo("Waiting for processes to terminate...")
        # Stop processing tasks and terminate the processes
        self.processTerm(self.update)
        self.Destroy()

    def processTasks(self, resfunc=None):
        """
        Start the execution of tasks by the processes.

        At most one task per process is queued up front; a further task is
        submitted each time a result comes back, so that few tasks are left
        on the queue if processing is stopped.

        resfunc -- optional callable invoked with each result tuple
        """
        self.keepgoing = True

        # Submit first set of tasks
        numprocstart = min(self.numprocesses, self.numtasks)
        for n in range(numprocstart):
            self.taskQueue.put(self.Tasks[n])

        self.j = -1  # done queue index
        self.i = numprocstart - 1  # task queue index
        while self.j < self.i:
            # Get and print results
            self.j += 1
            output = self.doneQueue.get()
            # Execute some function (Yield to a wx.Button event)
            if callable(resfunc):
                resfunc(output)
            # resfunc may have processed a close event, which clears keepgoing
            if self.keepgoing and (self.i + 1 < self.numtasks):
                # Submit another task
                self.i += 1
                self.taskQueue.put(self.Tasks[self.i])

    def update(self, output):
        """
        Get and print the results from one completed task.

        output -- 4-tuple matching the format string below, as put on the
                  done queue by worker()
        """
        self.output_tc.AppendText('%s [%d] calculate(%d) = %.2f\n' % output)
        # Give the user an opportunity to interact
        wx.YieldIfNeeded()

    def processTerm(self, resfunc=None):
        """
        Stop the execution of tasks by the processes.

        Collects the results of every task already submitted, then terminates
        all worker processes and waits for them to exit.

        resfunc -- optional callable invoked with each remaining result tuple
        """
        self.keepgoing = False

        while self.j < self.i:
            # Get and print any results remaining in the done queue
            self.j += 1
            output = self.doneQueue.get()
            if callable(resfunc):
                resfunc(output)

        # Terminate any running processes
        for process in self.processes:
            process.terminate()

        # Wait for all processes to stop; join() returns as soon as the
        # process has exited, so no polling or fixed delay is needed
        for process in self.processes:
            process.join()

    @classmethod
    def worker(cls, input, output):
        """
        Take tasks from the input queue forever and calculate their results.

        input  -- queue of tasks; each task is indexed as
                  (number of iterations, angle in degrees)
        output -- queue receiving (process name, pid, angle, result) tuples

        The worker must not require any existing object for execution, which
        is why it is a classmethod. It never returns; the process running it
        has to be terminated from outside.
        """
        while True:
            args = input.get()
            result = 0
            # Calculate the result of a task
            # NOTE(review): every iteration adds the same term; presumably
            # kept as a loop to give the workers a noticeable amount of work
            for i in range(args[0]):
                angle_rad = math.radians(args[1])
                result += math.tanh(angle_rad)/math.cosh(angle_rad)/args[0]
            # Put the result on the output queue
            output.put(( current_process().name, current_process().pid, args[1], result ))
163
164
class MyApp(wx.App):
    """
    A simple App class, modified to hold the processes and task queues.
    """
    def __init__(self, redirect=True, filename=None, useBestVisual=False, clearSigInt=True, processes=None, taskqueue=None, donequeue=None, tasks=None):
        """
        Initialise the App.

        redirect, filename, useBestVisual, clearSigInt -- forwarded to
            wx.App.__init__
        processes -- list of worker processes handed on to the frame
        taskqueue -- queue onto which the frame puts tasks
        donequeue -- queue from which the frame reads results
        tasks     -- collection of tasks handed on to the frame

        Each of the last four defaults to a new empty list when omitted; None
        is used as the default so no list object is shared between calls.
        """
        self.Processes = [ ] if processes is None else processes
        self.taskQueue = [ ] if taskqueue is None else taskqueue
        self.doneQueue = [ ] if donequeue is None else donequeue
        self.Tasks = [ ] if tasks is None else tasks

        # Called last: OnInit reads the attributes set above
        # NOTE(review): relies on wx.App.__init__ invoking OnInit - confirm
        wx.App.__init__(self, redirect, filename, useBestVisual, clearSigInt)

    def OnInit(self):
        """
        Initialise the App with a Frame.

        Returns True once the frame has been created and shown.
        """
        self.frame = MyFrame(None, -1, 'wxSimpler_MP', self.Processes, self.taskQueue, self.doneQueue, self.Tasks)
        self.frame.Show(True)
        return True
187
188
if __name__ == '__main__':

    # Needed when the program is frozen into a Windows executable
    freeze_support()

    numtasks = 20
    # Determine the number of CPUs/cores
    numproc = cpu_count()

    # Create the task list: (number of iterations, angle in degrees)
    Tasks = [ (int(1e6), random.randint(0, 45)) for _ in range(numtasks) ]

    # Create the queues
    taskQueue = Queue()
    doneQueue = Queue()

    Processes = [ ]

    # The worker processes must be started here!
    for n in range(numproc):
        process = Process(target=MyFrame.worker, args=(taskQueue, doneQueue))
        # The workers loop forever; marking them as daemons makes sure they
        # are stopped when this process exits, even if the GUI never gets
        # the chance to terminate them itself
        process.daemon = True
        process.start()
        Processes.append(process)

    # Create the app, including worker processes
    app = MyApp(redirect=True, filename='wxsimpler_mp.stderr.log', processes=Processes, taskqueue=taskQueue, donequeue=doneQueue, tasks=Tasks)
    app.MainLoop()
Comments
If you have any questions, please feel free to contact the author, whose information is available in his profile.