十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
这篇文章给大家分享的是有关RocketMQ源码中如何实现注册服务器的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
成都网站设计、成都网站制作、外贸网站建设,成都做网站公司-创新互联已向数千家企业提供了,网站设计,网站制作,网络营销等服务!设计与技术结合,多年网站推广经验,合理的价格为您打造企业品质网站。
该类用于启动注册服务器。其main
方法委托了main0
方法,该方法的执行逻辑如下:
调用方法NamesrvStartup#createNamesrvController
创建一个NamesrvController
实例,声明为controller
。
调用方法NamesrvStartup#start
将这个controller启动。
那么下面就分别来看下两个方法的具体内容。
这个方法最重要的就是用构造方法创建了NamesrvController
对象。而在调用构造方法之前有较多的代码是用于解析命令行对象,以及可能的情况下读取文件中的配置信息、打印当前的整体配置信息。
这些额外配置不存在的时候,默认配置下,注册服务器是监听于9876端口。
该方法的作用是启动入参的NamesrvController
实例。具体来说,流程如下:
执行方法NamesrvController#initialize
进行初始化。
为运行时添加一个hook,在JVM关闭的时候,执行方法NamesrvController#shutdown
对注册服务器执行优雅关闭。
执行方法NamesrvController#start
启动注册服务器。
这个类用于控制注册服务器。
构造方法中主要是为了几个重要属性进行赋值操作。比如初始化kvConfigManager
和routeInfoManager
这两个重要的属性。
该方法用于初始化注册服务器,执行逻辑如下:
执行方法kvconfig.KVConfigManager#load
加载配置信息。默认情况下,加载 ${user.home}/namesrv/kvConfig.json 文件的内容到属性kvconfig.KVConfigManager#configTable
中。
新建一个NettyRemotingServer对象,为属性NamesrvController#remotingServer
赋值。这个新建的对象,使用了BrokerHousekeepingService
作为入参。该BrokerHousekeepingService
的作用就是在发生通道关闭、异常、空闲等情况时,将该通道从路由信息里删除。
创建一个线程池,赋值给属性NamesrvController#remotingExecutor
,用于注册服务器在Netty中的业务执行。
调用方法NamesrvController#registerProcessor
将业务处理器注册到RemotingServer
中。使用的线程池就是步骤3创建的线程池。
创建一个间隔时间为10秒的周期性任务,任务内容是调用方法RouteInfoManager#scanNotActiveBroker
扫描非激活模式的Broker
。
该方法没有更多内容,只是简单了启动了RemotingServer
。在这个方法之后,就可以开始监听Broker
上送的注册请求。
该类是注册服务器的配置存储类。会将配置信息存储在文件 ${user.home}/namesrv/kvConfig.json 。内部用来存储配置信息的是一个HashMap
结构,也就是两级结构。
第一级是命名空间,第二集是KV对,都是字符串形式。
该类的load
方法可以从文件中加载数据到内存里,persist
方法可以将内存中的数据再写入到文件中。
这个类是 rocketmq-namesrv 这个包下面,代码量最多的类了。因为业务处理都实现在了这个类上面。
按照NettyRequestProcessor
接口的实现套路,业务请求的分流都是在processRequest
方法中,这里也是,接下来就一个个看这个类支持的命令。
该命令没有请求体,请求头中有namespace
、key
、value
字段,调用方法kvconfig.KVConfigManager#putKVConfig
将配置项放入到配置管理器中即可。
该命令没有请求体,请求头中有namespace
、key
字段,调用方法kvconfig.KVConfigManager#getKVConfig
获取对应配置项。
如果配置项存在,返回成功响应。如果配置信息不存在,返回失败响应,响应码为QUERY_NOT_FOUND。
该命令没有请求体,请求头中有namespace
、key
字段,调用方法kvconfig.KVConfigManager#deleteKVConfig
删除对应配置项。
该命令用于查询注册服务器上Broker
的数据版本号。具体执行逻辑如下:
从命令的内容体解析出DataVersion
对象,从请求头中解析出BrokerAddr
数据。使用这两个作为入参,调用方法RouteInfoManager#isBrokerTopicConfigChanged
判断与服务器上该BrokerAddr
的版本号是否一致,将结果声明为changed
。
如果changed
为false
,表明版本号没有变化,那么服务器上的数据在当前时间还是有效的,调用方法RouteInfoManager#updateBrokerInfoUpdateTimestamp
更新这个数据的有效时间。
调用方法RouteInfoManager#queryBrokerTopicConfig
查询服务器上BrokerAddr
对应的版本号,声明为nameSeverDataVersion
。
构建命令响应对象,如果nameSeverDataVersion
不为null,则编码后设置到内容体。在响应头中设置changed
属性,值为步骤1产生的声明对象。
该命令用于Broker
信息的注册。首先获取请求头中MQ的版本号,如果版本号大于等于3.0.11,则调用方法processor.DefaultRequestProcessor#registerBrokerWithFilterServer
进行信息注册;否则调用方法processor.DefaultRequestProcessor#registerBroker
进行信息注册。
方法的执行逻辑如下:
对请求命令进行解码工作,创建出RegisterBrokerRequestHeader
对象。使用该对象对象和请求中的body字段执行crc校验,如果校验失败,返回系统错误响应。否则,继续后续流程。
如果命令请求对象中包含内容体,则解码出RegisterBrokerBody
对象,声明为registerBrokerBody
。如果命令请求对象不包含内容体,则手动创建RegisterBrokerBody
对象,并且将其DataVersion
的版本号设置为0,时间戳设置为0.
调用方法RouteInfoManager#registerBroker
注册路由信息,将结果声明为result
。
创建类型为RegisterBrokerResponseHeader
的响应头对象,声明为responseHeader
。将result
的masterAddr
和HaServerAddr
属性设置到响应头对象中。
从配置管理器中以ORDER_TOPIC_CONFIG作为命名空间,取出该命名空间下面的配置数据对象,编码后将二进制设置为响应的内容体。
返回响应对象。
与registerBrokerWithFilterServer
方法的流程基本一致,只不过在调用方法RouteInfoManager#registerBroker
的时候,入参的filterServerList
为null。
该命令用于注销 Broker 的注册。调用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#unregisterBroker
完成,而该方法内部则是委托给了方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker
。
该命名用于查询主题的路由信息,调用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic
。
该方法用于在路由管理器中根据主题名称获取全量的路由信息,具体流程如下:
使用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData
根据请求的主题名称得到类型为TopicRouteData
的结果,声明为topicRouteData
。
如果topicRouteData
不为null,则执行如下子流程。
如果配置org.apache.rocketmq.common.namesrv.NamesrvConfig#orderMessageEnable
开启,则从命名空间ORDER_TOPIC_CONFIG下面,获取入参主题名称的配置信息,声明为orderTopicConf
。将orderTopicConf
设置到属性org.apache.rocketmq.common.protocol.route.TopicRouteData#orderTopicConf
。
将topicRouteData
进行编码,设置为响应的内容体,返回响应对象。
如果topicRouteData
为null,则返回TOPIC_NOT_EXIST响应。
该命令调用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getBrokerClusterInfo
。该方法的逻辑就是调用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllClusterInfo
得到一个编码后的内容体,将这个内容体设置为响应的内容体,返回响应对象即可。
编码的内容体数据结构类是ClusterInfo
,其属性如下
HashMapbrokerAddrTable; HashMap > clusterAddrTable;
该命令用于擦除Broker的写权限,也就说所有在该Broker
上的主题都没有写入权限了。调用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#wipeWritePermOfBroker
实现,该方法的逻辑如下:
调用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#wipeWritePermOfBrokerByLock
擦除入参Broker
的写权限,方法的返回值为擦除的队列信息个数。将结果声明为wipeTopicCnt
。
将wipeTopicCnt
设置到响应头的对应属性,返回响应。
该命令用于获取注册服务器上全量的主题信息,调用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getAllTopicListFromNameserver
实现。
该方法内部调用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllTopicList
获取所有的主题名称形成的列表,并且编码为二进制数组,设置为响应的内容体,将响应返回。
该命令用于删除服务器上的主题信息,通过方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#deleteTopicInNamesrv
实现。方法实现也简单,直接从topicQueueTable
中删除对应的主题名称即可。
该命令用于获取服务器上特定命名空间下的配置信息。通过方法org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#getKVListByNamespace
获取到对应的配置信息,并且编码为二进制数组。
如果数组存在,则设置到响应的内容体中,返回成功响应。
如果数组不存在,则返回QUERY_NOT_FOUND响应。
该命令用于获取集群下所有的主题名称,调用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getTopicsByCluster
完成。该方法内部调用org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getTopicsByCluster
获取集群下的所有主题名称的编码结果,将编码结果的二进制数组设置到响应的内容体中,返回成功响应。
这个命令有点奇怪,看命令名称是获取系统主题列表。但是从方法实现上,内部的内容整体是混乱的。这个命令暂且放下,等看到相关联的请求查询的时候在处理。
该命令用于获取集群下,有unit标识的主题名称集合。通过方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getUnitTopicList
实现,该方法内部调用了方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getUnitTopics
来返回具备unit标识的主题名称集合的编码后二进制数组。将这个数组设置为响应的内容体,并且返回。
该命令用于获取集群下,有unit_sub标识的主题名称集合。做法上与GET_UNIT_TOPIC_LIST命令是相同的,只不过用的标识不同。
该命令用于获取集群下,同时有unit和unit_sub标识的主题名称集合。做法上与上述的一致,只不过用的标识不同。
这个命令是用于管理端直接发送配置的文本到注册服务,用于更新注册服务自身的配置,而后将配置信息持久化到磁盘文件。
这个命令用于获取注册服务的配置信息,将配置信息设置到响应的内容体中。
该类是路由信息的管理器,其中使用了多个类来抽象各种路由信息。下面先看下这些定义类。
QueueData
该类保存了Broker中的队列信息。有如下属性:
brokerName,Broker的名称,默认情况下是Broker所在机器的域名,可以由配置定义。
readQueueNums,用于读取的队列数量。
writeQueueNums,用于写入的队列数量。
perm,该Broker的权限信息,权限指的是是否可读、是否可写。
topicSynFlag,主题同步标识。
BrokerData
该类保存了Broker集群的地址信息,有如下属性:
cluster,集群标识。
brokerName,Broker名称。
brokerAddrs,brokerId和BrokerAddr的映射表。该属性存储了同一个Broker名称下id和地址的映射关系。
BrokerLiveInfo
该类保存了具体某个Broker的存活信息,有如下属性
lastUpdateTimestamp,最近一次数据更新时间。
dataVersion,该Broker的主题配置信息的版本号。
channel,Netty的Channel对象,该对象即是Broker与服务器之间的链接对象。
haServerAddr,高可用主节点地址。格式为${ip}:${port} 。
RouteInfoManager内部管理着5个Map结构,用于存储路由相关信息,这些信息用代码来看会更清晰一些,如下:
HashMap> topicQueueTable; HashMap brokerAddrTable; HashMap > clusterAddrTable; HashMap brokerLiveTable; HashMap /* Filter Server */> filterServerTable;
该方法用于实现Broker信息注册到路由管理器上,具体方法流程如下:
从clusterAddrTable
中以入参的clusterName
获取集群下所有Broker
的名称,声明为brokerNames
。
如果brokerNames
为null,则为其赋值一个空的HashSet
。并且在clusterAddrTable
放入这个clusterName
和brokerNames
两个值。
在brokerNames
中添加本次注册上来的Broker的名称。
从brokerAddrTable
以brokerName
获取BrokerData
对象,如果不存在则新建一个并且放入到brokerAddrTable
中。
取出步骤4中brokerData
中的brokerAddrs
映射,遍历其中的元素,如果值与入参的brokerAddr
相等,键与入参的brokerId
不等,则删除这个这一键值对。这种情况说明此时该IP对应的Broker信息已经发生了变化。
将入参的brokerId
和brokerAddr
放入到brokerAddrs
中。
如果brokerId为0也就是主节点,并且入参的topicConfigWrapper
不为null,也就是说Broker发送的注册命令是包含了请求体,那么执行子流程。否则继续后续流程。
从brokerLiveTable
查询该broker
的版本号,与topicConfigWrapper
的版本号对比,确认是否有变化。如果有变化,或者该Broker
是新注册的(brokerName
第一次注册或者brokerId
第一次注册),那么就很有可能本次携带了新的主题配置信息。则需要更更新注册服务器上主题配置信息。也就执行后续流程。否则结束子流程,继续执行步骤8.
遍历属性TopicConfigSerializeWrapper#topicConfigTable
,对集合中每一个元素调用方法RouteInfoManager#createAndUpdateQueueData
,更新主题对应的队列信息。
构建BrokerLiveInfo
对象,放入brokerLiveTable
中。
如果入参的filterServerList
不为null,则放入filterServerTable
。
如果brokerId不为0,也就是当前是从节点在注册自己,则从brokerAddrs
获取主节点的地址。如果主节点地址存在,则进一步获取其 HaServer 地址。将这两个数据设置到返回的结果对象result中。
返回结果对象result。从代码可以看出,如果当前注册不是从节点,或者对应的主节点不存在,则result是一个空对象。
该方法是用于创建或更新 topicQueueTable 中的QueueData
对象的。具体流程如下:
构建一个QueuData对象,里面的属性来自brokerName、topicConfig 对象。
从 topicQueueTable 中获取topicConfig 主题对应的 queueDataList 对象。
如果 queueDataList
不存在,意味着该主题是第一次出现在注册服务器中。构建一个新的linkedList
对象,添加queueData
对象到其中,并且将queueDataList
放入到topicQueueTable
中。流程结束。
如果queueDataList
存在,则对其元素遍历,执行如下子操作。
元素的brokerName
属性与入参的brokerName
值相同,则继续执行后续流程,否则进入下一次循环迭代。
判断元素与步骤1构建的对象是否相同,如果相同,不做操作;如果不同,意味着数据有变化,将元素从集合中删除。
如果步骤4中有元素被删除,则将步骤1的对象,添加到queueDataList
中。
该方法用于在删除路由管理器中某一个Broker
的信息。具体流程如下:
在brokerLiveTable
中删除该Broker
信息。
在filterServerTable
删除该Broker
的信息。
声明一个局部变量removeBrokerName
。从brokerAddrTable
获取该BrokerName
对应的brokerData
。如果其不为空,则执行子流程。
从brokerData
的brokerAddrs
删除该brokerId
对应的映射。
如果brokerAddrs
集合为空,则从brokerAddrTable
删除该brokerName
对应的映射。为removeBrokerName
赋值true
。
如果removeBrokerName
为真,则执行子流程,否则流程结束。
从clusterAddrTable
获取该clusterName
对应的brokerName
的集合,声明为nameSet
。
nameSet
不为null的情况下,从nameSet
删除本次的brokerName
。如果删除后nameSet
为空,则从clusterAddrTable
删除该brokerName
的映射。
调用方法removeTopicByBrokerName
删除brokerName
对应的主题信息。
该方法用于删除brokerName
对应的主题配置信息,具体执行逻辑如下:
遍历topicQueueTable
,为每一个元素执行后续逻辑。
针对每一个元素,取出其QueueData
列表,遍历该对象。执行子流程。
遍历QueueData
列表,如果元素QueueData
的brokerName
与入参brokerName
相同,则从列表中删除该元素。
遍历完毕后,如果列表为空,则从topicQueueTable
中删除该映射。
首先来看下数据结构对象TopicRouteData
的定义,其属性如下
String orderTopicConf; ListqueueDatas; List brokerDatas; HashMap /* Filter Server */> filterServerTable
从数据结构对象也可以简单的推测出pickupTopicRouteData
方法的实现逻辑。大致来说分为几个步骤:
从topicQueueTable
按照主题名称查询queueDatas
。
根据queueDatas
中每一个元素QueueData
的brokerName
属性从brokerAddrTable
取得brokerData
对象,组成成一个List,也就是brokerDatas
。
根据步骤2的brokerDatas
从filterServerTable
查询到对应的filterServer
列表,组装为映射。
将步骤1到3的值组装为TopicRouteData
对象返回给调用者。
该方法会以可中断的方式获取写锁,获取成功后调用方法wipeWritePermOfBroker
。如果获取失败则返回0,获取成功则执行方法wipeWritePermOfBroker
执行擦除工作。
wipeWritePermOfBroker
方法的内容也很简单,遍历topicQueueTable
,针对每一个元素,在遍历其QueueData
,如果brokerName
与入参的brokerName
相同就意味着找到对应的QueueData
。将这个里面的perm
属性重新设置值,去掉代表写权限的标志位即可。
该方法用于获取具备unit标识的主题名称集合。具体流程如下:
以可中断的方式获取读锁。遍历topicQueueTable
元素。
如果键值对中的QueueData
列表的首个元素的topicSynFlag
属性值包含了unit标识,将这个键值对的key
,也即是主题名称加入到临时集合中。
遍历完后后,返回临时集合编码的二进制数组。
当一个Broker的通道关闭的时候,会触发到这个方法。这个方法的代码虽然比较多,但是方法思路很简单,首先通过Channel在brokerLiveTable
中找到对应的BrokerLiveInfo对象。并且依靠这个对象的信息,在路由管理器中删除所有相关的信息接口。
感谢各位的阅读!关于“RocketMQ源码中如何实现注册服务器”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!