发布者确认是实现可靠发布的RabbitMQ扩展。当通道启用了发布者确认,客户端发送的消息会被广播站异步确认,表明这些消息已在服务端处理过了。
由于官方没有提供Golang版的教程文档,所以本章节是不完整的,主要是根据官方提供的代码和Java版教程整理的。
概览
在本章节中,我们将使用发布者确认来确保发布的消息安全地触达广播,我们将介绍异步处理发布者确认策略。
启用发布者确认
发布者确认是AMQP 0.9.1协议的RabbitMQ扩展,所以默认情况下是没有启动的。使用NotifyPublish
可以在通道级别启用发布者确认:
confirms := make(chan amqp.Confirmation)
ch.NotifyPublish(confirms)
该方法需要在你想使用发布者确认的每个通道上调用一次。确认应该只启用一次,而不是每次发布消息都启用。
异步处理发布者确认
广播可以异步确认发布的消息,只需要在客户端注册一个chan
即可处理这些消息确认:
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err.Error())
}
}
func main() {
// 建立连接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 创建接收确认的chan
confirms := ch.NotifyPublish(make(chan amqp.Confirmation))
go func() {
for confirm := range confirms {
if confirm.Ack {
log.Println("Confirmed")
} else {
log.Println("Nacked")
}
}
}()
// Confirm 将此通道置于确认模式,以便客户端可以确保所有
// 服务器已成功接收发布。进入这个之后
// 模式,服务器将发送一个 basic.ack 或 basic.nack 消息与交付
// 标记设置为对应于每次发布的基于 1 的增量索引
// 此方法返回后收到。
// 将侦听器添加到 Channel.NotifyPublish 以响应确认。如果
// Channel.NotifyPublish 没有被调用,确认将被静默
// 忽略。
// 确认顺序不受交付顺序的约束。
// Ack 和 Nack 确认将在未来的某个时候到达。
// 不可路由的强制或即时消息在之后立即被确认
// 已通知任何 Channel.NotifyReturn 侦听器。其他消息是
// 当所有应该将消息路由到它们的队列都具有
// 收到送达确认或已将消息加入队列,
// 如有必要,保留消息。
// 当 noWait 为真时,客户端不会等待响应。一个频道
// 如果服务器不支持此方法,则可能会发生异常。
err = ch.Confirm(false)
failOnError(err, "Failed to Confirm(false)")
// 声明队列
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
consume(ch, q.Name)
publish(ch, q.Name, "hello")
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
forever := make(chan bool)
<-forever
}
// 消息处理
func consume(ch *amqp.Channel, qName string) {
msgs, err := ch.Consume(
qName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
}
// 发布消息
func publish(ch *amqp.Channel, qName, text string) {
err := ch.Publish("", qName, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(text),
})
failOnError(err, "Failed to publish a message")
}
声明:本作品采用署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)进行许可,使用时请注明出处。
Author: mengbin92
Github: mengbin92
cnblogs: 恋水无意