[Nacos] Nacos Server处理订阅请求 (九)

news/2024/2/29 3:11:08

文章目录

      • 1.InstanceController#list()
      • 2.InstanceController#doSrvIpxt()
      • 3.总结

1.InstanceController#list()

Nacos Server处理订阅请求

在这里插入图片描述

主要还是从请求中获取参数, 比如namespceId、serviceName、agent(指定提交请求的客户端是哪种类型)、clusters、clusterIP、udpPort(后续UDP通信会使用)、app、tenant, 最后调用方法对参数进行处理

2.InstanceController#doSrvIpxt()

对请求进行详细处理

    public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {// 不同agent,生成不同的clientInfoClientInfo clientInfo = new ClientInfo(agent);// 创建一个JSON Node,其就是当前方法返回的结果。后续代码就是对这个Node的各种初始化ObjectNode result = JacksonUtils.createEmptyJsonNode();// 从注册表中获取当前服务Service service = serviceManager.getService(namespaceId, serviceName);long cacheMillis = switchDomain.getDefaultCacheMillis();// now try to enable the pushtry {if (udpPort > 0 && pushService.canEnablePush(agent)) {// 创建当前发出订阅请求的Nacos client的UDP Client, PushClient// 注意,在Nacos的UDP通信中,Nacos Server充当的是UDP Client,Nacos Client充当的是UDP ServerpushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),pushDataSource, tid, app);cacheMillis = switchDomain.getPushCacheMillis(serviceName);}} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);cacheMillis = switchDomain.getDefaultCacheMillis();}// 若注册表中没有该服务,则直接结束if (service == null) {if (Loggers.SRV_LOG.isDebugEnabled()) {Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);}result.put("name", serviceName);result.put("clusters", clusters);result.put("cacheMillis", cacheMillis);// 注意,hosts为空result.replace("hosts", JacksonUtils.createEmptyArrayNode());return result;}// 代码直到这里,说明注册表中存在该服务// 检测该服务是否被禁。若是被禁的服务,直接抛出异常checkIfDisabled(service);List<Instance> srvedIPs;// 获取到当前服务的所有实例,包含所有持久/临时实例srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));// filter ips using selector:// 若选择器不空,则根据选择算法选择可用的intance列表,默认情况下,选择器不做任何过滤if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {srvedIPs = service.getSelector().select(clientIP, srvedIPs);}// 若最终选择的结果为空,则直接结束if (CollectionUtils.isEmpty(srvedIPs)) {if (Loggers.SRV_LOG.isDebugEnabled()) {Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);}if (clientInfo.type == ClientInfo.ClientType.JAVA&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {result.put("dom", serviceName);} else {result.put("dom", NamingUtils.getServiceName(serviceName));}result.put("name", serviceName);result.put("cacheMillis", cacheMillis);result.put("lastRefTime", System.currentTimeMillis());result.put("checksum", service.getChecksum());result.put("useSpecifiedURL", false);result.put("clusters", clusters);result.put("env", env);// 注意,hosts为空result.set("hosts", JacksonUtils.createEmptyArrayNode());result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));return result;}// 代码走到这里,说明具有可用的instanceMap<Boolean, List<Instance>> ipMap = new HashMap<>(2);// 这个map只有两个key,True与False// key为true的value中存放的是所有健康的instance// key为false的value存放的是所有不健康的instanceipMap.put(Boolean.TRUE, new ArrayList<>());ipMap.put(Boolean.FALSE, new ArrayList<>());// 根据instance的健康状态,将所有instance分流放入map的不同key的value中for (Instance ip : srvedIPs) {// 这个语句写的非常好// 健康加入健康的列表, 不健康的加入不健康的列表ipMap.get(ip.isHealthy()).add(ip);}// isCheck为true,表示需要检测instance的保护阈值if (isCheck) {// reachProtectThreshold 是否达到了保护阈值, false 为没有达到result.put("reachProtectThreshold", false);}// 获取服务的保护阈值double threshold = service.getProtectThreshold();// 若  "健康instance数量/instance总数" <= 保护阈值,则说明需要启动保护机制了if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);if (isCheck) {// true表示启动保护机制result.put("reachProtectThreshold", true);}// 健康数量小于阈值, 则从所有实例中调用, 可能会有不健康实例, 可以保证健康实例不被压崩溃// 将所有不健康的instance添加到的key为true的instance列表,// 即key为true的value中(instance列表)存放的是所有instance实例// 包含所有健康的与不健康的instanceipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));// 清空key为false的value(不健康的instance列表)ipMap.get(Boolean.FALSE).clear();}if (isCheck) {result.put("protectThreshold", service.getProtectThreshold());result.put("reachLocalSiteCallThreshold", false);return JacksonUtils.createEmptyJsonNode();}ArrayNode hosts = JacksonUtils.createEmptyArrayNode();// 注意,这个ipMap中存放着所有健康与不健康的instance列表for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {List<Instance> ips = entry.getValue();// 若客户端只要健康的instance,且当前遍历的map的key为false,则跳过if (healthyOnly && !entry.getKey()) {continue;}// 遍历的这个ips可能是所有不健康的instance列表,// 也可能是所有健康的instance列表,// 也可能是所有健康与不健康的instance列表总和for (Instance instance : ips) {// 跳过禁用的instanceif (!instance.isEnabled()) {continue;}ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();// 将当前遍历的instance转换为JSONipObj.put("ip", instance.getIp());ipObj.put("port", instance.getPort());// deprecated since nacos 1.0.0:ipObj.put("valid", entry.getKey());ipObj.put("healthy", entry.getKey());ipObj.put("marked", instance.isMarked());ipObj.put("instanceId", instance.getInstanceId());ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));ipObj.put("enabled", instance.isEnabled());ipObj.put("weight", instance.getWeight());ipObj.put("clusterName", instance.getClusterName());if (clientInfo.type == ClientInfo.ClientType.JAVA&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {ipObj.put("serviceName", instance.getServiceName());} else {ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));}ipObj.put("ephemeral", instance.isEphemeral());hosts.add(ipObj);}  // end-for} // end-forresult.replace("hosts", hosts);if (clientInfo.type == ClientInfo.ClientType.JAVA&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {result.put("dom", serviceName);} else {result.put("dom", NamingUtils.getServiceName(serviceName));}result.put("name", serviceName);result.put("cacheMillis", cacheMillis);result.put("lastRefTime", System.currentTimeMillis());result.put("checksum", service.getChecksum());result.put("useSpecifiedURL", false);result.put("clusters", clusters);result.put("env", env);result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));return result;}
  1. 不同agent,生成不同的clientInfo, java、c、c++、go、nginx、dnsf
    在这里插入图片描述

  2. pushService.addClient(): 创建当前发出订阅请求的Nacos client的UDP Client, PushClient, Nacos Server充当的是UDP Client,Nacos Client充当的是UDP Server
    在这里插入图片描述
    在这里插入图片描述
    获取到了UDP通信客户端PushClient, 并写入到一个缓存map中

    public void addClient(PushClient client) {// client is stored by key 'serviceName' because notify event is driven by serviceName changeString serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());// clientMap是一个缓存map,用于存放当前Nacos Server中所有instance对应的UDP Client// 其是一个双层map,外层map的key为  namespaceId##groupId@@微服务名称,value为内层map// 内层map的key为代表一个instance的字符串,value为该instance对应的UDP Client,即PushClientConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);// 若当前服务的内层map为null,则创建一个并放入到缓存mapif (clients == null) {clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));clients = clientMap.get(serviceKey);}PushClient oldClient = clients.get(client.toString());// 从内层map中获取当前instance对应的的PushClient,// 若该PushClient不为null,则更新一个最后引用时间戳;// 若该PushClient为null,则将当前这个PushClient作为PushClient// 写入到内层map,即写入到了缓存mapif (oldClient != null) {// 更新最后引用时间戳oldClient.refresh();} else {PushClient res = clients.putIfAbsent(client.toString(), client);if (res != null) {Loggers.PUSH.warn("client: {} already associated with key {}", res.getAddrStr(), res.toString());}Loggers.PUSH.debug("client: {} added for serviceName: {}", client.getAddrStr(), client.getServiceName());}}
  1. 获取当前服务的所有实例, 包括持久和临时
    在这里插入图片描述
    public List<Instance> srvIPs(List<String> clusters) {if (CollectionUtils.isEmpty(clusters)) {clusters = new ArrayList<>();clusters.addAll(clusterMap.keySet());}// 获取到当前服务的所有cluster中的所有instancereturn allIPs(clusters);}public List<Instance> allIPs(List<String> clusters) {List<Instance> result = new ArrayList<>();for (String cluster : clusters) {Cluster clusterObj = clusterMap.get(cluster);if (clusterObj == null) {continue;}// 将当前遍历cluster的所有instance添加到result集合// 包含所有持久实例与临时实例result.addAll(clusterObj.allIPs());}return result;}public List<Instance> allIPs() {List<Instance> allInstances = new ArrayList<>();// 持久实例allInstances.addAll(persistentInstances);// 临时实例allInstances.addAll(ephemeralInstances);return allInstances;}

3.总结

Nacos Server处理订阅请求的主要任务:

  1. 创建了Nacos Client对应的UDP通信客户端PushClient, 并写入一个缓存map
  2. 从注册表中获取到指定服务的所有可用的instance, 并封装为Json

http://www.ppmy.cn/news/98674.html

相关文章

数据结构基础内容-----第五章 串

文章目录 串串的比较串的抽象数据类型串的顺序存储结构朴素的额模式匹配算法kmp模式匹配算法 串 在计算机编程中&#xff0c;串&#xff08;String&#xff09;是指由零个或多个字符组成的有限序列。它是一种基本的数据类型&#xff0c;在许多编程语言中都得到了支持和广泛应用…

数据库基础——5.运算符

这篇文章我们来讲一下SQL语句中的运算符操作。 说点题外话&#xff1a;SQL本质上也是一种计算机语言&#xff0c;和C&#xff0c;java一样的&#xff0c;只不过SQL是用来操作数据库的。在C&#xff0c;java中也有运算符&#xff0c;这两种语言中的运算符和数学中的运算符差距不…

《数据库应用系统实践》------ 包裹信息管理系统

系列文章 《数据库应用系统实践》------ 包裹信息管理系统 文章目录 系列文章一、需求分析1、系统背景2、 系统功能结构&#xff08;需包含功能结构框图和模块说明&#xff09;3&#xff0e;系统功能简介 二、概念模型设计1&#xff0e;基本要素&#xff08;符号介绍说明&…

内存泄漏、指针越界

内存泄漏 内存泄漏是指在程序运行时&#xff0c;分配给程序使用的内存空间没有被及时地释放回系统&#xff0c;一直占用着系统资源&#xff0c;导致系统内存不足&#xff0c;最终导致系统性能下降或者程序崩溃等问题。 内存泄漏分类 1. 堆内存泄漏&#xff1a; 指在程序运行期间…

yum命令安装jenkins(推荐使用yum安装,以后升级方便)

1.安装jdk&#xff1a; yum install java 2.检查jdk&#xff1a; java -version 3.linux中执行命令(设置配置文件) sudo wget -O /etc/yum.repos.d/jenkins.repo https://pkg.jenkins.io/redhat-stable/jenkins.repo sudo rpm --import https://pkg.jenkins.io/redhat-stable/…

vcruntime140.dll无法继续执行代码如何修复,使用这个方法不求人

VCRUNTIME140.dll 是由微软公司开发的一个库文件&#xff0c;属于 Visual C Redistributable 软件包的一部分。它包含了许多与 C 应用程序运行时相关的函数和数据类型。这些函数和数据类型包括内存管理、异常处理、文件 I/O 等等。如果您在运行某个程序时发现缺少了 VCRUNTIME1…

RAW、RGB 、YUV三种图像格式理解

文章目录 1. 背景2. 相关概念2.1 颜色与色彩空间2.2 RAW图像2.3 RGB图像2.4 YUV图像 3. 分类简图 RAW、RGB 、YUV三种图像格式理解 1. 背景 在工作中&#xff0c;经常听到用来描述图像格式的RAW&#xff0c;RGB与YUV&#xff0c;但一直没有系统的进行了解&#xff0c;处于局部认…

Vue自定义插件的使用

通过 Vue 实例绑定方法&#xff1a; 在 plugins.js 文件中创建 filter 过滤器&#xff0c;定义一个只返回前四个字符的方法。 export default {install(Vue){// 定义过滤器Vue.filter(mySlice,function(value){return value.slice(0,4);})} } 由于我们之前在 main.js 文件中引入…

集货运输优化:数学建模步骤,Python实现蚁群算法(解决最短路径问题), 蚁群算法解决旅行商问题(最优路径问题),节约里程算法

目录 数学建模步骤 Python实现蚁群算法(解决最短路径问题) 蚁群算法解决旅行商问题(最优路径问题) 节约里程算法

ICV报告:中国的数字经济与5G市场研究报告

近日&#xff0c;专注于前沿科技领域的国际咨询机构ICV发布了《中国的数字经济与5G市场研究报告》。报告指出&#xff0c;随着5G商用的发展&#xff0c;5G对经济社会的影响逐步显现&#xff0c;其影响突出体现在对数字产业发展的带动上。随着5G应用的不断创新与扩散&#xff0c…

27:尽量少做转型动作

转型&#xff08;cast&#xff09;破坏了类型系统。那可能导致任何种类的麻烦&#xff0c;有些容易辨识&#xff0c;有些非常隐晦。 一、转型语法 &#xff08;一&#xff09;不同风格的语法 1.C风格语法 C风格的转型动作&#xff1a; (T)expression//将expression转型为T …

K8s in Action 阅读笔记——【5】Services: enabling clients to discover and talk to pods

K8s in Action 阅读笔记——【5】Services: enabling clients to discover and talk to pods 你已了解Pod以及如何通过ReplicaSets等资源部署它们以确保持续运行。虽然某些Pod可以独立完成工作&#xff0c;但现今许多应用程序需要响应外部请求。例如&#xff0c;在微服务的情况…

win10 nvprof的性能分析表

交叉访问是全局内存中最糟糕的访问模式&#xff0c;因为它浪费总线带宽 使用多个线程块对基于交叉的全局内存访问重新排序到合并访问 https://mp.weixin.qq.com/s/h2XKth1bTujnrxyXTJ2fwg <<<numBlocks, blockSize>>> 的两个参数应该怎么设置好呢。首先&…

Redis的SDS+IntSet+Dict

一)SDS 在redis中&#xff0c;保存key的是字符串&#xff0c;value往往是字符串或者是字符串的集合&#xff0c;可见字符串是redis中最常用的一种数据结构: 但是在redis中并没有直接使用C语言的字符串&#xff0c;因为C语言的字符串存在很多问题 1)获取字符串的长度需要通过运算…

stc15w404as使用keil做库,提供头文件,供调用

背景 有个项目使用需要使用库&#xff0c;将代码封装起来&#xff0c;仅仅留下调试接口&#xff0c;给用户使用&#xff0c;调试一些参数。这样工程看起来更简单&#xff0c;也方便客户维护。 也有一些使用场景&#xff0c;需要把自己的代码封装起来&#xff0c;这个是怕被别…

LeetCode 周赛 347(2023/05/28)二维空间上的 LIS 最长递增子序列问题

本文已收录到 AndroidFamily&#xff0c;技术和职场问题&#xff0c;请关注公众号 [彭旭锐] 提问。 往期回顾&#xff1a;LeetCode 单周赛第 346 场 仅 68 人 AK 的最短路问题 周赛 347 概览 T1. 移除字符串中的尾随零&#xff08;Easy&#xff09; 标签&#xff1a;模拟、…

HTTP协议深入理解+如何使用Fiddler抓包

博主简介&#xff1a;想进大厂的打工人博主主页&#xff1a;xyk:所属专栏: JavaEE初阶 目录 文章目录 一、HTTP概述 1.1 什么是HTTP 1.2 理解应用层协议 二、抓包工具fiddler的使用 2.1 几个需要注意的点 2.2 fiddler的原理 2.3 fiddler的使用技巧 三、HTTP请求&#xff08;Re…

第十六届全国大学生信息安全竞赛创新实践能力赛(CISCN)

目录 Misc 1、被加密的生产流量 Crypto 2、Sign_in_passwd Web 3、unzip 4、dumpit Re 5、babyRE Pwn 6、funcanary Misc 1、被加密的生产流量 下载附件解压后是一段流量&#xff0c;使用wireshark打开 最开始做的时候找错了方向&#xff0c;追踪到了另一个东西 …

Spark入门介绍

目录 一、Spark框架概述 1、Spark简介 2、发展 二、Spark功能及特点 1、定义

Pytorch 的 LSTM 模型的简单示例

1. 代码 完整的源代码&#xff1a; import torch from torch import nn# 定义一个LSTM模型 class LSTM(nn.Module):def __init__(self, input_size, hidden_size, num_layers, output_size):super(LSTM, self).__init__()self.hidden_size hidden_sizeself.num_layers num_…
最新文章