在上一章节中,我们升级了日志系统,使用了direct交换机,而不是只能虚拟广播的fanout交换机,并且实现了选择性接收消息的可能。

尽管direct交换机提升了我们的系统,但还是有局限性 – 它不能基于多个维度进行路由。

在日志系统中,我们可能不仅想基于级别来订阅日志,也可能想根据日志来源来接收信息。你可能已经从Unix工具 – syslog – 了解到相关的概念:它既能基于级别(info/warn/crit…)路由选择日志,也能基于设备(auth/cron/kern…)。

这种方式会给我们提供很多便利 – 我们可能不仅想监听来自cron的危重错误日志,也想接收来自kern的所有日志。

要在我们的日志系统中实现这样的功能,我们需要了解更为复杂的交换器 – topic


1、Topic交换器

发送给topic交换器的消息的routing_key通常是一组由”.”区分的词组列表,而不是某个特定的词。这些词可以是任何东西,但通常是跟消息特性相关的词,例如,stock.usd.nysenyse.vmwquick.orange.rabbit等。路由键可以由任意多的词组成,上限是256字节。

绑定键也必须是相同的格式。topic交换器背后的逻辑跟direct类似 – 带有特定路由键的消息会被发送给绑定了对应绑定键的队列。但是,绑定键有两个重要的特定场景:

  • *:可以替代一个词
  • #:可以替代零个或多个词

结合下面的例子,更方便解释:

在上面的例子中,我们发送的信息都是描述动物。这些信息的路由键都是由三个词(包括两个“.”)组成:路由键的第一个词描述速度,第二次是颜色,第三个是种类:<speed>.<colour>.<species>

我们创建了三个绑定:Q1绑定*.orange.*,Q2绑定*.*.rabbitlazy.#

这些绑定总结来说就是:

  • Q1接收所有橙色动物。
  • Q2接收关于兔子的所有消息和所有速度为的动物。

路由键设置为quick.orange.rabbit的消息会被发送给两个队列。lazy.orange.elephant也会发送给两个队列。quick.orange.fox只会发送给Q1队列,lazy.brown.fox只会发送给Q2队列。lazy.pink.rabbit尽管它满足两个绑定,但也只会发送给Q2队列。quick.brown.fox不满足任何绑定,所以会被丢弃。

如果我们不按规定发送消息,发送带一个或者四个词的消息,像orangequick.orange.male.rabbit,会怎样?因为这些消息不满足所有的绑定,所以会被丢弃。

lazy.orange.male.rabbit,尽管它是4个词,但匹配最后一条绑定,所有会被发送给Q2。

Topic交换机 Topic交换机功能强大,可以模拟其它交换机行为。
当绑定了#,无论路由键是什么,队列会收到所有的消息 – 就像fanout交换机。 在不使用*#的绑定时,topic交换机的行为就跟direct一样。


2、整合

我们将在日志系统中使用topic交换机。我们假设日志的路由键包含两个词:<facility>.<servrity>

代码跟上一节的类似。

发送日志的代码emit_log_topic.go代码如下:

package main

import (
	"log"
	"os"
	"strings"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(
		"logs_topic", // name
		"topic",      // type
		true,         // durable
		false,        // auto-deleted
		false,        // internal
		false,        // no-wait
		nil,          // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	body := bodyFrom(os.Args)
	err = ch.Publish(
		"logs_topic",          // exchange
		severityFrom(os.Args), // routing key
		false, // mandatory
		false, // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
	var s string
	if (len(args) < 3) || os.Args[2] == "" {
		s = "hello"
	} else {
		s = strings.Join(args[2:], " ")
	}
	return s
}

func severityFrom(args []string) string {
	var s string
	if (len(args) < 2) || os.Args[1] == "" {
		s = "anonymous.info"
	} else {
		s = os.Args[1]
	}
	return s
}

接收日志消息receive_logs_topic.go代码如下:

package main

import (
	"log"
	"os"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(
		"logs_topic", // name
		"topic",      // type
		true,         // durable
		false,        // auto-deleted
		false,        // internal
		false,        // no-wait
		nil,          // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	if len(os.Args) < 2 {
		log.Printf("Usage: %s [binding_key]...", os.Args[0])
		os.Exit(0)
	}
	for _, s := range os.Args[1:] {
		log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s)
		err = ch.QueueBind(
			q.Name,       // queue name
			s,            // routing key
			"logs_topic", // exchange
			false,
			nil)
		failOnError(err, "Failed to bind a queue")
	}

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto ack
		false,  // exclusive
		false,  // no local
		false,  // no wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf(" [x] %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

接收所有日志:

go run receive_logs_topic.go "#"

接收来自kern的所有日志:

go run receive_logs_topic.go "kern.*"

或者只接收critical日志:

go run receive_logs_topic.go "*.critical"

你可以创建多重绑定:

go run receive_logs_topic.go "kern.*" "*.critical"

然后使用kern.critical路由键发送日志:

go run emit_log_topic.go "kern.critical" "A critical kernel error"

接下来我们会在下一节中介绍远程过程调用。


声明:本作品采用署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)进行许可,使用时请注明出处。
Author: mengbin92
Github: mengbin92
cnblogs: 恋水无意