当前位置: 移动技术网 > IT编程>开发语言>C/C++ > QThread 线程终止

QThread 线程终止

2020年07月30日  | 移动技术网IT编程  | 我要评论

QThread

使用PyQt5开发应用程序时,处理耗时较长的任务时为避免主程序卡死就要用到QThread开子线程,有时子线程是去执行For/While 循环任务,我们如果想临时终止线程,就可以增加一个属性用作判定是否要临时终止的标志位,每轮循环开始前做一次校验判定是否要退出。

示例

以下示例是做LED循环开关及调光测试的小程序,循环次数通常都设置在数万次,运行后如发现DUT表现异常操作员就要终止测试。
Cycle

代码

主程序代码(片段)

	self.ui.dimmer_cycle_exec.clicked.connect(self.dimmer_cycle)
	self.ui.dimmer_cycle_terminate.clicked.connect(self.cycle_stop)

 ##Cycle
    def dimmer_cycle(self):
        self.cycle = cyclingCtrl()
        self.cycle.setTerminationEnabled()
        self.cycle.dimer_targets = self.ui.dimer_targets.text()
        self.cycle.dimmer_hold_time = self.ui.dimmer_hold_time.text()
        self.cycle.dimmer_cycle_times = self.ui.dimmer_cycle_times.text()
        self.cycle.port = self.ui.cycle_ser_port.currentText()
        self.cycle.dimmer_cycle_exec_is_clicked = True
        self.cycle.flare.connect(self.sinShow)
        if self.ser:
            self.cycle.ser = self.ser
        self.cycle.start()

    def cycle_stop(self):
        # 点击”终止“按钮后将子线程 stop_flag属性置1,终止循环
        self.cycle.stop_flag = 1
        self.ui.textBrowser.append("\t循环终止!")

子线程代码

# -*- coding:utf-8 -*-
from PyQt5.QtCore import *
import serial
import time
import datetime
import logging
from math import ceil


class CyclingCtrl(QThread):
    
    """
   Cycling control devices to test the performace.
    """
    
    flare = pyqtSignal(str)

    def __init__(self):
        super(CyclingCtrl, self).__init__()
        self.dimer_targets = None
        self.dimmer_hold_time = None
        self.dimmer_cycle_times = None
        self.dimmer_cycle_exec_is_clicked = False

        self.led_onoff_cycle_targets = None
        self.led_onoff_transtime = None
        self.led_onoff_cycle_times = None
        self.led_onoff_cycle_exec_is_clicked = False

        self.turn_onoff_cycle_targets = None
        self.turn_onoff_hold_time = None
        self.turn_onoff_cycle_times = None
        self.turn_onoff_cycle_exec_is_clicked = False

        self.ser = None
        self.port = None

        self.CycleCtrl_log_file = None
        self.stop_flag = 0  # 线程终止信号,主线程通过修改子线程的该属性控制线程终止

    def run(self):
        if self.ser is None:
            self.ser = serial.Serial(port=self.port, baudrate=115200, timeout=3)
        elif self.ser.is_open:
            self.flare.emit("%s 连接成功" % self.port)
        else:
            self.flare.emit("%s 连接失败" % self.port)

        if self.dimmer_cycle_exec_is_clicked:
            self.flare.emit("开始执行LED dimmer Cycling 程序")
            self.led_dimmer_cycling()
        elif self.led_onoff_cycle_exec_is_clicked:
            self.flare.emit("开始执行LED OnOff Cycling 程序")
            self.led_onoff_cycling()
        elif self.turn_onoff_cycle_exec_is_clicked:
            self.flare.emit("开始执行Turn OnOff Cycling 程序")
            self.turn_onoff_cycling()
        else:
            self.flare.emit("循环控制指令发送失败")

    def led_dimmer_cycling(self):
        logDate = datetime.datetime.now().strftime("%Y%m%d_%Hh_%Mm_%Ss")
        self.CycleCtrl_log_file = "./logs/log%s_dimmerCycling_logs.log" % logDate
        logging.basicConfig(filename=self.CycleCtrl_log_file, level=logging.INFO, format="%(asctime)s :\n%(message)s")
        for cycle in range(0, int(self.dimmer_cycle_times)+1):
            for i in range(0, 21):
                if self.stop_flag:
                    break   # 用户点击 “Stop" 时主线程将self.stop_flag置1,子线程循环终止
                if i < 11:
                    # move level up from 0% to 100%
                    self.ser.write(("\r\n" + "zcl level-control o-mv-to-level {0} 0\r\n".format(
                        ceil(i * 25.5)) + "send {} 1 1\r\n".format(self.dimer_targets)).encode('utf-8'))
                    self.flare.emit("%s\tCycle-%d: o-mv-to-levle=%d percent" % (datetime.datetime.now().strftime("%Y%m%d_%Hh_%Mm_%Ss"), cycle, i*10))
                    logging.info("%s\tCycle-%d: o-mv-to-levle=%d percent" % (datetime.datetime.now().strftime("%Y%m%d_%Hh_%Mm_%Ss"), cycle, i*10))
                    time.sleep(int(self.dimmer_hold_time))
                else:
                    # move level down from 90% to 0%
                    self.ser.write(("\r\n" + "zcl level-control o-mv-to-level {0} 0\r\n".format(
                        ceil(510-ceil(i * 25.5))) + "send {} 1 1\r\n".format(self.dimer_targets)).encode('utf-8'))
                    self.flare.emit("%s\tCycle-%d:o-mv-to-level=%d percent" % (datetime.datetime.now().strftime("%Y%m%d_%Hh_%Mm_%Ss"), cycle, (20-i)*10))
                    logging.info("%s\tCycle-%d:o-mv-to-level=%d percent" % (datetime.datetime.now().strftime("%Y%m%d_%Hh_%Mm_%Ss"), cycle, (20-i)*10))
                    time.sleep(int(self.dimmer_hold_time))

    def cycling_pause(self):
        # TODO:暂停线程
        self.flare.emit("Dimmer循环暂停!")
        pass

    def cycling_continue(self):
        # TODO: 线程继续执行
        pass

TODO

线程挂起、继续还没找到比较靠谱的实现方法。

本文地址:https://blog.csdn.net/willyan2002/article/details/107653906

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网