Python如何在RabbitMQ中创建延迟队列

示例

首先,我们需要设置两个基本通道,一个用于主队列,一个用于延迟队列。在最后的示例中,我添加了一些不需要的其他标志,但这些标志使代码更可靠。如confirm delivery,delivery_mode和durable。您可以在RabbitMQ手册中找到有关这些的更多信息。

设置通道后,我们将绑定添加到主通道,以便将消息从延迟通道发送到主队列。

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

接下来,我们需要配置延迟通道,以在消息过期后将其转发到主队列。

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})

  • x-message-ttl (消息-生存时间)

    通常,它用于在特定的持续时间后自动删除队列中的旧消息,但是通过添加两个可选参数,我们可以更改此行为,而是让此参数以毫秒为单位确定消息在延迟队列中保留的时间。

  • x死信路由键

    此变量使我们可以在消息过期后将其传输到其他队列,而不是将其完全删除的默认行为。

  • x死信交换

    此变量确定用于将邮件从hello_delay传输到hello队列的Exchange。

发布到延迟队列

设置完所有基本的Pika参数后,您只需使用基本发布就可以将消息发送到延迟队列。

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

一旦执行了脚本,您应该会在RabbitMQ管理模块中看到以下队列。

例。

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# 创建普通的“ Hello World”类型频道。
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# 我们需要将此渠道绑定到交易所,该交易所将用于转移
# 来自我们的延迟队列的消息。
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# 创建我们的延迟频道。
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# 在这里我们声明延迟,并为延迟通道进行路由。
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # 延迟直到消息以毫秒为单位传输。
    'x-dead-letter-exchange': 'amq.direct', # Exchange用于将邮件从A传输到B。
    'x-dead-letter-routing-key': 'hello' # 我们要将邮件传输到的队列的名称。
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")