如何使用Python实现多线程队列

介绍..

在此示例中,我们将创建一个任务队列,其中包含所有要执行的任务,以及一个线程池,该线程池与该队列进行交互以分别处理其元素。

我们将从问题开始,什么是队列?队列是一种数据结构,是按非常特定的顺序维护的不同元素的集合。让我以现实生活为例进行说明。

假设您排队在杂货店柜台为杂货店付款(不要问我哪个杂货店)

在等待付款的人群中,您会注意到以下几点:

1.人们从线路的一端进入,从另一端退出。

2.如果人A在人B之前进入线路,则人A将在人B之前离开线路(除非人B是名人或具有更高的优先级)。

3.每个人都付完帐单后,排队的人将一无所有。

好了,回到队列以类似方式工作的编程。

1.入队-添加到队列末尾的元素。

2.出队-从队列开头删除的元素。

还有更多的先进先出(FIFO)-首先添加的元素将被首先删除。后进先出(LIFO)-最后添加的元素将被首先删除。

Python如何实现Queue数据结构?

Python中的队列模块提供了队列数据结构的简单实现。每个队列可以具有以下方法。

  • get():返回下一个元素。

  • put():添加一个新元素。

  • qsize():队列中当前元素的数量。

  • empty():返回一个布尔值,指示队列是否为空。

  • full():返回一个布尔值,指示队列是否已满。

1.我们将创建一个函数,该函数接受参数x,然后遍历1与本身(x)之间的数字以执行乘法。例如,当您将5传递给该函数时,它将迭代1到5,并保持相乘,即1乘5、2乘5、3乘5、4乘5、5乘5最终返回值作为列表。

示例

def print_multiply(x):
output_value = []
for i in range(1, x + 1):
output_value.append(i * x)
print(f"Output \n *** The multiplication result for the {x} is - {output_value}")
print_multiply(5)

输出结果

*** The multiplication result for the 5 is - [5, 10, 15, 20, 25]

2.我们将编写另一个名为process_queue()的函数,该函数将尝试获取队列对象的下一个元素。逻辑很简单,一直传递元素直到队列为空。我将使用睡眠来延迟进行一些操作。

示例

def process_queue():
while True:
try:
value = my_queue.get(block=False)
except queue.Empty:
return
else:
print_multiply(value)
time.sleep(2)

3.创建一个类,在初始化并启动新实例时,将调用process_queue()函数。

示例

class MultiThread(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self)
self.name = name

def run(self):
print(f" ** Starting the thread - {self.name}")
process_queue()
print(f" ** Completed the thread - {self.name}")

4.最后,我们将传递数字输入列表并填充队列。

# setting up variables
input_values = [5, 10, 15, 20]

# fill the queue
my_queue = queue.Queue()
for x in input_values:
my_queue.put(x)

5.最后,将所有内容放在一起。

import queue
import threading
import time

# Class
class MultiThread(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self)
self.name = name

def run(self):
print(f"Output \n ** Starting the thread - {self.name}")
process_queue()
print(f" ** Completed the thread - {self.name}")

# Process thr queue
def process_queue():
while True:
try:
value = my_queue.get(block=False)
except queue.Empty:
return
else:
print_multiply(value)
time.sleep(2)

# function to multiply
def print_multiply(x):
output_value = []
for i in range(1, x + 1):
output_value.append(i * x)
print(f" \n *** The multiplication result for the {x} is - {output_value}")

# Input variables
input_values = [2, 4, 6, 5,10,3]

# fill the queue
my_queue = queue.Queue()
for x in input_values:
my_queue.put(x)
# initializing and starting 3 threads
thread1 = MultiThread('First')
thread2 = MultiThread('Second')
thread3 = MultiThread('Third')
thread4 = MultiThread('Fourth')

# Start the threads
thread1.start()
thread2.start()
thread3.start()
thread4.start()

# Join the threads
thread1.join()
thread2.join()
thread3.join()
thread4.join()

输出结果

** Starting the thread - First
*** The multiplication result for the 2 is - [2, 4]

输出结果

** Starting the thread - Second
*** The multiplication result for the 4 is - [4, 8, 12, 16]

输出结果

** Starting the thread - Third
*** The multiplication result for the 6 is - [6, 12, 18, 24, 30, 36]

输出结果

** Starting the thread - Fourth
*** The multiplication result for the 5 is - [5, 10, 15, 20, 25]
*** The multiplication result for the 10 is - [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
*** The multiplication result for the 3 is - [3, 6, 9] ** Completed the thread - Third
** Completed the thread - Fourth
** Completed the thread - Second ** Completed the thread - First

6.我们已经成功实施了队列概念。看到,我们有4个线程,但是有6个值要处理,因此谁先进入队列,谁就会执行,其他人将排队等待其他人完成。

这类似于现实生活,假设有3个柜台,但有10个人等待付款,因此10个人将排在3个队列中,而完成支付账单的人将排队并为下一个人让路。