Python多线程之Queue

hresh 426 0

Python多线程之Queue

本文希望达到的目标:

  • 学习 Queue 模块
  • 了解多线程同步与 Queue 的关系
  • 理解 Queue 队列中 join()与 task_done()的关系

1.Queue 模块

Queue 模块实现了多生产者多消费者队列, 尤其适合多线程编程.Queue 类中实现了所有需要的锁原语。该模块实现了三种类型的队列,它们的区别仅在于检索条目的顺序。在 FIFO 队列中,添加的第一个任务是第一个检索的任务。在 LIFO 队列中,最近添加的条目是第一个检索的(像堆栈一样运行)。使用优先级队列,条目将保持排序(使用 heapq 模块),并首先检索最低值的条目。

2.Queue 对象

队列对象(Queue,LifoQueue 或 PriorityQueue)提供下面描述的公共方法。

  • Queue.qsize() 返回队列的大小
  • Queue.empty() 如果队列为空,返回True,反之False
  • Queue.full() 如果队列满了,返回True,反之False
  • Queue.get([block[, timeout]])获取队列,timeout等待时间
  • Queue.get_nowait() 相当Queue.get(False) Queue.put(item) 写入队列,timeout等待时间
  • Queue.put_nowait(item) 相当Queue.put(item, False)
  • Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号,告知等待的队列(Queue.join()这里在等待)任务的处理已完成。
  • Queue.join()实际上意味着等到队列为空,再执行别的操作

3.Queue 模块与线程实现线程同步

import threading
import time
import queue

def get_num(workQueue):
    print("Starting " + threading.currentThread().name)
    while True:
        if workQueue.empty():
            break
        num = workQueue.get_nowait()
        print('Current Thread Name %s, Url: %s ' % (threading.currentThread().name, num))
        time.sleep(0.3)

        # try:
        #     num = workQueue.get_nowait()
        #     if not num:
        #         break
        #     print('Current Thread Name %s, Url: %s ' % (threading.currentThread().name, num))
        # except:
        #     break
        # time.sleep(0.3)
    print("Ending " + threading.currentThread().name)


if __name__ == '__main__':
    start_time = time.time()
    workQueue = queue.Queue()

    for i in range(1,10):
        workQueue.put(i)

    threads = []
    thread_num = 4  #线程数
    for i in range(thread_num):
        t = threading.Thread(target=get_num, args=(workQueue,))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    end_time = time.time()
    print('耗时{}s'.format((end_time - start_time)))

输出结果:

Starting Thread-1
Current Thread Name Thread-1, data: 1, time: Sat May 11 10:48:27 2019 
Starting Thread-2
Current Thread Name Thread-2, data: 2, time: Sat May 11 10:48:27 2019 
Starting Thread-3
Current Thread Name Thread-3, data: 3, time: Sat May 11 10:48:27 2019 
Starting Thread-4
Current Thread Name Thread-4, data: 4, time: Sat May 11 10:48:27 2019 
Current Thread Name Thread-2, data: 5, time: Sat May 11 10:48:27 2019 
Current Thread Name Thread-1, data: 6, time: Sat May 11 10:48:27 2019 
Current Thread Name Thread-4, data: 7, time: Sat May 11 10:48:27 2019 
Current Thread Name Thread-3, data: 8, time: Sat May 11 10:48:27 2019 
Current Thread Name Thread-2, data: 9, time: Sat May 11 10:48:28 2019 
Ending Thread-1
Ending Thread-4
Ending Thread-3
Ending Thread-2
耗时0.9028356075286865s

4.Queue队列中 join()与task_done()的使用

关于 join() 和 task_done()方法的使用,在网上经过查阅后,参考危险的小熊的博客内容,以生产者消费者示例代码来演示。

from threading import Thread
import time
import random
from queue import Queue
from collections import deque
import threading

# 创建队列,设置队列最大数限制为3个
queue = Queue(3)
threadLock = threading.Lock()

# 生产者线程
class Pro_Thread(Thread):
    def run(self):
        # 原材料准备,等待被生产
        tasks = deque([1, 2, 3, 4, 5, 6, 7, 8])
        global queue
        while True:
            try:
                # 从原材料左边开始生产,如果tasks中没有元素,调用popleft()则会抛出错误
                task = tasks.popleft()
                queue.put(task)
                print("生产", task, "现在队列数:", queue.qsize())

                # 休眠随机时间
                time.sleep(random.random())
            # 如果原材料被生产完,生产线程跳出循环
            except IndexError:
                break
        print("生产完毕", "现在队列数:", queue.qsize())


