环境 pika 0.10 RabbitMQ 3.5.4, Erlang 18.0
生产者
cOnnection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() queue_name="route_test" channel.exchange_declare(exchange='logs') channel.basic_qos(prefetch_count=1) message = ' '.join(sys.argv[1:]) or "info: Hello World!" i = 0 while i < 10: channel.basic_publish(exchange='logs', routing_key=queue_name, body=message, properties=pika.BasicProperties( delivery_mode=2 )) i += 1 print(" [x] Sent %r" % message) connection.close() 消费者
def on_start(): cOnnection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() queue_name="route_test" channel.exchange_declare(exchange='logs') channel.queue_declare(queue=queue_name) channel.queue_bind(exchange='logs', queue=queue_name) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=queue_name, no_ack=True) return channel def callback(ch, method, properties, body): print(" [x] %r" % body) time.sleep(3) while True: try: channel = on_start() print(' [*] Waiting for logs. To exit press CTRL+C') channel.start_consuming() except: print "connect error" 我已经在 channel 中设置了channel.basic_qos(prefetch_count=1),但是我如果先执行消费者,然后再执行生产者,就会一次性的把所有的消息都扔给消费者,这样,我如果手动再启动一个消费者,就无法获得还没有被执行的消息了。
请问,这是为什么,或者怎么样能够达到我想要的效果,即每次消费者都只获得一个消息,剩下的都保存在消息队列里面。
