Rabbitmq介绍以及在Node.js中的使用

Rabbitmq介绍以及在Node.js中的使用

十月 16, 2019

使用场景

在项目中,将一些无需即时返回且耗时的操作提取出来,进行一步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了吞吐量。
Example:

  • 日志采集
  • 报名
  • 短信发送

相关概念介绍

Connection和Channel

Connection: 是RabbitMQ的socket链接,封装了socket协议相关部分逻辑。
Channel: 定义Queue,Exchange,绑定Queue与Exchange,发布消息等。

Queue

RabbitMQ中的消息都只能存在Queue中,生产者(P)生产消息并最终投递到Queue中,消费者(C)可以从Queue中获取消息并消费。多个消费者可以订阅同一个Queue,这时候Queue消息会平均分摊给多个消费者处理,而不每个消费者都接收到消息。
消费

Message acknowledgment

实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完就宕机的情况,这种情况下就可能会导致消息丢失了。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给mq,mq收到回执后才将消息从Queue中删除。
PS:千万要发送回执给mq,不然会导致bug– Queue中的消息堆积如山。

Durability

如果我们希望即使在mq服务重启的情况下,也不会丢失消息,我们可以将queue和message都设置成可持久化(durable),这样可以保证绝大部分情况下我们的mq消息不会丢失。

Exchange

在介绍Queue的时候我们看到生产者将消息投递到Queue中,实际上在mq中这种事情永远不会发生,实际情况是,生产者将消息发送到Exchange,由Exchange将消息路由到一个或者多个Queue中。
Exchange

routing key

生产者将消息发送给Exchange的时候,一般会指定一个routing key,用来指定这个消息的路由规则,而这个routing key需要与Exchange Type和binding key联合使用才能最终生效。

binding key

在绑定Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key。当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。
在绑定多个Queue到同一个Exchange的时候,这些绑定允许使用同样的binding key,但是并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchage的Queue。

Exchange Type

  • fanout
    会把所有发送到该Exchange的消息路遥到所有与它绑定的Queue中。
    fanout
  • direct
    会把消息路由到那些bingding key 和 routing key完全匹配的Queue中
  • topic
    和direct类似,但是没有direct那么严格,它支持通配符
  • header
    根据发送消息内容中的headers属性进行匹配,一般用的很少,不多做介绍

与Kafka对比

  • 应用场景

    • RabbitMQ:用于实时的,对可靠性要求较高的消息传递上。
    • kafka:用于处于活跃的流式数据,大数据量的数据处理上。
  • 吞吐量方面

    • RabbitMQ:支持消息的可靠的传递,支持事务,不支持批量操作,基于存储的可靠性的要求存储可以采用内存或硬盘,吞吐量小。
    • kafka:内部采用消息的批量处理,数据的存储和获取是本地磁盘顺序批量操作,消息处理的效率高,吞吐量高。
  • 集群负载均衡方面

    • RabbitMQ:本身不支持负载均衡,需要loadbalancer的支持
    • kafka:采用zookeeper对集群中的broker,consumer进行管理,可以注册topic到zookeeper上,通过zookeeper的协调机制,producer保存对应的topic的broker信息,可以随机或者轮询发送到broker上,producer可以基于语义指定分片,消息发送到broker的某个分片上。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// 生产端 
const amqp = require('amqplib');

async function producer() {
// 创建链接对象
const connection = await amqp.connect('amqp://localhost:5672');

// 获取通道
const channel = await connection.createChannel();

// 声明参数
const exchangeName = 'direct_exchange_name';
const routingKey = 'direct_routingKey';
const msg = 'hello world';

// 交换机
await channel.assertExchange(exchangeName, 'direct', {
durable: true,
});

// 发送消息
await channel.publish(exchangeName, routingKey, Buffer.from(msg));

// 关闭链接
await channel.close();
await connection.close();
}

producer();

// 消费端
const amqp = require('amqplib');

async function consumer() {
// 创建链接对象
const connection = await amqp.connect('amqp://localhost:5672');

// 获取通道
const channel = await connection.createChannel();

// 声明参数
const exchangeName = 'direct_exchange_name';
const queueName = 'direct_queue';
const bindingKey = 'direct_routingKey';

// 声明一个交换机
await channel.assertExchange(exchangeName, 'direct', { durable: true });

// 声明一个队列
await channel.assertQueue(queueName);

// 绑定关系(队列、交换机、路由键)
await channel.bindQueue(queueName, exchangeName, bindingKey);

// 消费
await channel.consume(queueName, msg => {
console.log('Consumer:', msg.content.toString());
channel.ack(msg);
}, { noAck: false });

console.log('消费端启动成功!');
}

consumer();

扩展

消息TTL

消息的 TTL 指的是消息的存活时间,RabbitMQ 支持消息、队列两种方式设置 TTL,分别如下:

  • 消息设置 TTL:对消息的设置是在发送时进行 TTL 设置,通过 x-message-ttl 或expiration 字段设置,单位为毫秒,代表消息的过期时间,每条消息的 TTL 可不同。
  • 队列设置 TTL:对队列的设置是在消息入队列时计算,通过 x-expires 设置,队列中的所有消息都有相同的过期时间,当超过了队列的超时设置,消息会自动的清除。

注意:如果以上两种方式都做了设置,消息的 TTL 则以两者之中最小的那个为准。

死信队列

死信队列全称 Dead-Letter-Exchange 简称 DLX 是 RabbitMQ 中交换器的一种类型,消息在一段时间之后没有被消费就会变成死信被重新 publish 到另一个 DLX 交换器队列中,因此称为死信队列。
死信队列产生的几种情况:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

设置DLX的两个参数:

  • deadLetterExchange: 设置 DLX,当正常队列的消息成为死信后会被路由到 DLX 中
  • deadLetterRoutingKey: 设置 DLX 指定的路由键

通过上面这两个功能,我们完全可以实现延迟队列,用于例如订单过期判断等等业务场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* 路由一个死信队列
* @param { Object } connnection
*/
async function producerDLX() {
// 创建链接对象
const connnection = await amqp.connect('amqp://localhost:5672');
const testExchange = 'test';
const testQueue = 'testQ';
const testExchangeDLX = 'testExDLX';
const testRoutingKeyDLX = 'testRoutingKeyDLX';

const ch = await connnection.createChannel();
await ch.assertExchange(testExchange, 'direct', { durable: true });
const queueResult = await ch.assertQueue(testQueue, {
exclusive: false,
deadLetterExchange: testExchangeDLX,
deadLetterRoutingKey: testRoutingKeyDLX,
});
await ch.bindQueue(queueResult.queue, testExchange);
const msg = 'hello world!';
console.log('producer msg:', msg, new Date());
await ch.sendToQueue(queueResult.queue, new Buffer(msg), {
expiration: '10000'
});

ch.close();
}
producerDLX()

/**
* 消费一个死信队列
* @param { Object } connnection
*/
async function consumerDLX() {
// 创建链接对象
const connnection = await amqp.connect('amqp://localhost:5672');
const testExchangeDLX = 'testExDLX';
const testRoutingKeyDLX = 'testRoutingKeyDLX';
const testQueueDLX = 'testQueueDLX';

const ch = await connnection.createChannel();
await ch.assertExchange(testExchangeDLX, 'direct', { durable: true });
const queueResult = await ch.assertQueue(testQueueDLX, {
exclusive: false,
});
await ch.bindQueue(queueResult.queue, testExchangeDLX, testRoutingKeyDLX);
await ch.consume(queueResult.queue, msg => {
console.log('consumer msg:', msg.content.toString(), new Date());
}, { noAck: true });
}

consumerDLX()