
Now that the multiprocessing library comes standard in Python 2.6, I thought I’d migrate some of my apps to take full advantage. However, there aren’t many examples out there showing how to write a basic multiprocessing program with a graphical front-end. Here’s a simple wxPython multiprocessing example. The actual calculation is trivial – the hard bit is making sure all queues are emptied and processes terminated properly upon exit.

   1 """
   2 Simpler wxPython Multiprocessing Example
   3 ----------------------------------------
   5 This simple example uses a wx.App to control and monitor a pool of workers
   6 instructed to carry out a list of tasks.
   8 The program creates the GUI plus a list of tasks, then starts a pool of workers
   9 (processes) implemented with a classmethod. Within the GUI, the user can start
  10 and stop processing the tasks at any time.
  12 Copyright (c) 2010, Roger Stuckey. All rights reserved.
  13 """
  15 import getopt, math, random, sys, time, types, wx
  17 from multiprocessing import Process, Queue, cpu_count, current_process, freeze_support
  20 class MyFrame(wx.Frame):
  21     """
  22     A simple Frame class.
  23     """
  24     def __init__(self, parent, id, title, processes, taskqueue, donequeue, tasks):
  25         """
  26         Initialise the Frame.
  27         """
  28         self.processes = processes
  29         self.numprocesses = len(processes)
  30         self.taskQueue = taskqueue
  31         self.doneQueue = donequeue
  32         self.Tasks = tasks
  33         self.numtasks = len(tasks)
  35         wx.Frame.__init__(self, parent, id, title, wx.Point(700, 500), wx.Size(300, 200))
  37         # Create the panel, sizer and controls
  38         self.panel = wx.Panel(self, wx.ID_ANY)
  39         self.sizer = wx.GridBagSizer(5, 5)
  41         self.start_bt = wx.Button(self.panel, wx.ID_ANY, "Start")
  42         self.Bind(wx.EVT_BUTTON, self.OnStart, self.start_bt)
  43         self.start_bt.SetDefault()
  44         self.start_bt.SetToolTipString('Start the execution of tasks')
  45         self.start_bt.ToolTip.Enable(True)
  47         self.output_tc = wx.TextCtrl(self.panel, wx.ID_ANY, style=wx.TE_MULTILINE|wx.TE_READONLY)
  49         # Add the controls to the sizer
  50         self.sizer.Add(self.start_bt, (0, 0), flag=wx.ALIGN_CENTER|wx.LEFT|wx.TOP|wx.RIGHT, border=5)
  51         self.sizer.Add(self.output_tc, (1, 0), flag=wx.EXPAND|wx.LEFT|wx.RIGHT|wx.BOTTOM, border=5)
  52         self.sizer.AddGrowableCol(0)
  53         self.sizer.AddGrowableRow(1)
  55         self.panel.SetSizer(self.sizer)
  57         self.Bind(wx.EVT_CLOSE, self.OnClose)
  59         # Set some program flags
  60         self.keepgoing = True
  61         self.i = 0
  62         self.j = 0
  64         self.output_tc.AppendText('Number of processes = %d\n' % self.numprocesses)
  66     def OnStart(self, event):
  67         """
  68         Start the execution of tasks by the processes.
  69         """
  70         self.start_bt.Enable(False)
  71         self.output_tc.AppendText('Unordered results...\n')
  72         # Start processing tasks
  73         self.processTasks(self.update)
  74         if (self.keepgoing):
  75             self.start_bt.Enable(True)
  77     def OnClose(self, event):
  78         """
  79         Stop the task queue, terminate processes and close the window.
  80         """
  81         if (self.j < self.i):
  82             self.output_tc.AppendText('Completing queued tasks...\n')
  83         self.start_bt.Enable(False)
  84         busy = wx.BusyInfo("Waiting for processes to terminate...")
  85         # Stop processing tasks and terminate the processes
  86         self.processTerm(self.update)
  87         self.Destroy()
  89     def processTasks(self, resfunc=None):
  90         """
  91         Start the execution of tasks by the processes.
  92         """
  93         self.keepgoing = True
  95         # Submit first set of tasks
  96         numprocstart = min(self.numprocesses, self.numtasks)
  97         for self.i in range(numprocstart):
  98             self.taskQueue.put(self.Tasks[self.i])
 100         self.j = -1 # done queue index
 101         self.i = numprocstart - 1 # task queue index
 102         while (self.j < self.i):
 103             # Get and print results
 104             self.j += 1
 105             output = self.doneQueue.get()
 106             # Execute some function (Yield to a wx.Button event)
 107             if (isinstance(resfunc, (types.FunctionType, types.MethodType))):
 108                 resfunc(output)
 109             if ((self.keepgoing) and (self.i + 1 < self.numtasks)):
 110                 # Submit another task
 111                 self.i += 1
 112                 self.taskQueue.put(self.Tasks[self.i])
 114     def update(self, output):
 115         """
 116         Get and print the results from one completed task.
 117         """
 118         self.output_tc.AppendText('%s [%d] calculate(%d) = %.2f\n' % output)
 119         # Give the user an opportunity to interact
 120         wx.YieldIfNeeded()
 122     def processTerm(self, resfunc=None):
 123         """
 124         Stop the execution of tasks by the processes.
 125         """
 126         self.keepgoing = False
 128         while (self.j < self.i):
 129             # Get and print any results remining in the done queue
 130             self.j += 1
 131             output = self.doneQueue.get()
 132             if (isinstance(resfunc, (types.FunctionType, types.MethodType))):
 133                 resfunc(output)
 135         for n in range(self.numprocesses):
 136             # Terminate any running processes
 137             self.processes[n].terminate()
 139         # Wait for all processes to stop
 140         isalive = 1
 141         while isalive:
 142             isalive = 0
 143             for n in range(self.numprocesses):
 144                 isalive = isalive + self.processes[n].is_alive()
 145             time.sleep(0.5)
 147     def worker(cls, input, output):
 148         """
 149         Create a TaskProcessor object and calculate the result.
 150         """
 151         while True:
 152             args = input.get()
 153             result = 0
 154             # Calculate the result of a task
 155             for i in range(args[0]):
 156                 angle_rad = math.radians(args[1])
 157                 result += math.tanh(angle_rad)/math.cosh(angle_rad)/args[0]
 158             # Put the result on the output queue
 159             output.put(( current_process().name, current_process().pid, args[1], result ))
 161     # The worker must not require any existing object for execution!
 162     worker = classmethod(worker)
 165 class MyApp(wx.App):
 166     """
 167     A simple App class, modified to hold the processes and task queues.
 168     """
 169     def __init__(self, redirect=True, filename=None, useBestVisual=False, clearSigInt=True, processes=[ ], taskqueue=[ ], donequeue=[ ], tasks=[ ]):
 170         """
 171         Initialise the App.
 172         """
 173         self.Processes = processes
 174         self.taskQueue = taskqueue
 175         self.doneQueue = donequeue
 176         self.Tasks = tasks
 178         wx.App.__init__(self, redirect, filename, useBestVisual, clearSigInt)
 180     def OnInit(self):
 181         """
 182         Initialise the App with a Frame.
 183         """
 184         self.frame = MyFrame(None, -1, 'wxSimpler_MP', self.Processes, self.taskQueue, self.doneQueue, self.Tasks)
 185         self.frame.Show(True)
 186         return True
 189 if __name__ == '__main__':
 191     freeze_support()
 193     numtasks = 20
 194     # Determine the number of CPU's/cores
 195     numproc = cpu_count()
 197     # Create the task list
 198     Tasks = [ (int(1e6), random.randint(0, 45)) for i in range(numtasks) ]
 200     # Create the queues
 201     taskQueue = Queue()
 202     doneQueue = Queue()
 204     Processes = [ ]
 206     # The worker processes must be started here!
 207     for n in range(numproc):
 208         process = Process(target=MyFrame.worker, args=(taskQueue, doneQueue))
 209         process.start()
 210         Processes.append(process)
 212     # Create the app, including worker processes
 213     app = MyApp(redirect=True, filename='wxsimpler_mp.stderr.log', processes=Processes, taskqueue=taskQueue, donequeue=doneQueue, tasks=Tasks)
 214     app.MainLoop()


If you have any questions, please feel free to contact the author, whose information is available in his profile.

