从导入库开始。
from amqpstorm import Connection
使用消息时,我们首先需要定义一个函数来处理传入的消息。这可以是任何可调用的函数,并且必须采用一个消息对象或一个消息元组(取决于中to_tuple定义的参数start_consuming)。
除了处理传入消息中的数据外,我们还必须确认或拒绝消息。这很重要,因为我们需要让RabbitMQ知道我们已正确接收并处理了该消息。
def on_message(message): """This function is called on message received. :param message: Delivered message. :return: """ print("Message:", message.body) # 确认我们已成功处理该消息。 message.ack() # 拒绝邮件。 # message.reject() # 拒绝邮件,然后将其放回队列中。 # message.reject(requeue = True)
接下来,我们需要建立与RabbitMQ服务器的连接。
connection = Connection('127.0.0.1', 'guest', 'guest')
之后,我们需要建立一个频道。每个连接可以有多个通道,通常在执行多线程任务时,建议(但不是必需)每个线程一个。
channel = connection.channel()
设置好频道后,我们需要让RabbitMQ知道我们要开始使用消息。在这种情况下,我们将使用先前定义的on_message函数来处理所有消耗的消息。
我们将在RabbitMQ服务器上侦听的队列将成为simple_queue,并且我们还告诉RabbitMQ,一旦处理完所有传入消息,我们将对其进行确认。
channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)
最后,我们需要启动IO循环以开始处理RabbitMQ服务器传递的消息。
channel.start_consuming(to_tuple=False)