Introduction

Now that the multiprocessing library comes standard in Python 2.6, I thought I’d migrate some of my apps to take full advantage. However, there aren’t many examples out there showing how to write a basic multiprocessing program with a graphical front-end. Here’s a simple wxPython multiprocessing example. The actual calculation is trivial – the hard bit is making sure all queues are emptied and processes terminated properly upon exit.

A more complete example is listed on my website.

Objects Involved

Process Overview

There are a few key aspects to the design:

Special Concerns

None yet!

Code Sample

   1 """
   2 Simpler wxPython Multiprocessing Example
   3 ----------------------------------------
   4 
   5 This simple example uses a wx.App to control and monitor a pool of workers
   6 instructed to carry out a list of tasks.
   7 
   8 The program creates the GUI plus a list of tasks, then starts a pool of workers
   9 (processes) implemented with a classmethod. Within the GUI, the user can start
  10 and stop processing the tasks at any time.
  11 
  12 Copyright (c) 2010, Roger Stuckey. All rights reserved.
  13 """
  14 
  15 import getopt, math, random, sys, time, types, wx
  16 
  17 from multiprocessing import Process, Queue, cpu_count, current_process, freeze_support
  18 
  19 
  20 class MyFrame(wx.Frame):
  21     """
  22     A simple Frame class.
  23     """
  24     def __init__(self, parent, id, title, processes, taskqueue, donequeue, tasks):
  25         """
  26         Initialise the Frame.
  27         """
  28         self.processes = processes
  29         self.numprocesses = len(processes)
  30         self.taskQueue = taskqueue
  31         self.doneQueue = donequeue
  32         self.Tasks = tasks
  33         self.numtasks = len(tasks)
  34 
  35         wx.Frame.__init__(self, parent, id, title, wx.Point(700, 500), wx.Size(300, 200))
  36 
  37         # Create the panel, sizer and controls
  38         self.panel = wx.Panel(self, wx.ID_ANY)
  39         self.sizer = wx.GridBagSizer(5, 5)
  40 
  41         self.start_bt = wx.Button(self.panel, wx.ID_ANY, "Start")
  42         self.Bind(wx.EVT_BUTTON, self.OnStart, self.start_bt)
  43         self.start_bt.SetDefault()
  44         self.start_bt.SetToolTipString('Start the execution of tasks')
  45         self.start_bt.ToolTip.Enable(True)
  46 
  47         self.output_tc = wx.TextCtrl(self.panel, wx.ID_ANY, style=wx.TE_MULTILINE|wx.TE_READONLY)
  48 
  49         # Add the controls to the sizer
  50         self.sizer.Add(self.start_bt, (0, 0), flag=wx.ALIGN_CENTER|wx.LEFT|wx.TOP|wx.RIGHT, border=5)
  51         self.sizer.Add(self.output_tc, (1, 0), flag=wx.EXPAND|wx.LEFT|wx.RIGHT|wx.BOTTOM, border=5)
  52         self.sizer.AddGrowableCol(0)
  53         self.sizer.AddGrowableRow(1)
  54 
  55         self.panel.SetSizer(self.sizer)
  56 
  57         self.Bind(wx.EVT_CLOSE, self.OnClose)
  58 
  59         # Set some program flags
  60         self.keepgoing = True
  61         self.i = 0
  62         self.j = 0
  63 
  64         self.output_tc.AppendText('Number of processes = %d\n' % self.numprocesses)
  65 
  66     def OnStart(self, event):
  67         """
  68         Start the execution of tasks by the processes.
  69         """
  70         self.start_bt.Enable(False)
  71         self.output_tc.AppendText('Unordered results...\n')
  72         # Start processing tasks
  73         self.processTasks(self.update)
  74         if (self.keepgoing):
  75             self.start_bt.Enable(True)
  76 
  77     def OnClose(self, event):
  78         """
  79         Stop the task queue, terminate processes and close the window.
  80         """
  81         if (self.j < self.i):
  82             self.output_tc.AppendText('Completing queued tasks...\n')
  83         self.start_bt.Enable(False)
  84         busy = wx.BusyInfo("Waiting for processes to terminate...")
  85         # Stop processing tasks and terminate the processes
  86         self.processTerm(self.update)
  87         self.Destroy()
  88 
  89     def processTasks(self, resfunc=None):
  90         """
  91         Start the execution of tasks by the processes.
  92         """
  93         self.keepgoing = True
  94 
  95         # Submit first set of tasks
  96         numprocstart = min(self.numprocesses, self.numtasks)
  97         for self.i in range(numprocstart):
  98             self.taskQueue.put(self.Tasks[self.i])
  99 
 100         self.j = -1 # done queue index
 101         self.i = numprocstart - 1 # task queue index
 102         while (self.j < self.i):
 103             # Get and print results
 104             self.j += 1
 105             output = self.doneQueue.get()
 106             # Execute some function (Yield to a wx.Button event)
 107             if (isinstance(resfunc, (types.FunctionType, types.MethodType))):
 108                 resfunc(output)
 109             if ((self.keepgoing) and (self.i + 1 < self.numtasks)):
 110                 # Submit another task
 111                 self.i += 1
 112                 self.taskQueue.put(self.Tasks[self.i])
 113 
 114     def update(self, output):
 115         """
 116         Get and print the results from one completed task.
 117         """
 118         self.output_tc.AppendText('%s [%d] calculate(%d) = %.2f\n' % output)
 119         # Give the user an opportunity to interact
 120         wx.YieldIfNeeded()
 121 
 122     def processTerm(self, resfunc=None):
 123         """
 124         Stop the execution of tasks by the processes.
 125         """
 126         self.keepgoing = False
 127 
 128         while (self.j < self.i):
 129             # Get and print any results remining in the done queue
 130             self.j += 1
 131             output = self.doneQueue.get()
 132             if (isinstance(resfunc, (types.FunctionType, types.MethodType))):
 133                 resfunc(output)
 134 
 135         for n in range(self.numprocesses):
 136             # Terminate any running processes
 137             self.processes[n].terminate()
 138 
 139         # Wait for all processes to stop
 140         isalive = 1
 141         while isalive:
 142             isalive = 0
 143             for n in range(self.numprocesses):
 144                 isalive = isalive + self.processes[n].is_alive()
 145             time.sleep(0.5)
 146 
 147     def worker(cls, input, output):
 148         """
 149         Create a TaskProcessor object and calculate the result.
 150         """
 151         while True:
 152             args = input.get()
 153             result = 0
 154             # Calculate the result of a task
 155             for i in range(args[0]):
 156                 angle_rad = math.radians(args[1])
 157                 result += math.tanh(angle_rad)/math.cosh(angle_rad)/args[0]
 158             # Put the result on the output queue
 159             output.put(( current_process().name, current_process().pid, args[1], result ))
 160 
 161     # The worker must not require any existing object for execution!
 162     worker = classmethod(worker)
 163 
 164 
 165 class MyApp(wx.App):
 166     """
 167     A simple App class, modified to hold the processes and task queues.
 168     """
 169     def __init__(self, redirect=True, filename=None, useBestVisual=False, clearSigInt=True, processes=[ ], taskqueue=[ ], donequeue=[ ], tasks=[ ]):
 170         """
 171         Initialise the App.
 172         """
 173         self.Processes = processes
 174         self.taskQueue = taskqueue
 175         self.doneQueue = donequeue
 176         self.Tasks = tasks
 177 
 178         wx.App.__init__(self, redirect, filename, useBestVisual, clearSigInt)
 179 
 180     def OnInit(self):
 181         """
 182         Initialise the App with a Frame.
 183         """
 184         self.frame = MyFrame(None, -1, 'wxSimpler_MP', self.Processes, self.taskQueue, self.doneQueue, self.Tasks)
 185         self.frame.Show(True)
 186         return True
 187 
 188 
 189 if __name__ == '__main__':
 190 
 191     freeze_support()
 192 
 193     numtasks = 20
 194     # Determine the number of CPU's/cores
 195     numproc = cpu_count()
 196 
 197     # Create the task list
 198     Tasks = [ (int(1e6), random.randint(0, 45)) for i in range(numtasks) ]
 199 
 200     # Create the queues
 201     taskQueue = Queue()
 202     doneQueue = Queue()
 203 
 204     Processes = [ ]
 205 
 206     # The worker processes must be started here!
 207     for n in range(numproc):
 208         process = Process(target=MyFrame.worker, args=(taskQueue, doneQueue))
 209         process.start()
 210         Processes.append(process)
 211 
 212     # Create the app, including worker processes
 213     app = MyApp(redirect=True, filename='wxsimpler_mp.stderr.log', processes=Processes, taskqueue=taskQueue, donequeue=doneQueue, tasks=Tasks)
 214     app.MainLoop()

Comments

If you have any questions, please feel free to contact the author, whose information is available in his profile.

MultiProcessing (last edited 2010-07-23 04:53:24 by digger2)

NOTE: To edit pages in this wiki you must be a member of the TrustedEditorsGroup.