# 消费者线程
class Con_Thread(Thread):
    def run(self):
        global queue
        global threadLock
        while True:
            # threadLock.acquire()
            if not queue.empty():
                # 通过get(),这里已经将队列减去了1
                task = queue.get()
                time.sleep(2)
                # 发出完成的信号,不发的话,join会永远阻塞,程序不会停止
                queue.task_done()
                print('Current Thread Name {0},--消费-- {1}, 现在队列数: {2}'.format(threading.currentThread().name,task, queue.qsize()))
                # threadLock.release()
            else:
                # threadLock.release()
                break


# r入口方法,主线程
def main():
    start_time = time.time()
    Pro_1 = Pro_Thread()
    # 启动线程
    Pro_1.start()
    # 这里休眠一秒钟,等到队列有值,否则队列创建时是空的,主线程直接就结束了,实验失败,造成误导
    time.sleep(1)

    tasks = [Con_Thread() for x in range(2)]
    for t in tasks:
        # 启动线程
        t.start()

    for t in tasks:
        t.join()

    global queue
    # 接收信号,主线程在这里等待队列被处理完毕后再做下一步
    queue.join()
    print("消费完毕")
    print("主线程结束")
    end_time = time.time()
    print('耗时{}s'.format((end_time - start_time)))


if __name__ == '__main__':
    main()

输出结果:

生产 1 现在队列数: 1
生产 2 现在队列数: 2
生产 3 现在队列数: 3
生产 4 现在队列数: 3
生产 5 现在队列数: 3
Current Thread Name Thread-2,--消费-- 1, 现在队列数: 3
生产 6 现在队列数: 3
Current Thread Name Thread-3,--消费-- 2, 现在队列数: 3
生产 7 现在队列数: 3
Current Thread Name Thread-2,--消费-- 3, 现在队列数: 3
生产 8 现在队列数: 3
Current Thread Name Thread-3,--消费-- 4, 现在队列数: 3
生产完毕 现在队列数: 2
Current Thread Name Thread-2,--消费-- 5, 现在队列数: 2
Current Thread Name Thread-3,--消费-- 6, 现在队列数: 1
Current Thread Name Thread-2,--消费-- 7, 现在队列数: 0
Current Thread Name Thread-3,--消费-- 8, 现在队列数: 0
消费完毕
主线程结束
耗时9.004081010818481s

在该实例中,生产者与消费者模型符合我们实际生活中的过程,设置队列大小为3,说明生产者库存最多为3;而线程数为2,代表两个消费者,保证了生产速度不小于消费速度。

最后值得注意的是,threading.Thread().join()方法和queue.join)()的区别:线程的join()是主线程等待子线程的执行完毕再执行;队列的join()是主线程等待队列中的任务都消耗完再执行

扩展

import threading
import queue
import time

thread_num = 4
class MyThread1(threading.Thread):
    def __init__(self, que):
        threading.Thread.__init__(self)
        self.que = que

    def run(self):
        while True:
            if not self.que.empty():
                item = self.que.get()
                print('Current Thread Name %s, data: %s ' % (threading.currentThread().name, item))
                self.que.task_done()
            else:
                break

class MyThread2(threading.Thread):
    '''
    如果生产者速度与消费者速度相当,或者生产速度小于消费速度,则靠task_done()来实现队列减一则不靠谱,队列会时常处于供不应求的状态,常为empty,所以用empty来判断则不靠谱。
     那么这种情况会导致 join可以判断出队列结束了,但是线程里不能依靠empty()来判断线程是否可以结束。
    '''
    def __init__(self, que):
        threading.Thread.__init__(self)
        self.que = que

    def run(self):
        while True:
            item = self.que.get()
            # 这里要放到判断前,否则取最后最后一个的时候已经为空,直接break,task_done执行不了,join()判断队列一直没结束
            self.que.task_done()
            if item == None:
                break
            print('Current Thread Name %s, data: %s ' % (threading.currentThread().name, item))

def thread_test1(que):
    tasks = [MyThread1(que) for x in range(thread_num)]

    for t in tasks:
        t.start()

    for t in tasks:
        t.join()

    que.join()

def thread_test2(que):
    tasks = [MyThread2(que) for x in range(thread_num)]

    for t in tasks:
        t.start()

    for t in tasks:
        t.que.put(None)

    que.join()

if __name__ == '__main__':
    start_time = time.time()

    que = queue.Queue()
    for i in range(1, 200):
        que.put(i)
    #使用Queue.task_done()方法,在重写Thread的run()方法时,有两个不同的写法,都没有错
    thread_test1(que)
    # thread_test2(que)

    print('--------success--------')
    end_time = time.time()
    print('耗时{}s'.format((end_time - start_time)))

经测试,thread_test1()和thread_test2()两种方法都能正常运行。

发表评论 取消回复
表情 图片 链接 代码

分享