民工兄Redis教程(四):发布订阅(Pub/Sub)
什么是发布订阅?
Redis 发布订阅(Pub/Sub)是一种消息通信模型:发送者(Pub)发送消息,订阅者(Sub)接收消息。
Redis 的订阅命令允许客户端订阅任意数量的频道。每当有新信息发送到订阅的频道时,该信息都会发送到订阅指定频道的所有客户端。
☛ 下图展示了通道Channel1和订阅该通道的三个客户端——Client2、Client5和Client1之间的关系: ☛ 当通过Publish命令向通道Channel1发送新消息时,是这样的消息发送给订阅的三个客户:
为什么要用发布订阅?熟悉消息中间件的同学都知道,对于消息订阅的发布功能,市场上很多大厂都使用kafka
、Rabbitmq
、ActiveMQ
、qactive、qRocketMQ 等、redis的订阅和发布功能比较简单,相比这三个,而且对数据的准确性和安全性要求不是那么高,可以立即使用。它适合小型公司。
Redis 的列表数据类型结构提供了 blpop 和 brpop 命令结合 rpush 和 lpush 命令来实现基于双向链表的消息队列机制和发布订阅功能
这种方法有两个局限性:
- 不支持一对多消息分发。
- 如果生产者的生成速率远大于消费者的消费速率,很容易积累大量未使用的消息
◇ 双队列图如下:
✦ 分析:双队列模式只能一个或几个消费者轮流消费,但不能同时向其他消费者发送消息分析:Redis订阅发布模型,生产者在生产消息后通过通道将消息分发给所有订阅的消费者到频道
如何使用发布/订阅?
Redis 有两种发布/订阅模式:
- 基于通道的发布/订阅
- 基于模式的发布/订阅
操作命令如下 “发布/订阅”包含 2 个角色:发布者和订阅者。发布者可以向指定渠道(channels)发送消息;订阅者可以订阅一个或多个频道,所有订阅该频道的订阅者都会收到此消息。
- 订阅者订阅频道subscribe频道[频道...]
--------------------------客户端1(订阅者) :订阅频道 ---------------------
# 订阅 “meihuashisan” 和 “csdn” 频道(如果不存在则会创建频道)
127.0.0.1:6379> subscribe meihuashisan csdn
Reading messages... (press Ctrl-C to quit)
1) "subscribe" -- 返回值类型:表示订阅成功!
2) "meihuashisan" -- 订阅频道的名称
3) (integer) 1 -- 当前客户端已订阅频道的数量
1) "subscribe"
2) "csdn"
3) (integer) 2
#注意:订阅后,该客户端会一直监听消息,如果发送者有消息发给频道,这里会立刻接收到消息
- 发布者发布消息publish频道消息
-----------------------客户端2(发布者):发布消息给频道 -------------------
# 给“meihuashisan”这个频道 发送一条消息:“I am meihuashisan”
127.0.0.1:6379> publish meihuashisan "I am meihuashisan"
(integer) 1 # 接收到信息的订阅者数量,无订阅者返回0
客户端2(发布者)将消息发布到频道后,此时我们将观察客户端1(订阅者)客户端窗口变化:
--------------------------客户端1(订阅者) :订阅频道 -----------------
127.0.0.1:6379> subscribe meihuashisan csdn
Reading messages... (press Ctrl-C to quit)
1) "subscribe" -- 返回值类型:表示订阅成功!
2) "meihuashisan" -- 订阅频道的名称
3) (integer) 1 -- 当前客户端已订阅频道的数量
1) "subscribe"
2) "csdn"
3) (integer) 2
--------------------变化如下:(实时接收到了该频道的发布者的消息)------------
1) "message" -- 返回值类型:消息
2) "meihuashisan" -- 来源(从哪个频道发过来的)
3) "I am meihuashisan" -- 消息内容
命令操作图如下:
注意:如果先发布消息再订阅频道,订阅前不会收到频道上发布的消息订阅了! 注意:进入订阅状态的客户只能使用
subscribe
、unsubscribe
、⸀♼和⸝p♝ punsubscribe 这四个属于其他命令如“发布/订阅”,否则会报错!
这里的客户端指的是Jedis和Salad的客户端,redis-cli无法退出订阅状态!更多学习Redis的文章请参见上:NoSQL数据库系列-Redis。该系列持续更新。
实现原理
下层通过字典实现。 pubsub_channels
是一个字典类型,存储订阅频道的信息:字典的键是订阅的频道,字典的值是一个链表。链表存储了所有订阅频道
struct redisServer {
/* General */
pid_t pid;
//省略百十行
// 将频道映射到已订阅客户端的列表(就是保存客户端和订阅的频道信息)
dict *pubsub_channels; /* Map channels to list of subscribed clients */
}
的客户,实现图如下:
频道订阅:订阅频道时,首先检查该字段内部是否存在;如果不存在,则为当前频道创建一个字典,并创建一个链表来存储客户端ID;否则,直接在链表中输入客户ID。
取消频道订阅:取消时,将客户ID从对应链表中移除;如果删除后链表已为空,则该通道将从字典中删除。
发布:先根据通道找到字典的key,然后将信息发送给字典值列表中的所有客户端
基于模式(pattern)发布/订阅
如果有某个/某些模式匹配此频道,所有订阅此/这些频道的客户也会收到该信息。
图解
下图是一个通道和模式的例子,其中com.ahead.*通道对应com.ahead.juc通道和com.ahead.thread通道,分别有不同的客户端。订阅其中三个,如下图:
当信息发送到 com.ahead.thread 通道时,信息除了发送到客户端 4 和客户端 5 之外,还会发送到连接到 com.ahead.thread 通道的客户端。订阅客户端 com.ahead.* 通道模式 x 和客户端 y✦ 分析:反之亦然,当消息发送到 com.ahead.juc 通道时,消息不仅发送到订阅 juc 的客户端通道是,但也会发送到订阅 com.ahead.* 的客户端。通道客户端:客户端 x,客户端 y
在通配符中,?代表 1 个占位符,* 代表任意数量的占位符(包括 0 个),?* 代表超过 1 个占位符。
- 订阅者订阅通道 psubscribe 模式 [Pattern...]
--------------------------客户端1(订阅者) :订阅频道 --------------------
# 1. ------------订阅 “a?” "com.*" 2种模式频道--------------
127.0.0.1:6379> psubscribe a? com.*
# 进入订阅状态后处于阻塞,可以按Ctrl+C键退出订阅状态
Reading messages... (press Ctrl-C to quit)
---------------订阅成功-------------------
1) "psubscribe" -- 返回值的类型:显示订阅成功
2) "a?" -- 订阅的模式
3) (integer) 1 -- 目前已订阅的模式的数量
1) "psubscribe"
2) "com.*"
3) (integer) 2
---------------接收消息 (已订阅 “a?” "com.*" 两种模式!)-----------------
# ---- 发布者第1条命令:publish ahead "hello"
结果:没有接收到消息,匹配失败,不满足 “a?” ,“?”表示一个占位符, a后面的head有4个占位符
# ---- 发布者第2条命令: publish aa "hello" (满足 “a?”)
1) "pmessage" -- 返回值的类型:信息
2) "a?" -- 信息匹配的模式:a?
3) "aa" -- 信息本身的目标频道:aa
4) "hello" -- 信息的内容:"hello"
# ---- 发布者第3条命令:publish com.juc "hello2"(满足 “com.*”, *表示任意个占位符)
1) "pmessage" -- 返回值的类型:信息
2) "com.*" -- 匹配模式:com.*
3) "com.juc" -- 实际频道:com.juc
4) "hello2" -- 信息:"hello2"
---- 发布者第4条命令:publish com. "hello3"(满足 “com.*”, *表示任意个占位符)
1) "pmessage" -- 返回值的类型:信息
2) "com.*" -- 匹配模式:com.*
3) "com." -- 实际频道:com.
4) "hello3" -- 信息:"hello3"
- 发布者发布消息发布通道消息
------------------------客户端2(发布者):发布消息给频道 ------------------
注意:订阅者已订阅 “a?” "com.*" 两种模式!
# 1. ahead 不符合“a?”模式,?表示1个占位符
127.0.0.1:6379> publish ahead "hello"
(integer) 0 -- 匹配失败,0:无订阅者
# 2. aa 符合“a?”模式,?表示1个占位符
127.0.0.1:6379> publish aa "hello"
(integer) 1
# 3. 符合“com.*”模式,*表示任意个占位符
127.0.0.1:6379> publish com.juc "hello2"
(integer) 1
# 4. 符合“com.*”模式,*表示任意个占位符
127.0.0.1:6379> publish com. "hello3"
(integer) 1
命令操作图如下:
实现原理❙下面的链接是 pubsubPattern 列表节点。 struct redisServer {
//...
list *pubsub_patterns;
// ...
}
// 1303行订阅模式列表结构:
typedef struct pubsubPattern {
client *client; -- 订阅模式客户端
robj *pattern; -- 被订阅的模式
} pubsubPattern;
struct redisServer {
//...
list *pubsub_patterns;
// ...
}
// 1303行订阅模式列表结构:
typedef struct pubsubPattern {
client *client; -- 订阅模式客户端
robj *pattern; -- 被订阅的模式
} pubsubPattern;
的实现图如下:
模式订阅:在链表的最后一端添加一个pubsub_pattern
数据结构,同时存储客户ID。
Pattern取消订阅:从当前链表pubsub_pattern
结构体中删除需要取消订阅的pubsubpattern结构体。
使用总结
订阅者(监听者)负责订阅频道(频道);发送者(发布者)负责将二进制字符串消息发送到通道,然后,当通道接收到消息时,将其推送给订阅者。
使用场景
- 在电商中,用户成功下单后,向指定渠道发送消息,下游业务订阅支付结果。该通道自行处理相关业务逻辑
- 粉丝关注功能
- 文章推送
使用注意
- 客户端必须及时消费和处理消息。
- 客户端订阅频道后,如果没有及时收到消息,可能会导致DCS实例采集消息。当消息累积达到阈值(默认为32MB),或者在一段时间内(默认为1分钟)达到一定水平(默认为8MB)时,服务器会自动断开客户端连接,以避免内存不足。
- 客户端必须支持重连。
- 如果连接断开,客户端必须订阅或者使用psubscribe重新订阅,否则无法接收进一步的消息。
- 不建议在消息可靠性要求较高的场景中使用。
- Redis 的 pubsub 不是一个可靠的消息系统。如果客户端连接断开,或者在极端情况下服务器在主备之间切换,未消费的消息将被丢弃。
更多学习Redis的文章请参见上:NoSQL数据库系列-Redis。该系列持续更新。
深入理解
我们通过一些问题来深入了解Redis的订阅和发布机制
基于通道的发布/订阅是如何实现的?
底层是通过字典实现的(图中的pubsub_channels)。该字典用于存储订阅频道的信息:字典的键是订阅的频道,字典的值是一个链表,存储了订阅该频道的所有客户。
- 数据结构
例如,在下面的 pubsub_channels 示例中,Client2、Client5 和 Client1 订阅了 Channel1,其他频道也被其他客户端订阅:
- 订阅
当客户端调用SUBSCRIBE 命令,程序将客户端与 pubsub_channels 字典中要订阅的频道关联起来。
例如,客户端Client10086执行命令SUBSCRIBE Channel1 Channel2 Channel3
,那么之前显示的Pubsub_Channels将是这样的: PU BLISH频道消息命令,程序定位首先根据通道获取字典的key,然后将信息发送给字典值列表中的所有客户端。
例如,对于以下 pubsub_channels 实例,如果客户端执行命令 PUBLISH Channel1 "hello moto"
,则 client2、client5 和 clienthello moto 都会收到信息:
使用UNSUBSCRIBE 命令取消订阅指定频道。此命令执行订阅的反向操作:它删除 pubsub_channels 字典中给定通道(键)的当前客户端的信息。这样,取消订阅频道的信息就不再发送给该客户端。
基于模式的发布/订阅是如何实现的?
底层是pubsubPattern节点的链表。
- 数据结构 redisServer.pubsub_patterns 属性是一个链表,存储了所有与模式相关的信息:
struct redisServer {
// ...
list *pubsub_patterns;
// ...
};
链表中的每个节点都包含一个 redis.h/pubsubPattern 结构: Client 属性 存储模式,以及模式属性保存订阅的模式。
每当调用 PSUBSCRIBE 命令订阅模式时,程序都会创建一个包含客户端信息和订阅模式的 pubsubPattern 结构体,并将该结构体添加到 redisServer.pubsub_patterns 链表中。
作为示例,下图显示了具有两种模式的 pubsub_patterns 链表,其中 client123 和 client256 都订阅了 tweet.shop。* 模式:
- 订阅
当客户端 PSSC UBRI 执行 1008broadcast.list 时。 * ,则 pubsub_patterns 链表更新如下: 通过遍历整个 pubsub_patterns 链表,程序可以检查所有订阅的模式以及订阅这些模式的客户端。
- 发布
向模式发送信息的工作也是由 PUBLISH 命令执行的。当然,它符合获取通道,然后将消息发送给客户端的模式。
- 取消订阅
使用 PUNSUBSCRIBE 命令取消订阅指定模式。该命令执行订阅模式的逆操作:程序删除链表redisServer.pubsub_patterns中与取消订阅模式关联的所有pubsubPattern结构。这样,客户端就不再从与该模式匹配的通道接收信息。更多学习Redis的文章请参见上:NoSQL数据库系列-Redis。该系列持续更新。
SpringBoot 与 Redis 发布/订阅实例结合?
最佳实践是使用RedisTemplate。关键代码如下:
// 发布
redisTemplate.convertAndSend("my_topic_name", "message_content");
// 配置订阅
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(xxxMessageListenerAdapter, "my_topic_name");
摘要
1。 Redis订阅频道信息由Redis服务器进程本身保存在pubsub_channels链表字典中。字典的KEY是订阅的频道,value是订阅的客户。
2。当发送者发送消息时,Redis服务器会向所有匹配该频道的客户端广播,然后将消息发送给订阅的客户端。
3。信息发送后,除订阅频道的客户和订阅相应频道的客户外,其他客户将不会收到该信息。
4。订阅模式分配频道和订阅模式订阅频道是两组相反的操作。
应用场景
俗话说,知识学得好不好,关键看怎么用。不管怎样,笔者看到Redis发布订阅模型的特点后,首先想到的就是它可以用来做一个实时聊天系统,也可以用于进程,分布式架构。利用Redis的实时发布功能,将值分发给每个写入程序快速快速地写入,保证分布式架构中数据的完全一致性。再比如博客系统和自媒体平台中的粉丝关注功能。例如,如果我现在有20万粉丝,当我发布文章时,我可以及时将文章推送到粉丝的客户端。总而言之,有很多应用场景需要大家多思考、多沟通。 参考来源:blog.csdn.net/w15558056319/article/details/121490953 pdai.tech/md/db/nosql-redis/dp-ubwww-redis-x.html .wenjians.com/doc/ mt0ueji7b8sc
版权声明
本文仅代表作者观点,不代表Code前端网立场。
本文系作者Code前端网发表,如需转载,请注明页面地址。
发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。