Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- How to implement a thread in a wxPython GUI application
- #!c:python27
- import wx
- import os
- import re
- import paramiko
- import string
- import fileinput
- import os.path
- import dircache
- import sys
- import time
- import datetime, time
- import wx
- from wxGui import *
- class MyApp(wx.App):
- def OnInit(self):
- frame = MyFrame("SecureTool v2.0.0", (50, 60), (458, 332))
- frame.Show()
- self.SetTopWindow(frame)
- return True
- class MyFrame(wx.Frame):
- def __init__(self, title, pos, size):
- wx.Frame.__init__(self, None, -1, title, pos, size)
- toolbar = self.CreateToolBar()
- toolbar.Realize()
- menuFile = wx.Menu()
- menuFile.Append(1, "&About...")
- menuFile.AppendSeparator()
- menuFile.Append(2, "E&xit")
- menuBar = wx.MenuBar()
- menuBar.Append(menuFile, "&File")
- menu2 = wx.Menu()
- menu2.Append(wx.NewId(), "&Copy", "Copy in status bar")
- menu2.AppendSeparator()
- menu2.Append(wx.NewId(), "C&ut", "")
- menu2.AppendSeparator()
- menu2.Append(wx.NewId(), "Paste", "")
- menu2.AppendSeparator()
- menu2.Append(wx.NewId(), "&Options...", "Display Options")
- menuBar.Append(menu2, "&Edit")
- self.SetMenuBar(menuBar)
- self.CreateStatusBar()
- self.SetStatusText("Welcome to SecureTool!")
- self.Bind(wx.EVT_MENU, self.OnAbout, id=1)
- self.Bind(wx.EVT_MENU, self.OnQuit, id=2)
- panel = wx.Panel(self)
- panel.SetBackgroundColour('LIGHT GREY')
- #Close button
- button = wx.Button(panel, label="EXIT", pos=(229, 160), size=(229, 80))
- self.Bind(wx.EVT_BUTTON, self.OnQuit, button)
- self.Bind(wx.EVT_CLOSE, self.OnCloseWindow)
- #Embed Server button
- button2 = wx.Button(panel, label="Embed Server", pos=(0, 160), size=(229, 80))
- self.Bind(wx.EVT_BUTTON, self.OnIP, button2)
- #Site Server
- button3 = wx.Button(panel, label="SITESERVER", pos=(0, 80), size=(229, 80))
- self.Bind(wx.EVT_BUTTON, self.OnUsrPswd, button3)
- #Local Search
- button4 = wx.Button(panel, label="LOCAL SEARCH", pos=(229, 80), size=(229, 80))
- self.Bind(wx.EVT_BUTTON, self.OnOpen, button4)
- EVT_RESULT(self, self.OnResult)
- self.worker = None
- def OnIP(self, event):
- ip_address = 0
- result = ''
- dlg = wx.TextEntryDialog(None, "Enter the IP Address.",
- 'Embed Server Connect', 'xxx.xxx.xxx.xxx')
- if dlg.ShowModal() == wx.ID_OK:
- ip_address = dlg.GetValue()
- if ip_address:
- cmsg = wx.MessageDialog(None, 'Do you want to connect to: ' + ip_address,
- 'Connect', wx.YES_NO | wx.ICON_QUESTION)
- result = cmsg.ShowModal()
- if result == wx.ID_YES:
- self.DispConnect(ip_address)
- cmsg.Destroy()
- dlg.Destroy()
- return True
- def OnUsrPswd(self, event):
- passwrd = 0
- result = ''
- result = wx.TextEntryDialog(None, 'Enter Weekly Password', 'Site Server login','')
- if result.ShowModal() == wx.ID_OK:
- passwrd = result.GetValue()
- if passwrd:
- psmsg = wx.MessageDialog(None, 'Do you want to connect to the Siteserver?', 'Connect',
- wx.YES_NO | wx.ICON_QUESTION)
- result = psmsg.ShowModal()
- if result == wx.ID_YES:
- self.SiteserverConnect(passwrd)
- psmsg.Destroy()
- result.Destroy()
- return True
- def ErrMsg(self):
- ermsg = wx.MessageDialog(None, 'Invalid Entry!', 'ConnectionDialog', wx.ICON_ERROR)
- ermsg.ShowModal()
- ermsg.Destroy()
- def GoodConnect(self):
- gdcnt = wx.MessageDialog(None, 'You are connected!', 'ConnectionStatus', wx.ICON_INFORMATION)
- gdcnt.ShowModal()
- #if gdcnt.ShowModal() == wx.ID_OK:
- gdcnt.Destroy()
- def OnFinish(self):
- finish = wx.MessageDialog(None, 'Job is finished!', 'WorkStatus', wx.ICON_INFORMATION)
- finish.ShowModal()
- finish.Destroy()
- def DispConnect(self, address):
- pattern = r"b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?).(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?).(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?).(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)b"
- port = 22
- user = 'root'
- password ='******'
- if re.match(pattern, address):
- ssh = paramiko.SSHClient()
- ssh.load_system_host_keys()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(address,port,user,password)
- Ssh = ssh
- self.GoodConnect()
- self.OnSearch(Ssh)
- else:
- self.ErrMsg()
- def SiteserverConnect(self, password):
- port = 22
- user = 'root2'
- address = '10.5.48.2'
- if password:
- ssh = paramiko.SSHClient()
- ssh.load_system_host_keys()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(address,port,user,password)
- Ssh = ssh
- self.GoodConnect()
- self.OnSiteSearch(Ssh)
- else:
- self.ErrMsg()
- def startWorker(self,a, b, c):
- self.button2.Disable()
- self.thread = Thread(target=self.LongRunningSearch)
- self.thread.start()
- def OnSearch(self, sssh):
- self.startWorker(self.OnFinish, self.LongRunningSearch, wargs=[sssh])
- self.OnFinish()
- def LongRunningSearch(sssh):
- ssh = sssh
- apath = '/'
- apattern = '"*.txt" -o -name "*.log"'
- rawcommand = 'find {path} -name "*.txt" -o -name "*.log"'
- command1 = rawcommand.format(path=apath, pattern=apattern)
- stdin, stdout, stderr = ssh.exec_command(command1)
- filelist = stdout.read().splitlines()
- ftp = ssh.open_sftp()
- for afile in filelist:
- (head, filename) = os.path.split(afile)
- paths = '/dispenser_result.log'
- temp = ftp.file(paths, 'w')
- from time import strftime
- temp.write("{0:^75}".format("Company -Security Report" ) + strftime(" %Y-%m-%d %H:%M:%S") + "nn")
- ustring = wx.TextEntryDialog(None, 'Enter a search string below:', 'Search', 'String Name')
- if ustring.ShowModal() == wx.ID_OK:
- userstring = ustring.GetValue()
- if userstring:
- userStrHEX = userstring.encode('hex')
- userStrASCII = ''.join(str(ord(char)) for char in userstring)
- regex = re.compile(r"(%s|%s|%s)" % ( re.escape( userstring ), re.escape( userStrHEX ), re.escape( userStrASCII )))
- else:
- sys.exit('You Must Enter A String!!!')
- count = 0
- for afile in filelist:
- (head, filename) = os.path.split(afile)
- if afile.endswith(".log") or afile.endswith(".txt"):
- f=ftp.open(afile, 'r')
- for i, line in enumerate(f.readlines()):
- result = regex.search(line)
- if result:
- count += 1
- ln = str(i)
- pathname = os.path.join(afile)
- template = "nnLine: {0}nFile: {1}nString Type: {2}nn"
- output = template.format(ln, pathname, result.group())
- ftp.get(afile, 'c:\Extracted\' + filename)
- temp.write(output)
- break
- else:
- #print "No Match in: " + os.path.join(afile)
- temp.write("nNo Match in: " + os.path.join(afile))
- f.close()
- for fnum in filelist:
- #print "nFiles Searched: ", len(filelist)
- #print "Files Matched: ", count
- num = len(filelist)
- temp.write("nnFiles Searched:" + '%sn' % (num))
- temp.write("Files Matched:"+ '%sn' % (count))
- temp.write("Search String:"+ '%sn' % (userstring))
- break
- temp.close()
- defaultFolder = "DispenserLogResults"
- if not defaultFolder.endswith(':') and not os.path.exists('c:\Extracted\DispenserLogResults'):
- os.mkdir('c:\Extracted\DispenserLogResults')
- else:
- pass
- ftp.get(paths, 'c:\Extracted\DispenserLogResults\dispenser_result.log')
- ftp.remove(paths)
- re.purge()
- ftp.close()
- ssh.close()
- def OnSiteSearch(self, sssh):
- ssh = sssh
- apath = '/var/log/apache2 /var/opt/smartmerch/log/'
- apattern = '"*.log"'
- rawcommand = 'find {path} -type f -name "*.log"'
- command1 = rawcommand.format(path=apath, pattern=apattern)
- stdin, stdout, stderr = ssh.exec_command(command1)
- filelist = stdout.read().splitlines()
- ftp = ssh.open_sftp()
- for afile in filelist:
- (head, filename) = os.path.split(afile)
- paths = '/var/tmp/siteserver_result.log'
- temp = ftp.file(paths, 'w')
- from time import strftime
- temp.write("{0:^75}".format("Gilbarco - SQA Security Report" ) + strftime(" %Y-%m-%d %H:%M:%S") + "nn")
- temp.write("n{0:^75}".format("SiteServer Logs" ))
- ustring = wx.TextEntryDialog(None, 'Enter a search string below:', 'Search', 'String Name')
- if ustring.ShowModal() == wx.ID_OK:
- userstring = ustring.GetValue()
- if userstring:
- userStrHEX = userstring.encode('hex')
- userStrASCII = ''.join(str(ord(char)) for char in userstring)
- regex = re.compile(r"(%s|%s|%s)" % ( re.escape( userstring ), re.escape( userStrHEX ), re.escape( userStrASCII )))
- else:
- sys.exit('You Must Enter A String!!!')
- count = 0
- for afile in filelist:
- (head, filename) = os.path.split(afile)
- if afile.endswith(".log") or afile.endswith(".txt"):
- f=ftp.open(afile, 'r')
- for i, line in enumerate(f.readlines()):
- result = regex.search(line)
- if result:
- count += 1
- ln = str(i)
- pathname = os.path.join(afile)
- template = "nnLine: {0}nFile: {1}nString Type: {2}nn"
- output = template.format(ln, pathname, result.group())
- ftp.get(afile, 'c:\Extracted\' + filename)
- temp.write(output)
- break
- else:
- temp.write("nNo Match in: " + os.path.join(afile))
- f.close()
- for fnum in filelist:
- num = len(filelist)
- temp.write("nnFiles Searched:" + '%sn' % (num))
- temp.write("Files Matched:"+ '%sn' % (count))
- temp.write("Search String:"+ '%sn' % (userstring))
- break
- temp.close()
- defaultFolder = "SiteServerLogResults"
- if not defaultFolder.endswith(':') and not os.path.exists('c:\Extracted\SiteServerLogResults'):
- os.mkdir('c:\Extracted\SiteServerLogResults')
- else:
- pass
- ftp.get(paths, 'c:\Extracted\SiteServerLogResults\siteserver_result.log')
- ftp.remove(paths)
- re.purge()
- ftp.close()
- ssh.close()
- self.OnFinish()
- def OnOpen(self,e):
- self.dirname = ''
- dlg = wx.FileDialog(self, "Choose a file", self.dirname, "", "*.*", wx.OPEN)
- if dlg.ShowModal() == wx.ID_OK:
- self.filename = dlg.GetFilename()
- self.dirname = dlg.GetDirectory()
- f = open(os.path.join(self.dirname, self.filename), 'r')
- self.control.SetValue(f.read())
- f.close()
- dlg.Destroy()
- def OnQuit(self, event):
- self.Close(True)
- def OnAbout(self, event):
- wx.MessageBox("This is sQAST v2.0.0",
- "About secureTool", wx.OK | wx.ICON_INFORMATION, self)
- def OnCloseWindow(self, event):
- self.Destroy()
- if __name__ == '__main__':
- app = MyApp(False)
- app.MainLoop()
- Traceback (most recent call last):
- File "C:SQA_logwxGui.py", line 87, in OnIP
- self.DispConnect(ip_address)
- File "C:SQA_logwxGui.py", line 143, in DispConnect
- self.OnSearch(Ssh)
- File "C:SQA_logwxGui.py", line 169, in OnSearch
- self.startWorker(self.OnFinish, self.LongRunningSearch, wargs=[sssh])
- def OnSearch(self, sssh):
- self.LongRunningSearch(sssh) # Move all the blocking code here,
- # just NOT the GUI reaction !
- # Meaning self.OnFinish()...
- self.OnFinish()
- def OnSearch(self, sssh):
- startWorker(self.OnFinish, self.LongRunningSearch, wargs=[sssh])
- from time import sleep
- import wx
- from wx.lib.delayedresult import startWorker
- class MainWindow(wx.Frame):
- def __init__(self, *args, **kwargs):
- wx.Frame.__init__(self, *args, **kwargs)
- self.panel = wx.Panel(self)
- self.startButton = wx.Button(self.panel, label="Long Task")
- self.abortButton = wx.Button(self.panel, label="Abort")
- self.abortButton.Disable()
- self.gauge = wx.Gauge(self.panel, size=(-1, 20))
- self.shouldAbort = False
- self.startButton.Bind(wx.EVT_BUTTON, self.OnStartButton)
- self.abortButton.Bind(wx.EVT_BUTTON, self.OnAbortButton)
- self.windowSizer = wx.BoxSizer()
- self.windowSizer.Add(self.panel, 1, wx.ALL | wx.EXPAND)
- self.sizer = wx.BoxSizer(wx.VERTICAL)
- self.sizer.Add(self.startButton)
- self.sizer.Add(self.abortButton)
- self.sizer.Add((10, 10))
- self.sizer.Add(self.gauge)
- self.border = wx.BoxSizer()
- self.border.Add(self.sizer, 1, wx.ALL | wx.EXPAND, 5)
- self.panel.SetSizerAndFit(self.border)
- self.SetSizerAndFit(self.windowSizer)
- self.Show()
- def OnStartButton(self, e):
- self.startButton.Disable()
- self.abortButton.Enable()
- startWorker(self.LongTaskDone, self.LongTask)
- def OnAbortButton(self, e):
- self.shouldAbort = True
- def LongTask(self):
- for a in range(101):
- sleep(0.05)
- wx.CallAfter(self.gauge.SetValue, a)
- if self.shouldAbort:
- break
- return self.shouldAbort
- def LongTaskDone(self, result):
- r = result.get()
- if r:
- print("Aborted!")
- else:
- print("Ended!")
- self.startButton.Enable()
- self.abortButton.Disable()
- self.shouldAbort = False
- self.gauge.SetValue(0)
- app = wx.App(False)
- win = MainWindow(None)
- app.MainLoop()
- from time import sleep
- from threading import Thread
- import wx
- class MainWindow(wx.Frame):
- def __init__(self, *args, **kwargs):
- wx.Frame.__init__(self, *args, **kwargs)
- self.panel = wx.Panel(self)
- self.startButton = wx.Button(self.panel, label="Long Task")
- self.abortButton = wx.Button(self.panel, label="Abort")
- self.abortButton.Disable()
- self.gauge = wx.Gauge(self.panel, size=(-1, 20))
- self.shouldAbort = False
- self.thread = None
- self.startButton.Bind(wx.EVT_BUTTON, self.OnStartButton)
- self.abortButton.Bind(wx.EVT_BUTTON, self.OnAbortButton)
- self.windowSizer = wx.BoxSizer()
- self.windowSizer.Add(self.panel, 1, wx.ALL | wx.EXPAND)
- self.sizer = wx.BoxSizer(wx.VERTICAL)
- self.sizer.Add(self.startButton)
- self.sizer.Add(self.abortButton)
- self.sizer.Add((10, 10))
- self.sizer.Add(self.gauge)
- self.border = wx.BoxSizer()
- self.border.Add(self.sizer, 1, wx.ALL | wx.EXPAND, 5)
- self.panel.SetSizerAndFit(self.border)
- self.SetSizerAndFit(self.windowSizer)
- self.Show()
- def OnStartButton(self, e):
- self.startButton.Disable()
- self.abortButton.Enable()
- self.thread = Thread(target=self.LongTask)
- self.thread.start()
- def OnAbortButton(self, e):
- self.shouldAbort = True
- def LongTask(self):
- for a in range(101):
- sleep(0.05)
- wx.CallAfter(self.gauge.SetValue, a)
- if self.shouldAbort:
- break
- wx.CallAfter(self.LongTaskDone, self.shouldAbort)
- def LongTaskDone(self, r):
- if r:
- print("Aborted!")
- else:
- print("Ended!")
- self.startButton.Enable()
- self.abortButton.Disable()
- self.shouldAbort = False
- self.gauge.SetValue(0)
- app = wx.App(False)
- win = MainWindow(None)
- app.MainLoop()
Add Comment
Please, Sign In to add comment