精华 一张图进阶 RocketMQ

“一张图”系列旨在通过“一张图”系统性的解析一个板块的知识点:
本文是“一张图”系列的第一个板块:一张图解析。
本文是《一张图解析 》系列的第 1 篇,今天的内容主要分为三个部分:
整体架构
什么是消息队列?顾名思义,首先得有一个队列,这个队列用来存储消息 。那有了消息队列就得有人往里面放,有人往里面取 。有没有似曾相识燕归来的感 jio,这莫非就是连小学生都知道的,经典的“生产者-消费者模式”?接下来我们就来看看它里面穿了什么?
别急,先来回顾一下 “生产者-消费者模式” 这个老朋友 。简单来说,这个模型是由两类线程和一个队列构成:
有了这个队列,生产者就只需要关注生产,而不用管消费者的消费行为,更不用等待消费者线程执行完;消费者也只管消费,不用管生产者是怎么生产的,更不用等着生产者生产 。
这意味着什么呢,生产者和消费者之间实现解藕和异步 。这就厉害了,因为我们生活中很多都是异步的 。比如最近新冠疫情卷土重来,我点的外卖只能送到小区门口的外卖队列里面,而我只能去外卖队列里面取外卖,然后一顿狼吞虎咽 。
具体 “生产者-消费者模式” 怎么实现,想必各位小学都学过了,我们来看看这个模式还有什么问题吧 。最大的问题就是我们小学学的 “生产者-消费者模式” 是个单机版的,只能自嗨 。这就相当于,我就是外卖骑手,我点了个外卖放到外卖队列,然后我再从外卖队列里面去取,一顿操作猛如虎呀!于是就有了进化版,我们把消费者,队列,生产者放到不同的服务器上,这就是传说中的分布式消息队列了 。

精华 一张图进阶 RocketMQ

文章插图
生产者生产的消息通过网络传递给队列存储,消费者通过网络从队列获取消息 。但是还有问题,消息可能有很多种,全都放在一起岂不是乱套了?我点的外卖和快递全都放在一起,太难找了吧 。于是我们就需要区分不同类型消息,相同类型的消息称为一个 Topic 。同时,骑手不可能只有一个,点外卖的也不会只有我一个人,于是就有了生产者组和消费者组 。
但还是有问题呀,小区那么大,一个队列放不下 。我住在小区南门,点个外卖还要跑去北门拿,那真的是 eggs hurt 。于是物业在东南西北门各设了一个外卖快递放置点 。也就是我们有多个队列,组成队列集群 。
可是,问题又双叒叕来了(还有完没完),一个小区那么多个外卖快递队列,骑手怎么知道送到哪里去,我又怎么知道去哪里取?很简单,导航呀 。我们把导航的信息称为路由信息,这些信息需要有一个管理的地方,它告诉生产者,某这个 Topic 的消息可以发给哪些队列,同时告诉消费者你需要的消息可以从哪些队列里面取 。为这些路由信息的设置了管理员 ,当然也可以有很多个,组成集群 。
到这里,你就应该知道里面都穿了什么啦 。包括了生产者(),消费者(), 以及队列本身() 。是代理的意思,负责队列的存取等操作,我们可以把理解为队列本身 。
元数据管理
因为 、 和都需要和交互,负责的三此君不得不先和大家唠唠是何方神圣 。上面有说道是集群的元数据管理中心,那它到底管理了哪些元数据?我们来看看里面又穿了什么,看完了记得关注、转发、点赞、收藏哦 。
简单来说, 是我们的整个集群的元数据管理中心,负责集群元数据的增删改查 。先不管这个增删改查是怎么实现的,我们甚至可以理解就是数据库的增删改查,但是我们一定要知道这些元数据都长什么样子 。才能知道 、 及是如何根据这些数据进行消息收发的 。
如图所示,二主二从的集群相关的元数据信息,包括 、、、、 (暂时不用关注,图中未画出) 。
精华 一张图进阶 RocketMQ

文章插图
其他角色会主动向上报状态,根据上报消息里的请求码做相应的处理,更新存储的对应信息 。
那么多数据,相信大家看的有点晕,三此君简单总结下: 通过来维护存活的。会获取上面的路由信息,发送消息的时候指定发送到哪个 Topic,根据 Topic 可以从选择一个 ,根据可以从获取到 IP 地址 。有了地址就可以将消息通过网络传递给。
消息收发示例部署
刚刚我们了解整体架构,那怎么样通过收发消息呢?需要先通过部署一套 :
如果你没有安装 ,可以根据菜鸟教程 MacOS安装/安装 进行安装 。然后,通过 - 部署 :
注意:如果你现在不了解不重要,只需要按照步骤部署好即可,并不会阻碍我们理解相关内容 。
部署完成后我们就可以在中看到相关容器,包括 、 及 ( 控制台),到这里我们就可以使用部署的收发消息了 。
已经部署好了,接下来先来看一个简单的消息收发示例,可以说是的 "Hello World" 。
消息发送
12345678910111213141516171819public class SyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("Topic1","Tag", "Key","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息到一个BrokerSendResult sendResult = producer.send(msg);// 通过sendResult返回消息是否成功送达System.out.printf("%s%n", sendResult);// 如果不再发送消息,关闭Producer实例 。producer.shutdown();}}
消息接收
【精华 一张图进阶 RocketMQ】1234567891011121314151617181920212223public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// 设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息erbconsumerijun.subscribe("sancijun", "*");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();}}