在上一章节中,我们升级了日志系统,使用了direct
交换机,而不是只能虚拟广播的fanout
交换机,并且实现了选择性接收消息的可能。
尽管direct
交换机提升了我们的系统,但还是有局限性 – 它不能基于多个维度进行路由。
在日志系统中,我们可能不仅想基于级别来订阅日志,也可能想根据日志来源来接收信息。你可能已经从Unix工具 – syslog
– 了解到相关的概念:它既能基于级别(info/warn/crit…)路由选择日志,也能基于设备(auth/cron/kern…)。
这种方式会给我们提供很多便利 – 我们可能不仅想监听来自cron
的危重错误日志,也想接收来自kern
的所有日志。
要在我们的日志系统中实现这样的功能,我们需要了解更为复杂的交换器 – topic
。
1、Topic交换器
发送给topic
交换器的消息的routing_key
通常是一组由”.”区分的词组列表,而不是某个特定的词。这些词可以是任何东西,但通常是跟消息特性相关的词,例如,stock.usd.nyse
、nyse.vmw
、quick.orange.rabbit
等。路由键可以由任意多的词组成,上限是256字节。
绑定键也必须是相同的格式。topic
交换器背后的逻辑跟direct
类似 – 带有特定路由键的消息会被发送给绑定了对应绑定键的队列。但是,绑定键有两个重要的特定场景:
*
:可以替代一个词#
:可以替代零个或多个词
结合下面的例子,更方便解释:
在上面的例子中,我们发送的信息都是描述动物。这些信息的路由键都是由三个词(包括两个“.”)组成:路由键的第一个词描述速度,第二次是颜色,第三个是种类:<speed>.<colour>.<species>
。
我们创建了三个绑定:Q1绑定*.orange.*
,Q2绑定*.*.rabbit
和lazy.#
。
这些绑定总结来说就是:
- Q1接收所有橙色动物。
- Q2接收关于兔子的所有消息和所有速度为
懒
的动物。
路由键设置为quick.orange.rabbit
的消息会被发送给两个队列。lazy.orange.elephant
也会发送给两个队列。quick.orange.fox
只会发送给Q1队列,lazy.brown.fox
只会发送给Q2队列。lazy.pink.rabbit
尽管它满足两个绑定,但也只会发送给Q2队列。quick.brown.fox
不满足任何绑定,所以会被丢弃。
如果我们不按规定发送消息,发送带一个或者四个词的消息,像orange
或quick.orange.male.rabbit
,会怎样?因为这些消息不满足所有的绑定,所以会被丢弃。
lazy.orange.male.rabbit
,尽管它是4个词,但匹配最后一条绑定,所有会被发送给Q2。
Topic交换机 Topic交换机功能强大,可以模拟其它交换机行为。
当绑定了#
,无论路由键是什么,队列会收到所有的消息 – 就像fanout
交换机。 在不使用*
和#
的绑定时,topic交换机的行为就跟direct
一样。
2、整合
我们将在日志系统中使用topic
交换机。我们假设日志的路由键包含两个词:<facility>.<servrity>
。
代码跟上一节的类似。
发送日志的代码emit_log_topic.go
代码如下:
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_topic", // exchange
severityFrom(os.Args), // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "anonymous.info"
} else {
s = os.Args[1]
}
return s
}
接收日志消息receive_logs_topic.go
代码如下:
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [binding_key]...", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs_topic", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
接收所有日志:
go run receive_logs_topic.go "#"
接收来自kern
的所有日志:
go run receive_logs_topic.go "kern.*"
或者只接收critical
日志:
go run receive_logs_topic.go "*.critical"
你可以创建多重绑定:
go run receive_logs_topic.go "kern.*" "*.critical"
然后使用kern.critical
路由键发送日志:
go run emit_log_topic.go "kern.critical" "A critical kernel error"
接下来我们会在下一节中介绍远程过程调用。
声明:本作品采用署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)进行许可,使用时请注明出处。
Author: mengbin92
Github: mengbin92
cnblogs: 恋水无意