Attachment 'ThreadedProcess_Demo.py'
Download 1 """
2 ThreadedProcess_Try.py
3
4 Derived from "wxPython and Threads" in "Mouse vs python" by Mike Driscol
5 @ http://www.blog.pythonlibrary.org/2010/05/22/wxpython-and-threads/
6
7 Demonstration of a threaded process which sends back multiple
8 message types using a single message name.
9
10 """
11
12 import time
13 import random
14 from threading import Thread
15
16 import wx
17
18 from wx.lib.pubsub import Publisher
19
20 #------------------------------------------------------------------------------
21
22 class MyFrame( wx.Frame ) :
23
24 def __init__( self ) :
25
26 wx.Frame.__init__( self, None, -1, 'Threading Tutorial', pos=(40, 40) )
27 self.SetClientSize( (500, 500) )
28
29 self.frmPanel = wx.Panel( self, wx.ID_ANY )
30
31 self.btn = wx.Button( self.frmPanel, label='Start Thread' )
32 self.btn.Bind( wx.EVT_BUTTON, self.OnButton )
33
34 self.displayTxtCtrl = wx.TextCtrl( self.frmPanel, -1,
35 style=wx.TE_MULTILINE|wx.TE_READONLY )
36 font = wx.Font( 9, wx.MODERN, wx.NORMAL, wx.NORMAL, False )
37 self.displayTxtCtrl.SetFont( font ) # a bigger font
38
39 #----- Lay out the two controls
40
41 self.LayoutFrmPanel()
42
43 #-----
44
45 # Create a pubsub receiver.
46 Publisher().subscribe( self.DisplayThreadMessages, 'update' )
47
48 #end __init__
49
50 #----------------------------------
51
52 def LayoutFrmPanel( self ) :
53 """ A simple and clean layout. """
54
55 sizer = wx.BoxSizer( wx.VERTICAL )
56 sizer.Add ( (0, 15) ) # a vertical spacer at the top
57 sizer.Add( self.btn, 0, wx.CENTER ) # center using its "best size"
58 sizer.Add ( (0, 15) ) # spacer between controls
59 sizer.Add( self.displayTxtCtrl, 1, wx.EXPAND|wx.CENTER ) # Expand along both axes.
60 sizer.Add ( (0, 15) ) # bottom spacer
61
62 self.frmPanel.SetSizer( sizer )
63
64 #----------------------------------
65
66 def OnButton( self, event ) :
67 """ Creates the thread and then starts it. """
68
69 self.btn.Disable()
70
71 status = TestThread() # Create and start the thread.
72 statusMsg = 'OnButton(): status = %s\n\n' % (status)
73
74 self.displayTxtCtrl.WriteText( '\nOnButton(): Thread started.\n' )
75 self.displayTxtCtrl.WriteText( statusMsg )
76
77 self.btn.Disable()
78
79 #end def
80
81 #----------------------------------
82
83 def DisplayThreadMessages( self, msg ) :
84 """ Receives data from thread and updates the display. """
85
86 msgData = msg.data
87 if isinstance( msgData, str ) :
88 self.displayTxtCtrl.WriteText( 'Textual message = [ %s ]\n' % (msgData) )
89
90 elif isinstance( msgData, float ) :
91 self.displayTxtCtrl.WriteText( '\n' ) # A blank line separator.
92 self.displayTxtCtrl.WriteText( 'Processing time was [ %s ] secs.\n' % (msgData) )
93
94 elif isinstance( msgData, int ) :
95
96 if (msgData == -1) : # This flag value indicates 'Thread processing has completed'.
97 self.btn.Enable() # The GUI is now ready for another thread to start.
98 self.displayTxtCtrl.WriteText( 'Integer ThreadCompletedFlag = [ %d ]\n' % (msgData) )
99
100 else :
101 self.displayTxtCtrl.WriteText( 'Integer Calculation Result = [ %d ]\n' % (msgData) )
102 #end if
103
104 #end DisplayThreadMessages def
105
106 #end MyFrame class
107
108 #------------------------------------------------------------------------------
109 #==============================================================================
110 #------------------------------------------------------------------------------
111
112 class TestThread( Thread ) :
113 """Test Worker Thread Class."""
114
115 def __init__( self ) :
116 """Init Worker Thread Class."""
117
118 Thread.__init__( self )
119 self.start() # Start this thread
120
121 #end __init__
122
123 #----------------------------------
124
125 def run( self ) : # Do NOT change this function's name.
126 """
127 Execute the worker thread's processing.
128
129 NOTE: This function overides the Publisher function of the same name.
130 """
131
132 wx.CallAfter( Publisher().sendMessage, 'update', 'TestThread::run(): Thread started.' )
133
134 # This is the code executed in the thread.
135 for result in range( 1, 6 ) :
136
137 # Simulate a long and variable processing duration each loop.
138 processingDuration = random.randint( 100, 1500) / 1000.0 # 0.2 to 1.5 seconds
139 print processingDuration
140 time.sleep( processingDuration )
141
142 wx.CallAfter( self.PostResults, processingDuration )
143 wx.CallAfter( self.PostResults, result )
144
145 #end for
146
147 wx.CallAfter( Publisher().sendMessage, 'update', 'TestThread::run(): Thread finished.' )
148 wx.CallAfter( Publisher().sendMessage, 'update', -1 )
149
150 #end Run def
151
152 #----------------------------------
153
154 def PostResults( self, result ) :
155 """ Send time to GUI """
156
157 Publisher().sendMessage( 'update', result )
158
159 #end def
160
161 #end TestThread class
162
163 #==============================================================================
164
165 if __name__ == '__main__' :
166
167 app = wx.PySimpleApp( redirect=False )
168
169 frame = MyFrame()
170 frame.Show()
171
172 app.MainLoop()
173
174 #end if
Attached Files
To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.You are not allowed to attach a file to this page.