Python如何使用RabbitMQ的消息

示例

从导入库开始。

from amqpstorm import Connection

使用消息时,我们首先需要定义一个函数来处理传入的消息。这可以是任何可调用的函数,并且必须采用一个消息对象或一个消息元组(取决于中to_tuple定义的参数start_consuming)。

除了处理传入消息中的数据外,我们还必须确认或拒绝消息。这很重要,因为我们需要让RabbitMQ知道我们已正确接收并处理了该消息。

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # 确认我们已成功处理该消息。
    message.ack()

    # 拒绝邮件。
    # message.reject()

    # 拒绝邮件,然后将其放回队列中。
    # message.reject(requeue = True)

接下来,我们需要建立与RabbitMQ服务器的连接。

connection = Connection('127.0.0.1', 'guest', 'guest')

之后,我们需要建立一个频道。每个连接可以有多个通道,通常在执行多线程任务时,建议(但不是必需)每个线程一个。

channel = connection.channel()

设置好频道后,我们需要让RabbitMQ知道我们要开始使用消息。在这种情况下,我们将使用先前定义的on_message函数来处理所有消耗的消息。

我们将在RabbitMQ服务器上侦听的队列将成为simple_queue,并且我们还告诉RabbitMQ,一旦处理完所有传入消息,我们将对其进行确认。

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

最后,我们需要启动IO循环以开始处理RabbitMQ服务器传递的消息。

channel.start_consuming(to_tuple=False)