当前位置: 移动技术网 > IT编程>脚本编程>Python > python3+PyQt5实现支持多线程的页面索引器应用程序

Python3+PyQt5实现支持多线程的页面索引器应用程序

2019年06月17日  | 移动技术网IT编程  | 我要评论

本文通过 Python3 + PyQt5 实现了《Python Qt GUI 快速编程》第 19 章中的页面索引器应用程序示例。

/home/yrd/eric_workspace/chap19/walker_ans.py

#!/usr/bin/env python3

import codecs
import html.entities
import re
import sys
from pyqt5.qtcore import (qmutex, qthread,pyqtsignal,qt)

class walker(qthread):
    """Background thread that indexes the words of a batch of HTML files.

    For every file in ``files`` the thread reads the text, strips HTML tags,
    expands character entities, lower-cases the result and splits it into
    words.  Words of acceptable length that are not in ``commonwords`` are
    recorded in ``filenamesforwords`` (word -> set of file names).  A word
    whose file set already exceeds ``common_words_threshold`` is moved into
    ``commonwords`` and removed from the index.

    Signals:
        indexed(str, int): emitted after each processed file with the file
            name and this walker's ``index``.
        finished(bool, int): emitted at the end of ``run`` with the
            completion flag and this walker's ``index``.

    NOTE(review): the base class and helpers (``qthread``, ``pyqtsignal``,
    ``qmutex``) are used under the names the file's import line binds; that
    import line looks case-folded and must be corrected for the module to
    load.
    """

    finished = pyqtsignal(bool, int)
    indexed = pyqtsignal(str, int)
    common_words_threshold = 250
    min_word_len = 3
    max_word_len = 25
    invalid_first_or_last = frozenset("0123456789_")
    striphtml_re = re.compile(r"<[^>]*?>", re.IGNORECASE | re.MULTILINE)
    entity_re = re.compile(r"&(\w+?);|&#(\d+?);")
    # Split on runs of NON-word characters so the pieces are the words.
    split_re = re.compile(r"\W+", re.IGNORECASE | re.MULTILINE)

    def __init__(self, index, lock, files, filenamesforwords,
                 commonwords, parent=None):
        """Store the shared state this walker works on.

        Args:
            index: identifier echoed back in both signals.
            lock: read/write lock taken around every access to
                ``filenamesforwords`` and ``commonwords``.
            files: iterable of file names to index.
            filenamesforwords: shared mapping word -> set of file names;
                indexed with ``[word]`` for unseen words, so it is expected
                to create missing entries (e.g. ``defaultdict(set)``).
            commonwords: shared set of words excluded from the index.
            parent: optional parent object passed to the base class.
        """
        super().__init__(parent)
        self.index = index
        self.lock = lock
        self.files = files
        self.filenamesforwords = filenamesforwords
        self.commonwords = commonwords
        self.stopped = False
        self.mutex = qmutex()
        self.completed = False

    def stop(self):
        """Ask the thread to stop; checked between processing stages."""
        # Acquire before ``try`` so a failed lock is never "unlocked".
        self.mutex.lock()
        try:
            self.stopped = True
        finally:
            self.mutex.unlock()

    def isstopped(self):
        """Return True once ``stop`` has been called."""
        self.mutex.lock()
        try:
            return self.stopped
        finally:
            self.mutex.unlock()

    def run(self):
        """Thread body: index all files, then report completion."""
        self.processfiles()
        self.stop()
        self.finished.emit(self.completed, self.index)

    def processfiles(self):
        """Index every file in ``self.files``, honouring stop requests.

        Sets ``self.completed`` to True only if the whole batch was
        processed without a stop request.  Unreadable files are reported on
        stderr and skipped.
        """
        def unichrfromentity(match):
            """Return the character for a matched entity, or "" if unknown."""
            text = match.group(match.lastindex)
            if text.isdigit():
                try:
                    return chr(int(text))
                except (ValueError, OverflowError):
                    # Code point outside the Unicode range (or a digit-like
                    # character int() rejects): drop it instead of crashing.
                    return ""
            u = html.entities.name2codepoint.get(text)
            return chr(u) if u is not None else ""

        for fname in self.files:
            if self.isstopped():
                return
            try:
                with codecs.open(fname, "r", "utf8", "ignore") as fh:
                    text = fh.read()
            except OSError as e:
                sys.stderr.write("error: {0}\n".format(e))
                continue
            if self.isstopped():
                return
            text = self.striphtml_re.sub("", text)
            text = self.entity_re.sub(unichrfromentity, text)
            text = text.lower()
            candidates = {
                word for word in self.split_re.split(text)
                if (self.min_word_len <= len(word) <= self.max_word_len
                    and word[0] not in self.invalid_first_or_last
                    and word[-1] not in self.invalid_first_or_last)
            }
            # One read-locked pass per file instead of one lock per word.
            self.lock.lockForRead()
            try:
                words = {word for word in candidates
                         if word not in self.commonwords}
            finally:
                self.lock.unlock()
            if self.isstopped():
                return
            for word in words:
                self.lock.lockForWrite()
                try:
                    # Another walker may have declared the word common since
                    # the read-locked check above; do not re-index it.
                    if word in self.commonwords:
                        continue
                    files = self.filenamesforwords[word]
                    if len(files) > self.common_words_threshold:
                        del self.filenamesforwords[word]
                        self.commonwords.add(word)
                    else:
                        files.add(str(fname))
                finally:
                    self.lock.unlock()
            self.indexed.emit(fname, self.index)
        self.completed = True


