在上一个教程中,我们编写了程序来发送消息给有名队列并从中接收消息。这次,我们将创建一个工作队列来分发耗时任务给多个消费者。
工作队列(又名任务队列)的主要思想是避免立即执行那些资源密集型任务且不需要等待完成的任务,我们将这些任务推后执行。我们将任务封装成消息并发送给队列。在后台运行的工作进程取出这些任务并进行处理。如果你运行了多个工作进程,这些进程之间将会共享这些任务。
这个概念在web应用中尤其有用,因为在短暂的HTTP请求窗口内处理复杂的任务是不可能的。
1. 准备
在教程的第一部分中,我们仅发送了条“Hello World”消息。现在我们将发送些代表复杂任务的信息。我们并没有真实的任务场景,像重置图形大小或渲染PDF文件,所以我们使用time.Sleep
函数来假装我们很忙。我们使用字符串中.
的数量来表示任务的复杂程度:一个.
代表1s的工作。例如“Hello…”这样的任务需要3s时间。
我们需要简单地修改下上一节的send.go文件,这样它就能读取从命令行输入的字符串。修改后的程序会将任务发送到工作队列中,我们将之命名为new_task.go
:
body := bodyFrom(os.Args)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
bodyFrom()
函数声明如下:
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
receive.go也需要修改下:它需要为消息中的每个.
伪造1s的工作量。修改后的程序会将从队列中弹出消息并进行处理,所以我们将它命名为worker.go
:
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
我们使用了time.Sleep
来模拟任务执行。
像第一节一样,在命令行下执行:
# shell 1
go run worker.go
# shell 2
go run new_task.go
2. 循环调用
使用任务队列的一大优势就是可以并行工作。如果积压了大量的工作,那我们只需增加“工人”就可以很方便的提高效率。
首先,我们需要同时运行两个worker.go
。它们都能从队列中获取消息,但实际呢?
这次你需要打开三个终端。两个运行worker.go
,这两个就是两个消费者 – C1和C2:
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
最后一个终端我们用了发布消息。启动消费者之后,你可以发送下面的信息:
# shell 3
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
go run new_task.go Fourth message....
go run new_task.go Fifth message.....
此时,两个消费者终端会显示如下信息:
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ会顺序地把消息发送给下个消费者。整体而言,每个消费者都会得到相同数量的消息。这种分发消息的方式被称作循环。你可以自行尝试使用更多的“工人”来验证。
3. 消息确认
执行任务需要时间。你可能会好奇当一个消费者处理一项耗时漫长的任务,但只处理了一部分就挂了会怎样。对我们当前的代码来说,一旦消息被分发给消费者,RabbitMQ就会立即把它标记为删除。这种情况下,如果你kill一个“工人”,那我们就会丢失它正在处理的消息。不止于此,我们还会丢失分发给它的其它未处理的消息。
但,实际上我们并不想丢失任何任务。如果一个处理程序挂了,我们希望这个任务会被分发给其它处理程序。
为了确保所有消息都不会丢失,RabbitMQ支持消息确认。当消费者接收到消息并处理结束后,它会给RabbitMQ发送ack(nowledgement),告诉RabbitMQ可以删除该消息了。
如果消费者没有发送ack就挂了(可能是通道关闭了、连接关闭了,或TCP连接丢失了),RabbitMQ就会知道消息没有完全处理完成,并把它重新放入队列中。如果此时有其它消费者在线,RabbitMQ很快就会将它重新发送给其它消费者。这样即使有处理程序掉线,RabbitMQ也能保证不会丢失消息。
消息没有超时限制。消费者掉线后RabbitMQ会重发消息,即使处理这条消息会花费很长很长的时间。
在本教程中,我们会将auto-ack
设置为false,在处理程序处理完消息后,会通过d.Ack(false)
(发送单个确认)确认信号给RabbitMQ。
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
使用上面的代码,我们可以确保即使你在处理程序工作时使用CTRL-C
关闭程序也不是丢失任何信息。在处理程序下线之后不久,所有未确认的信息都会被重发。
确认信号必须通过接收分发的通道发送。尝试使用不同的通道发送确认信号会引发通道级别的协议异常,详见doc guide on confirmations。
忘记确认
ack
缺失是常见的错误。虽然只是个简单的错误,但造成的后果很严重。在客户端离线后消息会被重发,但RabbitMQ占用的内存会越来越多,因为它无法释放未经确认的消息。 你可以使用rabbitmqctl
命令输出messages_unacknowledgerd
字段来调试这类错误:sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Windows下执行rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
4. 消息持久化
我们已经知道了如何在消费者掉线的情况下保证任务不会丢失。但RabbitMQ服务关闭后,任务还是会丢失的。
除非事先设置好,否则在服务下线或崩溃时,RabbitMQ仍会遗失现有的队列和消息。要求确保消息不会遗失,我们需要做两件事:持久化队列的同时也要持久化消息。
首先,我们需要确保在RabbitMQ节点重启后,队列不会遗失,所以我们需要将队列持久化:
q, err := ch.QueueDeclare(
"hello", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
尽管上面的命令是正确的,但我们当前的配置并不会生效,因为我们已经声明了一个非持久的hello
队列。RabbitMQ不允许使用不同参数来重定义一个已存在的队列,任何试图执行这样操作的程序都会报错。我们需要声明一个不同的队列,例如task_queue
:
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
生产者和消费者代码中的durable
选项都需要设置。
此时,即使RabbitMQ重启,task_queue
队列也不会丢失。现在需要让我们的消息也持久化 – 通过amqp.Oersistent
选项来amqp.Publishing
任务:
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
消息持久化注意事项 消息持久化并不能完全保证消息不会丢失。即使已经告诉了RabbitMQ要把消息保存到硬盘,但RabbitMQ从接收到消息到将消息写入硬盘,期间还是需要一小段时间的。而且RabbitMQ并不会对每条消息都执行
fsync(2)
– 只是将消息保存到缓存中,而不是真正写入硬盘。尽管持久化保证并不强,但对于我们的简单任务队列来说,这已经足够了。如果你需要更强劲的保证,可以使用publisher confirms。
5. 公平分发
你可能已经意识到了,分发工作并不是像我们预期的那样。例如在有两个“工人”的场景中,当所有奇数消息都很耗时、偶数消息都是轻量级时,会出现一个处理程序很繁忙,而另一个却几乎没什么工作。因为RabbitMQ并不了解这种情况,还是会均匀地分发消息。
出现这种情况是因为RabbitMQ在有消息进入队列之后只单纯地分发消息,并不会检查消费者未确认的消息数量。它只会盲目的将第n条消息发送给第n个消费者。
要解决上面的问题,我们需要将每个消费者的预取数量设置为1。即告诉RabbitMQ同一时间最多分发一条消息给“工人”。换言之,在“工人”处理完并确认前一条消息前不要分发给它新的消息,而是将消息分发给其它空闲的“工人”。
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
队列大小 如果所有的“工人”都很忙,那你的队列就可能被塞满。你需要密切关注这一点,可以增加处理程序来避免这个问题,或者采取其它策略。
6. 整合到一起
new_task.go
如下:
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err.Error())
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := bodyFrom(os.Args)
err = ch.Publish(
"", //exchange
q.Name, //routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
},
)
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if len(args) < 2 || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], s)
}
return s
}
worker.go
:
package main
import (
"bytes"
"log"
"time"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err.Error())
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
<-forever
}
通过消息确认和预处理数量,你可以设置一个工作队列。持久化选项的使用可以在RabbitMQ重启的情况下也不丢失任务。
amqp.Channel
更多方法和消息特性的介绍,详见amqp API reference。
下一章节我们将学习如何将一条消息同时发送给多个消费者。
声明:本作品采用署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)进行许可,使用时请注明出处。
Author: mengbin92
Github: mengbin92
cnblogs: 恋水无意