Nacos服务变更推送流程全解析
@[toc]
# 写在文章开头
我们以Nacos 2.3.x版本为例,作为服务注册中心,它的服务注册和服务下线实时性感知相较于Consul的Raft一致性确认,亦或者Eureka定时拉取同步的机制来说做了很好的折中,既避免了实现的复杂性又能保证较好的实时性,所以本文将从一次服务下线的请求结合并结合源码分析的方式来讲解一下Nacos服务实例状态变更时是如何实时推送的服务消费者的。
我是 SharkChili ,Java 开发者,Java Guide 开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)。
为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。
# 项目架构介绍
# 服务通信流程
这里我们先简单介绍的一下项目的架构,笔者将基于Nacos的源码搭建一套基础的服务注册中心,然后提供两个nacos-provider服务作为服务提供者(端口号分别是9001和9002),而nacos-consumer作为服务消费者通过nacos或者nacos-provider的信息发起服务调用。

基于这个架构,我们会严格按照如下步骤完成实验和源码分析:
- 将
9002端口的服务提供者下线,查看服务提供者如何完成服务下线通知。 - 查看
nacos收到该请求后,内部如何处理该消息,并将消息通知给消费者。 - 消费者收到该请求后,如何更新本地缓存。
# 服务提供者查询接口
这里我们也给出服务提供者nacos-provider的http接口,可以看到该接口会返回当前服务的名称和端口号:
@GetMapping("/provide")
public String provide() {
log.info("请求打到服务提供者provide上");
Map<String, String> map = new HashMap<>();
map.put("provider", env.getProperty("spring.application.name"));
map.put("port", env.getProperty("server.port"));
return JSONUtil.toJsonStr(map);
}
2
3
4
5
6
7
8
# 服务消费者
而服务消费者也通过feign声明引入该调用:
@FeignClient(name = "nacos-provider")
public interface NacosProvider {
@GetMapping("/provide")
String provide();
}
2
3
4
5
6
7
8
后续我们就可以通过服务提供者的test接口调用到nacos-provider的provide接口:
@Resource
private NacosProvider nacosProvider;
@GetMapping("/test")
public String test() {
return nacosProvider.provide();
}
2
3
4
5
6
7
需要注意的是,nacos-consumer如果没有显示调用nacos-provider是不会订阅该提供者的所有实例信息,所以我们为了方便索性在服务启动时主动发起订阅:
@Component
public class TestRunner implements CommandLineRunner {
private final static Logger log = LoggerFactory
.getLogger(TestRunner.class);
@Override
public void run(String... args) throws Exception {
//主动向nacos发起服务订阅请求
NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
naming.subscribe("nacos-provider", event -> {
if (event instanceof NamingEvent) {
//日志打印监听到的服务名称和结果
log.info("监听到服务名称:{},实例信息:{}", ((NamingEvent) event).getServiceName(),
((NamingEvent) event).getInstances());
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 基于服务提供者下线详解Nacos实例状态推送
# 服务提供者优雅关闭并推送服务下线消息
基于上述架构,我们通过IDEA将9002的服务提供者关闭,注意如果用IDEA停止按钮操作就会断开调试的连接,我们就无法调试服务下线的源码,正确是做法是如下代码的方式主动获取启动时的上下文通过close方法显示关闭:
@SpringBootApplication
@EnableDiscoveryClient
public class NacosProviderApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(NacosProviderApplication.class, args);
//主动将springboot容器关闭
context.close();
}
}
2
3
4
5
6
7
8
9
10
11
Spring上下文close方法执行关闭操作,此时spring就会遍历所有的虚拟机钩子即shutdown hook,对应我们的服务提供者在启动时注册的shutdown Hook即NacosAutoServiceRegistration的close方法就会发起服务下线请求,一旦完成服务下线请求通知之后,服务提供者就会销毁RPC连接以及所有工作线程:

对应的我给出这个close方法的入口,因为NacosAutoServiceRegistration继承自AbstractAutoServiceRegistration,所以它继承了这个抽象类的shutdown hook方法destroy,这就使得spring boot容器关闭后,就会触发下面这个方法:
@PreDestroy
public void destroy() {
stop();
}
2
3
4
此时,这个stop方法在进行CAS乐观锁状态修改后,执行如下两件事:
- 发起
RPC下线请求。 - 销毁相关工作线程和nacos维护的RPC连接。
public void stop() {
if (this.getRunning().compareAndSet(true, false) && isEnabled()) {
//发起RPC下线请求
deregister();
//......
//销毁相关工作线程和nacos维护的RPC连接
this.serviceRegistry.close();
}
}
2
3
4
5
6
7
8
9
此时deregister会通过getRegistration拿到nacos的元信息,再通过NacosServiceRegistry的deregister发起服务下线请求:
protected void deregister() {
this.serviceRegistry.deregister(getRegistration());
}
2
3
最终就会走到NacosNamingService的deregisterInstance,很直观的看到,它通过RPC代理clientProxy传入服务名、分组、和实例信息并调用deregisterService发起服务下线请求:
@Override
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
clientProxy.deregisterService(serviceName, groupName, instance);
}
2
3
4
对应我们给出发起调用时传入的参数信息:

# Nacos服务端基于RPC推送服务下线
随后Nacos服务端GrpcRequestAcceptor收到该请求后,流程比较长,执行如下步骤:
- 基于请求定位到处理器
RequestHandler以服务下线为例就是InstanceRequestHandler。 InstanceRequestHandler发布一个ClientDeregisterServiceEvent事件,交由NotifyCenter投递到任务队列中。NamingEventPublisher从队列获取到这个任务之后,找到ClientServiceIndexesManager处理该事件。ClientServiceIndexesManager还是发布一个ServiceChangedEvent到上述的阻塞队列中。NamingSubscriberServiceV2Impl将其封成一个延迟任务提交到tasks中。- 此时有个100ms执行一次的定时器也就是
PushDelayTaskExecuteEngine,将任务取出分发给TaskExecuteWorker,这个执行者就会生成RPC请求将服务状态变更通知给所有服务消费者。
总体来说,Nacos服务端收到下线请求后,为避免下线通知影响服务端整体性能,其内部设计了一套非常好的事件通知订阅模型,当服务端收到请求后,其内部会根据请求类型找到相应的处理器发布事件,让对应的订阅者异步处理该消息。基于该消息最终会封装成指定类型的任务,提交到工作线程池中的某个worker的队列中让其异步消费,由此种大量解耦结合线程池的方式基于了nacos服务端最大的吞吐量和调优空间。
对应的我们也给出整体的业务流程图,读者可以参考该图了解一下全过程:

对应我们找到GrpcRequestAcceptor的request方法,可以看到它会基于该请求找到对应的处理器,然后调用处理器的handleRequest方法处理该请求:
@Override
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
traceIfNecessary(grpcRequest, true);
//需要使用的服务器类型,例如服务下线就是 InstanceRequest
String type = grpcRequest.getMetadata().getType();
long startTime = System.nanoTime();
//......
//基于type到容器中获取到响应的处理器
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
//......
Request request = (Request) parseObj;
try {
//组装连接信息
Connection connection = connectionManager.getConnection(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
requestMeta.setAbilityTable(connection.getAbilityTable());
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
//实际处理rpc请求的方法
Response response = requestHandler.handleRequest(request, requestMeta);
//......
} catch (Throwable e) {
//......
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
于是就找到了InstanceRequestHandler,该方法就会通过clientOperationService(也就是EphemeralClientOperationServiceImpl)发布ClientDeregisterServiceEvent事件:
private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
//基于ClientDeregisterServiceEvent发布服务下线事件
clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
//.....
}
2
3
4
5
6
随后NamingEventPublisher收到该事件后调用handleEvent找到对应的事件处理器处理器该事件:
private void handleEvents() {
while (!shutdown) {
try {
//取出上述的任务
final Event event = queue.take();
handleEvent(event);//处理发布的事件
} catch (InterruptedException e) {
//......
}
}
}
2
3
4
5
6
7
8
9
10
11
如下便是笔者的调试记录,可以看到服务下线事件定位到了ClientServiceIndexesManager这个管理器进行处理:

于是就来到了ClientServiceIndexesManager的onEvent方法,再次发布一个ServiceChangedEvent事件到上述提到的同一个阻塞队列中:
@Override
public void onEvent(Event event) {
if (event instanceof ClientOperationEvent.ClientReleaseEvent) {
handleClientDisconnect((ClientOperationEvent.ClientReleaseEvent) event);
} else if (event instanceof ClientOperationEvent) {//处理服务注册或者下线后的事件
handleClientOperation((ClientOperationEvent) event);
}
}
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {//处理服务注册事件,实际上就是发布一个ServiceChangedEvent事件
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) { //......
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
//......
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
//......
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
队列会通知NamingSubscriberServiceV2Impl进行处理,它会将事件推送到延迟队列中,这个队列内部是采用并发安全的ConcurrentHashMap进行管理。
@Override
public void onEvent(Event event) {
if (event instanceof ServiceEvent.ServiceChangedEvent) {//给客户端的服务改变事件
// If service changed, push to all subscribers.
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
//将处理事件推送到队列中
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
MetricsMonitor.incrementServiceChangeCount(service);
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
//......
}
}
2
3
4
5
6
7
8
9
10
11
12
13
任务提交之后就会另外一个线程processingExecutor(100ms处理一次)会将其取出后找到任务处理器处理PushDelayTaskExecuteEngine,随后,这个任务处理引擎将任务交给NacosExecuteTaskExecuteEngine这个任务处理引擎:
protected void processTasks() {//通过remove拿出队列的数据
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
//......
//找到相应处理器即PushDelayTaskExecuteEngine
NacosTaskProcessor processor = getProcessor(taskKey);
try {
// ReAdd task if process failed
if (!processor.process(task)) {//PushDelayTaskExecuteEngine将任务交给NacosExecuteTaskExecuteEngine这个任务处理引擎
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
//......
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
最后PushDelayTaskExecuteEngine会将该任务交给NacosExecuteTaskExecuteEngine中的某个工作线程TaskExecuteWorker的阻塞队列中,最后TaskExecuteWorker就会取出该任务并消费:
@Override
public void run() {
while (!closed.get()) {
try {
//取出任务并处理
Runnable task = queue.take();
long begin = System.currentTimeMillis();
task.run();
//......
} catch (Throwable e) {
log.error("[TASK-FAILED] " + e, e);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
最后这个服务下线的任务即PushExecuteTask就会遍历所有客户端并通知它们nacos-provider下线:
@Override
public void run() {
try {
PushDataWrapper wrapper = generatePushData();
ClientManager clientManager = delayTaskEngine.getClientManager();
for (String each : getTargetClientIds()) {//逐个遍历客户端,然后事件推送
Client client = clientManager.getClient(each);
//......
//通过RPC接口推送服务下线通知
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));//发起RPC通知消费者
}
} catch (Exception e) {
//......
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 消费者更新实例缓存
服务消费者RpcClient收到该请求后,会基于请求类型定位到服务端请求处理器,以我们下线通知为例就是NamingPushRequestHandler,由该处理器更新客户端中记录9002端口号的服务提供者nacos-provider状态更新为下线,后续服务消费者看到缓存中记录的提供不可用时就会调用9001端口号的nacos-provider:

对应的我们给出RpcClient的处理服务端请求的方法handleServerRequest,该方法会遍历所有的服务端请求处理器,只要有一个处理器处理结果非空,就说明找到相应的处理器处理了,直接将响应结果返回:
protected Response handleServerRequest(final Request request) {
//.....
//遍历所有的服务端请求处理器
for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
try {
//交给该处理器看看能否处理,若能处理则返回值非空
Response response = serverRequestHandler.requestReply(request);
//若非空说明处理完成,直接返回结果
if (response != null) {
//.....
return response;
}
} catch (Exception e) {
//.....
}
}
return null;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
实际上上述的步骤会走到NamingPushRequestHandler处理服务端下线请求,该处理器会调用serviceInfoHolder将请求中的实例信息更新到缓存中,由此保证客户端可以完成正确的服务调用:
@Override
public Response requestReply(Request request) {
if (request instanceof NotifySubscriberRequest) {
NotifySubscriberRequest notifyRequest = (NotifySubscriberRequest) request;
//从请求中拿到服务实例信息,并调用processServiceInfo更新缓存
serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo());
return new NotifySubscriberResponse();
}
return null;
}
2
3
4
5
6
7
8
9
10
最终ServiceInfoHolder的processServiceInfo就会基于入参拿到服务示例信息,并将缓存更新,然后发布一个实例更新的事件并将更新结果持久化到磁盘中:
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
//拿到服务实例缓存
String serviceKey = serviceInfo.getKey();
//若为空直接返回
if (serviceKey == null) {
return null;
}
//取出缓存中原有缓存信息
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
//......
//基于请求更新缓存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
//比对新旧缓存变化
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
//......
//如果缓存发生变化,则发布一个实例更新的事件InstancesChangeEvent,并将更新结果采用零拷贝的方式持久化到磁盘中
if (changed) {
//.....
//发布实例更新事件
NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
//零拷贝持久化
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
对应的我们给出下线9002端口的nacos-provider下线的请求值,可以看到基于这个结果,服务端给出的可用服务实例值仅有9001号端口的nacos-provider,服务消费者基于此信息更新缓存,保证了服务消费的正确性:

# 小结
本文以代码示例为导向通过源码的方式完成了Nacos服务实例状态变更推送的讲解,这里笔者也简单的补充一下个人对于源码阅读的一些技巧:
- 在阅读源码前,明确了解项目的设计理念和原理,即对项目有个基础的认知。
- 以问题为导向针对性的进行调试理解。
- 适当查找一些高质量的源码分析文章,针对性的梳理源码结构。
- 如果能够明确源码的最终断点,我们可以采用以终为始的方式,在目标断点上打住,结合调试的栈帧了解整体调用过程。
- 调试过程中注意观察各个类之间的继承、聚合等关系,以便梳理设计架构和理念。
- 最后一点,建议直接拉取源码进行调试,方便注释。
我是 SharkChili ,Java 开发者,Java Guide 开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)。
为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。
# 参考
✨Nacos2✨服务订阅与推送✨:https://juejin.cn/post/7381333093548720166 (opens new window)
✨Nacos2✨服务订阅与推送✨:https://juejin.cn/post/7442993300851081243 (opens new window)