/home/yrd/eric_workspace/chap19/pageindexer_ans.pyw

#!/usr/bin/env python3

import collections
import os
import sys
from pyqt5.qtcore import (qdir, qreadwritelock, qmutex,qt)
from pyqt5.qtwidgets import (qapplication, qdialog, qfiledialog, qframe,
        qhboxlayout, qlcdnumber, qlabel, qlineedit, qlistwidget,
        qpushbutton, qvboxlayout)
import walker_ans as walker


def isalive(qobj):
    """Return True if the object wrapped by *qobj* can still be unwrapped.

    ``sip.unwrapinstance`` raises ``RuntimeError`` when the underlying
    object behind a wrapper has already been destroyed; that case is
    reported as False.
    """
    try:
        from PyQt5 import sip  # newer PyQt5 ships its own private sip
    except ImportError:
        import sip  # older installs expose sip as a top-level module
    try:
        sip.unwrapinstance(qobj)
    except RuntimeError:
        return False
    return True


class form(qdialog):
    """Dialog that indexes the HTML files below a directory using walkers.

    The user picks a directory; every ``.htm``/``.html`` file found under it
    is handed, in batches of 1000, to ``walker.walker`` threads that fill
    the shared ``filenamesforwords`` index and ``commonwords`` set.  The
    dialog shows progress counters and lets the user look up which files
    contain a word.

    ``self.lock`` (read/write lock) is taken around every access to the
    shared index data; ``self.mutex`` is taken around the status/counter
    updates performed in the slots.

    NOTE(review): widget classes (``qdialog``, ``qlabel``, ...) are used
    under the names the file's import lines bind; those import lines look
    case-folded and must be corrected for the module to load.
    """

    def __init__(self, parent=None):
        """Build the widgets, lay them out and wire up the signals."""
        super().__init__(parent)

        self.mutex = qmutex()
        self.filecount = 0
        self.filenamesforwords = collections.defaultdict(set)
        self.commonwords = set()
        self.lock = qreadwritelock()
        self.path = qdir.homePath()
        pathlabel = qlabel("indexing path:")
        self.pathlabel = qlabel()
        self.pathlabel.setFrameStyle(qframe.StyledPanel | qframe.Sunken)
        self.pathbutton = qpushbutton("set &path...")
        self.pathbutton.setAutoDefault(False)
        findlabel = qlabel("&find word:")
        self.findedit = qlineedit()
        findlabel.setBuddy(self.findedit)
        commonwordslabel = qlabel("&common words:")
        self.commonwordslistwidget = qlistwidget()
        commonwordslabel.setBuddy(self.commonwordslistwidget)
        fileslabel = qlabel("files containing the &word:")
        self.fileslistwidget = qlistwidget()
        fileslabel.setBuddy(self.fileslistwidget)
        filesindexedlabel = qlabel("files indexed")
        self.filesindexedlcd = qlcdnumber()
        self.filesindexedlcd.setSegmentStyle(qlcdnumber.Flat)
        wordsindexedlabel = qlabel("words indexed")
        self.wordsindexedlcd = qlcdnumber()
        self.wordsindexedlcd.setSegmentStyle(qlcdnumber.Flat)
        commonwordslcdlabel = qlabel("common words")
        self.commonwordslcd = qlcdnumber()
        self.commonwordslcd.setSegmentStyle(qlcdnumber.Flat)
        self.statuslabel = qlabel("click the 'set path' "
                                  "button to start indexing")
        self.statuslabel.setFrameStyle(qframe.StyledPanel | qframe.Sunken)

        toplayout = qhboxlayout()
        toplayout.addWidget(pathlabel)
        toplayout.addWidget(self.pathlabel, 1)
        toplayout.addWidget(self.pathbutton)
        toplayout.addWidget(findlabel)
        toplayout.addWidget(self.findedit, 1)
        leftlayout = qvboxlayout()
        leftlayout.addWidget(fileslabel)
        leftlayout.addWidget(self.fileslistwidget)
        rightlayout = qvboxlayout()
        rightlayout.addWidget(commonwordslabel)
        rightlayout.addWidget(self.commonwordslistwidget)
        middlelayout = qhboxlayout()
        middlelayout.addLayout(leftlayout, 1)
        middlelayout.addLayout(rightlayout)
        bottomlayout = qhboxlayout()
        bottomlayout.addWidget(filesindexedlabel)
        bottomlayout.addWidget(self.filesindexedlcd)
        bottomlayout.addWidget(wordsindexedlabel)
        bottomlayout.addWidget(self.wordsindexedlcd)
        bottomlayout.addWidget(commonwordslcdlabel)
        bottomlayout.addWidget(self.commonwordslcd)
        bottomlayout.addStretch()
        layout = qvboxlayout()
        layout.addLayout(toplayout)
        layout.addLayout(middlelayout)
        layout.addLayout(bottomlayout)
        layout.addWidget(self.statuslabel)
        self.setLayout(layout)

        # walkers[i] is the thread with index i; completed[i] is its flag.
        self.walkers = []
        self.completed = []
        self.pathbutton.clicked.connect(self.setpath)
        self.findedit.returnPressed.connect(self.find)
        self.setWindowTitle("page indexer")

    def stopwalkers(self):
        """Stop all running walkers, wait for them, and forget them."""
        # Request every stop first so the threads wind down in parallel.
        for thread in self.walkers:
            if isalive(thread) and thread.isRunning():
                thread.stop()
        for thread in self.walkers:
            if isalive(thread) and thread.isRunning():
                thread.wait()
        self.walkers = []
        self.completed = []

    def setpath(self):
        """Ask for a directory and start indexing the HTML files below it."""
        self.stopwalkers()
        self.pathbutton.setEnabled(False)
        path = qfiledialog.getExistingDirectory(
            self, "choose a path to index", self.path)
        if not path:
            self.statuslabel.setText("click the 'set path' "
                                     "button to start indexing")
            self.pathbutton.setEnabled(True)
            return
        self.statuslabel.setText("scanning directories...")
        qapplication.processEvents()  # needed for windows
        self.path = qdir.toNativeSeparators(path)
        self.findedit.setFocus()
        self.pathlabel.setText(self.path)
        self.statuslabel.clear()
        self.fileslistwidget.clear()
        self.filecount = 0
        self.filenamesforwords = collections.defaultdict(set)
        self.commonwords = set()
        nofilesfound = True
        files = []
        index = 0
        for root, dirs, fnames in os.walk(str(self.path)):
            for name in fnames:
                if not name.endswith((".htm", ".html")):
                    continue
                files.append(os.path.join(root, name))
                if len(files) == 1000:
                    # Hand a full batch to its own walker thread.
                    self.processfiles(index, files)
                    files = []
                    index += 1
                    nofilesfound = False
        if files:
            self.processfiles(index, files)
            nofilesfound = False
        if nofilesfound:
            self.finishedindexing()
            self.statuslabel.setText(
                "no html files found in the given path")

    def processfiles(self, index, files):
        """Create, register and start a walker thread for one batch.

        Args:
            index: position of the walker in ``self.walkers`` /
                ``self.completed``; echoed back by the walker's signals.
            files: list of file names the walker should index.
        """
        thread = walker.walker(index, self.lock, files,
                               self.filenamesforwords, self.commonwords,
                               self)
        thread.indexed[str, int].connect(self.indexed)
        thread.finished[bool, int].connect(self.finished)
        thread.finished.connect(thread.deleteLater)
        self.walkers.append(thread)
        self.completed.append(False)
        thread.start()
        thread.wait(300)  # needed for windows

    def find(self):
        """List the indexed files containing the word in the find box.

        Only the first whitespace-separated token is used, lower-cased.
        """
        word = str(self.findedit.text())
        if not word:
            self.mutex.lock()
            try:
                self.statuslabel.setText("enter a word to find in files")
            finally:
                self.mutex.unlock()
            return
        self.mutex.lock()
        try:
            self.statuslabel.clear()
            self.fileslistwidget.clear()
        finally:
            self.mutex.unlock()
        word = word.lower()
        if " " in word:
            word = word.split()[0]
        self.lock.lockForRead()
        try:
            found = word in self.commonwords
        finally:
            self.lock.unlock()
        if found:
            self.mutex.lock()
            try:
                self.statuslabel.setText("common words like '{0}' "
                                         "are not indexed".format(word))
            finally:
                self.mutex.unlock()
            return
        self.lock.lockForRead()
        try:
            # .get() avoids creating an empty entry in the defaultdict.
            files = self.filenamesforwords.get(word, set()).copy()
        finally:
            self.lock.unlock()
        if not files:
            self.mutex.lock()
            try:
                self.statuslabel.setText("no indexed file contains "
                                         "the word '{0}'".format(word))
            finally:
                self.mutex.unlock()
            return
        files = [qdir.toNativeSeparators(name) for name in
                 sorted(files, key=str.lower)]
        self.mutex.lock()
        try:
            self.fileslistwidget.addItems(files)
            self.statuslabel.setText(
                "{0} indexed files contain the word '{1}'".format(
                    len(files), word))
        finally:
            self.mutex.unlock()

    def indexed(self, fname, index):
        """Slot for a walker's ``indexed`` signal: update progress display.

        The counters are refreshed every 25th file; otherwise the common
        words list is refreshed every 101st file.
        """
        self.mutex.lock()
        try:
            self.statuslabel.setText(fname)
            self.filecount += 1
            count = self.filecount
        finally:
            self.mutex.unlock()
        if count % 25 == 0:
            self.lock.lockForRead()
            try:
                indexedwordcount = len(self.filenamesforwords)
                commonwordcount = len(self.commonwords)
            finally:
                self.lock.unlock()
            self.mutex.lock()
            try:
                self.filesindexedlcd.display(count)
                self.wordsindexedlcd.display(indexedwordcount)
                self.commonwordslcd.display(commonwordcount)
            finally:
                self.mutex.unlock()
        elif count % 101 == 0:
            self.lock.lockForRead()
            try:
                words = self.commonwords.copy()
            finally:
                self.lock.unlock()
            self.mutex.lock()
            try:
                self.commonwordslistwidget.clear()
                self.commonwordslistwidget.addItems(sorted(words))
            finally:
                self.mutex.unlock()

    def finished(self, completed, index):
        """Slot for a walker's ``finished`` signal.

        Marks walker ``index`` as done and, once every walker is done (or
        no walkers are registered), shows "finished" and refreshes the
        final counters.  The ``completed`` argument is not used here.
        """
        done = False
        if self.walkers:
            self.completed[index] = True
            if all(self.completed):
                self.mutex.lock()
                try:
                    self.statuslabel.setText("finished")
                    done = True
                finally:
                    self.mutex.unlock()
        else:
            self.mutex.lock()
            try:
                self.statuslabel.setText("finished")
                done = True
            finally:
                self.mutex.unlock()
        if done:
            self.finishedindexing()

    def reject(self):
        """Cancel indexing if it is still running, otherwise accept."""
        if not all(self.completed):
            self.stopwalkers()
            self.finishedindexing()
        else:
            self.accept()

    def closeEvent(self, event=None):
        """Stop the walker threads when the dialog is closed."""
        self.stopwalkers()

    # Backward-compatible alias for the previous (case-folded) method name.
    closeevent = closeEvent

    def finishedindexing(self):
        """Show the final counters and re-enable the path button."""
        self.filesindexedlcd.display(self.filecount)
        self.wordsindexedlcd.display(len(self.filenamesforwords))
        self.commonwordslcd.display(len(self.commonwords))
        self.pathbutton.setEnabled(True)
        qapplication.processEvents()  # needed for windows


# Script entry: create the application object, build and show the indexer
# dialog, then hand control to app.exec_().
app = qapplication(sys.argv)
# NOTE: this rebinds the module-level name `form` from the class to its
# single instance.
form = form()
form.show()
app.exec_()

运行结果:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网