Nacos服务订阅流程全解析
@[toc]
# 引言
本文将基于之前源码搭建系列的前置步骤针对nacos服务订阅流程源码进行深入梳理和分析,希望对你有帮助。
我是 SharkChili ,Java 开发者,Java Guide 开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)。
为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。
# 客户端发起服务订阅
为了方便讲解,笔者基于CommandLineRunner 这个扩展点主动在服务完成初始化之后通过NamingService 的subscribe发起服务订阅请求:
@Component
public class TestRunner implements CommandLineRunner {
private final static Logger log = LoggerFactory
.getLogger(TestRunner.class);
@Override
public void run(String... args) throws Exception {
//主动向nacos发起服务订阅请求
NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
//主动订阅nacos-provider这个服务的实例信息
naming.subscribe("nacos-provider", event -> {
if (event instanceof NamingEvent) {
//日志打印监听到的服务名称和结果
log.info("监听到服务名称:{},实例信息:{}", ((NamingEvent) event).getServiceName(),
((NamingEvent) event).getInstances());
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
查看NamingService 的subscribe源码可知,该方法会基于我们给定的服务名称以及分组等信息主动发起RPC主动向nacos获取nacos-provider的实例信息:
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
//......
//基于RPC代理发起服务订阅
clientProxy.subscribe(serviceName, groupName, clusterString);
}
2
3
4
5
6
7
而clientProxy(底层就是NamingClientProxyDelegate)的执行逻辑也比较简单:
- 从缓存管理器
serviceInfoHolder中尝试获取需要订阅的服务信息。 - 如果不存在或者未订阅则向
nacos发起RPC请求。 - 通过
serviceInfoHolder缓存订阅的服务实例信息。

对应的我们给出NamingClientProxyDelegate获取服务实例的源码段,和上述语义一致,读者可以参考注释了解一下过程:
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
//......
//查看缓存中是否存在订阅的服务实例
//......
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
//如果不存在或者未订阅则发起RPC请求
if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
}
//将查询或者请求结果缓存到本地serviceInfoMap中
serviceInfoHolder.processServiceInfo(result);
return result;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
默认情况下,我们的缓存是没有订阅服务的信息的,所以会触发RPC请求,然后nacos就会返回当前服务的元信息,如下便是从nacos服务端返回的nacos-provider实例节点信息,后续如果需要调用,都会基于这份元信息发起请求:

我们也可以通过抓包工具看到这个请求的详细内容,如下图所示,可以看到笔者基于nacos客户端端口64393作为源端口号,nacos服务端端口号即9848作为目的端口进行抓取捕获到的TCP网络包,可以看到这个订阅的RPC接口请求参数详情:

解码后即可看到这个请求的数据就是源码调试时看到的请求参数,也就是对于nacos-provider的订阅:

基于服务端的返回结果,客户端会进行如下操作:
- 将实例信息更新到本地缓存。
- 查看对应服务实例在本地缓存中的数据,并和服务端响应的数据进行比对,若一致则说明实例信息没有更新,直接返回,如果发现不一致,进入步骤3。
- 则说明服务发生变化,则基于零拷贝将响应结果写入本地持久化,便于后续服务重启恢复数据。

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
//获取原有服务信息
String serviceKey = serviceInfo.getKey();
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
//......
//以服务名作为key,实例信息作为value写入缓存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
//比对实例,如果不一致则说明服务发生变化
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
//......
if (changed) {
//......
//如果发生变化则基于零拷贝刷盘技术将订阅的服务信息写入本地
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Nacos基于缓存发起回复并注册该客户端
了解了客户端的订阅请求之后,我们再来聊聊nacos是如何处理该请求的,在之前的文章中我们说过,nacos是通过grpcCommonRequestAcceptor处理客户端的RPC请求,对应服务订阅请求,服务端收到的将会收到一个类型为SubscribeServiceRequest 的服务订阅请求:

找到对应的处理器SubscribeServiceRequestHandler之后,grpcCommonRequestAcceptor会从RPC请求报文中拿到参数交由该处理器进行处理:

对应的我们也给出nacos服务端处理客户端服务订阅请求的代码段,即位于GrpcRequestAcceptor的request方法:
@Override
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
//......
traceIfNecessary(grpcRequest, true);
//需要使用的服务器类型,例如服务注册就是 InstanceRequest
String type = grpcRequest.getMetadata().getType();
long startTime = System.nanoTime();
//......
//基于type到找到对应的请求处理器
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
//......
//......
Request request = (Request) parseObj;
try {
//......
//解析参数并处理该请求
Response response = requestHandler.handleRequest(request, requestMeta);
//......
} catch (Throwable e) {
//......
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
随后SubscribeServiceRequestHandler的handle就会执行如下步骤:
- 基于参数生成要获取的服务请求参数
service,service包含要请求的服务名称、命名空间、分组等信息。 - 从缓存
serviceStorage管理的缓存serviceDataIndexes中查询出对应服务实例即nacos-provider的所有实例信息。 - 过滤出有效即健康可用的实例响应给客户端。

对应SubscribeServiceRequestHandler的源码如下,读者可参考注释查阅:
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
//解析参数信息
String namespaceId = request.getNamespace();
String serviceName = request.getServiceName();
String groupName = request.getGroupName();
String app = request.getHeader("app", "unknown");
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
//基于参数生成服务元信息,即要获取的服务信息
Service service = Service.newService(namespaceId, groupName, serviceName, true);
//......
//基于serviceStorage从缓存中拿到订阅的服务信息,再通过selectInstancesWithHealthyProtection筛选出健康的示例
ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
true, subscriber.getIp());
//......
//返回结果
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 小结
自此我们通过源码阅读、网络抓包、断点调测查看等方式对nacos服务订阅的源码进行的较为详细的分析,希望对你有帮助。
我是 SharkChili ,Java 开发者,Java Guide 开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)。
为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。