发布者确认是实现可靠发布的RabbitMQ扩展。当通道启用了发布者确认,客户端发送的消息会被广播站异步确认,表明这些消息已在服务端处理过了。

由于官方没有提供Golang版的教程文档,所以本章节是不完整的,主要是根据官方提供的代码和Java版教程整理的。

概览

在本章节中,我们将使用发布者确认来确保发布的消息安全地触达广播,我们将介绍异步处理发布者确认策略。

启用发布者确认

发布者确认是AMQP 0.9.1协议的RabbitMQ扩展,所以默认情况下是没有启动的。使用NotifyPublish可以在通道级别启用发布者确认:

confirms := make(chan amqp.Confirmation)
ch.NotifyPublish(confirms)

该方法需要在你想使用发布者确认的每个通道上调用一次。确认应该只启用一次,而不是每次发布消息都启用。

异步处理发布者确认

广播可以异步确认发布的消息,只需要在客户端注册一个chan即可处理这些消息确认:

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err.Error())
	}
}

func main() {
	// 建立连接
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 创建接收确认的chan
	confirms := ch.NotifyPublish(make(chan amqp.Confirmation))
	go func() {
		for confirm := range confirms {
			if confirm.Ack {
				log.Println("Confirmed")
			} else {
				log.Println("Nacked")
			}
		}
	}()

	// Confirm 将此通道置于确认模式,以便客户端可以确保所有
	// 服务器已成功接收发布。进入这个之后
	// 模式,服务器将发送一个 basic.ack 或 basic.nack 消息与交付
	// 标记设置为对应于每次发布的基于 1 的增量索引
	// 此方法返回后收到。

	// 将侦听器添加到 Channel.NotifyPublish 以响应确认。如果
	// Channel.NotifyPublish 没有被调用,确认将被静默
	// 忽略。

	// 确认顺序不受交付顺序的约束。

	// Ack 和 Nack 确认将在未来的某个时候到达。

	// 不可路由的强制或即时消息在之后立即被确认
	// 已通知任何 Channel.NotifyReturn 侦听器。其他消息是
	// 当所有应该将消息路由到它们的队列都具有
	// 收到送达确认或已将消息加入队列,
	// 如有必要,保留消息。

	// 当 noWait 为真时,客户端不会等待响应。一个频道
	// 如果服务器不支持此方法,则可能会发生异常。
	err = ch.Confirm(false)
	failOnError(err, "Failed to Confirm(false)")

	// 声明队列
	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	consume(ch, q.Name)
	publish(ch, q.Name, "hello")

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	forever := make(chan bool)
	<-forever

}

// 消息处理
func consume(ch *amqp.Channel, qName string) {
	msgs, err := ch.Consume(
		qName, // queue
		"",    // consumer
		true,  // auto-ack
		false, // exclusive
		false, // no-local
		false, // no-wait
		nil,   // args
	)
	failOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()
}

// 发布消息
func publish(ch *amqp.Channel, qName, text string) {
	err := ch.Publish("", qName, false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(text),
	})
	failOnError(err, "Failed to publish a message")
}


声明:本作品采用署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)进行许可,使用时请注明出处。
Author: mengbin92
Github: mengbin92
cnblogs: 恋水无意