正文部分

英超超级联赛直播 要给Nacos的Udp通信功能点个赞

本文转载自微信公多号「程序新视界」,作者二师兄。转载本文请有关程序新视界公多号。

  中新经纬客户端8月19日电 19日,商务部新闻发言人高峰在例行新闻发布会上通报,1-7月,我对“一带一路”沿线国家非金融类直接投资112.9亿美元,同比增长9.9%,占同期总额的18%,较上年同期上升1个百分点。

  1-7月我国对“一带一路”沿线国家投资保持增长

  商务部:中美在经贸领域保持着正常沟通

  中新经纬客户端8月19日电 题:《杨德龙:正确理解共同富裕目标,把握未来投资方向》

  山西3人隐瞒中高风险区行程被查

学习不必那么功利,二师兄带你从更高维度轻盈浏览源码~

Nacos在服务注册功能中行使到了UDP的通信手段,主要功能就是用来辅助服务实例转折时对客户端进走报告。然而,对于大无数行使Nacos的程序员来说,能够还不清新这个功能,更别说变通行使了。

望完善个源码的实现,照样要为这一功能点个赞的,能够说专门奥妙和实用。但在实现上有一些不能,文末会进走指出。

本篇文章就带行家从源码层面来分析一下Nacos 2.0中是如何基于UDP制定来实现服务实例变更的报告。

UDP报告基本原理

在分析源码之前,先来从集体上望一下Nacos中UDP的实现原理。

Nacos UDP基本原理

吾们清新,UDP制定通信是双向的,异国所谓的客户端和服务端,因此在客户端和服务器端都会开启UDP的监听。客户端是单独开启一个线程来处理UDP新闻的。当采用HTTP制定与注册中央通信时,,在客户端调用服务订阅接口时,会将客户端的UPD新闻(IP和端口)上送到注册中央,注册中央以PushClient对象来进走封装和存储。

当注册中央有实例转折时,会发布一个ServiceChangeEvent事件,注册中央监听到这个事件之后,会遍历存储的PushClient,基于UDP制定对客户端进走报告。客户端授与到UDP报告,即可更新本地缓存的实例列外。

前线吾们已经清新,基于HTTP制定进走服务注册时,会有一个实例更新的时间差,由于是始末客户端准时拉取服务器中的实例列外。倘若拉取太反复,注册中央压力比较大,倘若拉取的周期比较长,实例的转折又没手段迅速感知到。而UDP制定的报告,正好弥补了这一弱点,因而说,要为基于UDP报告这个功能点个赞。

下面就来望望源码层面是如何实现的。

客户端UDP报告监听与处理

客户端在实例化NamingHttpClientProxy时英超超级联赛直播,在其组织手段中会初首化PushReceiver。

public NamingHttpClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListManager serverListManager,         Properties properties, ServiceInfoHolder serviceInfoHolder) {     // ...     // 构建BeatReactor     this.beatReactor = new BeatReactor(this, properties);     // 构建UDP端口监听     this.pushReceiver = new PushReceiver(serviceInfoHolder);     // ... } 

PushReceiver的组织手段,如下:

public PushReceiver(ServiceInfoHolder serviceInfoHolder) {     try {         // 持有ServiceInfoHolder引用         this.serviceInfoHolder = serviceInfoHolder;         // 获取UDP端口         String udpPort = getPushReceiverUdpPort();         // 按照端口情况,构建DatagramSocket,倘若未竖立端口,则采用随机端口         if (StringUtils.isEmpty(udpPort)) {             this.udpSocket = new DatagramSocket();         } else {             this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));         }         // 创建只有一个线程的ScheduledExecutorService         this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {             @Override             public Thread newThread(Runnable r) {                 Thread thread = new Thread(r);                 thread.setDaemon(true);                 thread.setName("com.alibaba.nacos.naming.push.receiver");                 return thread;             }         });          // 实走线程,PushReceiver实现了Runnable接口         this.executorService.execute(this);     } catch (Exception e) {         NAMING_LOGGER.error("[NA] init udp socket failed", e);     } } 

PushReceiver的组织手段做了以下操作:

第一、持有ServiceInfoHolder对象引用; 第二、获取UDP端口; 第三、实例化DatagramSocket对象,用于发送和授与Socket数据; 第四,创建线程池,并实走PushReceiver(实现了Runnable接口);

既然PushReceiver实现了Runnable接口,run手段一定是必要重新实现的:

@Override public void run() {     while (!closed) {         try {                          // byte[] is initialized with 0 full filled by default             byte[] buffer = new byte[UDP_MSS];             // 创建DatagramPacket用于存储授与到的报文             DatagramPacket packet = new DatagramPacket(buffer, buffer.length);             // 授与报文,在未授与到报文时会进走线程壅塞             udpSocket.receive(packet);             // 将报文转换为json格式             String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();             NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());             // 将json格式的报文转换为PushPacket对象             PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);             String ack;             // 倘若相符条件,则调用ServiceInfoHolder进走授与报文处理,并返回答答报文             if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) {                 serviceInfoHolder.processServiceInfo(pushPacket.data);                                  // send ack to server                 ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"                         + "\"\"}";             } else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) {                 // dump data to server                 ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"                         + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))                         + "\"}";             } else {                 // do nothing send ack only                 ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime                         + "\", \"data\":" + "\"\"}";             }             // 发送答答报文             udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,                     packet.getSocketAddress()));         } catch (Exception e) {             if (closed) {                 return;             }             NAMING_LOGGER.error("[NA] error while receiving push data", e);         }     } } 

PushReceiver#run手段主要处理了以下操作:

第一、构建DatagramPacket用于授与报文数据; 第二、始末DatagramSocket#receive手段壅塞期待报文的到来; 第三、DatagramSocket#receive授与到报文之后,手段不息实走; 第四、解析JSON格式的报文为PushPacket对象; 第五、判定报文类型,调用ServiceInfoHolder#processServiceInfo处理授与到的报文新闻,在该手段中会将PushPacket转化为ServiceInfo对象; 第六、封装ACK新闻(即答答报文新闻); 第七、始末DatagramSocket发送答答报文;

上面吾们望到了Nacos客户端是如何基于UDP进走报文的监听和处理的,但并未找到客户端是如何将UDP新闻上送给注册中央的。下面吾们就来梳理一下,上送UDP新闻的逻辑。

客户端上送UDP新闻

在NamingHttpClientProxy中存储了UDP_PORT_PARAM,即UDP的端口参数新闻。

UDP端口新闻始末实例查询类接口进走传递,比如:查询实例列外、查询单个健康实例、查询一切实例、订阅接口、订阅的更新义务UpdateTask等接口。在这些手段中都调用了NamingClientProxy#queryInstancesOfService手段。

NamingHttpClientProxy中的queryInstancesOfService手段实现:

@Override public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,         boolean healthyOnly) throws NacosException {     final Map<String, String> params = new HashMap<String, String>(8);     params.put(CommonParams.NAMESPACE_ID, namespaceId);     params.put(CommonParams.SERVICE_NAME, NamingUtils.getGroupedName(serviceName, groupName));     params.put(CLUSTERS_PARAM, clusters);     // 获取UDP端口     params.put(UDP_PORT_PARAM, String.valueOf(udpPort));     params.put(CLIENT_IP_PARAM, NetUtils.localIP());     params.put(HEALTHY_ONLY_PARAM, String.valueOf(healthyOnly));     String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);     if (StringUtils.isNotEmpty(result)) {         return JacksonUtils.toObj(result, ServiceInfo.class);     }     return new ServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), clusters); } 

但查望源码会发现,查询实例列外、查询单个健康实例、查询一切实例、订阅的更新义务UpdateTask中,UDP端口传递的参数值均为0。只有HTTP制定的订阅接口取值为PushReceiver中的UDP端口号。

@Override public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {     return queryInstancesOfService(serviceName, groupName, clusters, pushReceiver.getUdpPort(), false); } 

在上面的代码中吾们已经清新PushReceiver中有一个getPushReceiverUdpPort的手段:

public static String getPushReceiverUdpPort() {     return System.getenv(PropertyKeyConst.PUSH_RECEIVER_UDP_PORT); } 

很清晰,UDP的端口是始末环境变量竖立的,对答的key为“push.receiver.udp.port”。

而在1.4.2版本中,HostReactor中的NamingProxy成员变量的queryList手段也会传递UDP端口:

public void updateService(String serviceName, String clusters) throws NacosException {     ServiceInfo oldService = getServiceInfo0(serviceName, clusters);     try {         String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);         if (StringUtils.isNotEmpty(result)) {             processServiceJson(result);         }     } finally {         // ...     } } 

关于1.4.2版本中的实现,行家自走望源码即可,这边不再睁开。

完善了客户端UDP基本新闻的传递,再来望望服务器端是如何授与和存储这些新闻的。

UDP服务存储

服务器端在获取实例列外的接口中,对UDP端口进走了处理。

@GetMapping("/list") @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ) public Object list(HttpServletRequest request) throws Exception {     // ...     // 倘若异国获得UDP端口新闻,则默认端口为0     int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));     // ...     // 客户端的IP、UDP端口封装到Subscriber对象中     Subscriber subscriber = new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName,             udpPort, clusters);     return getInstanceOperator().listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly); } 

在getInstanceOperator()手段中会获得现在采用的哪个制定,然后选择对答的处理类:

/**  * 判定并返回采用V1版本或V2版本的操作服务  * @return V1:Jraft制定(服务器端);V2:gRpc制定(客户端)  */ private InstanceOperator getInstanceOperator() {     return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1; } 

这边详细的实现类为InstanceOperatorServiceImpl:

@Override public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,         boolean healthOnly) throws Exception {     ClientInfo clientInfo = new ClientInfo(subscriber.getAgent());     String clientIP = subscriber.getIp();     ServiceInfo result = new ServiceInfo(serviceName, cluster);     Service service = serviceManager.getService(namespaceId, serviceName);     long cacheMillis = switchDomain.getDefaultCacheMillis();     // now try to enable the push     try {         // 处理声援UDP制定的客户端新闻         if (subscriber.getPort() > 0 && pushService.canEnablePush(subscriber.getAgent())) {             subscriberServiceV1.addClient(namespaceId, serviceName, cluster, subscriber.getAgent(),                     new InetSocketAddress(clientIP, subscriber.getPort()), pushDataSource, StringUtils.EMPTY,                     StringUtils.EMPTY);             cacheMillis = switchDomain.getPushCacheMillis(serviceName);         }     } catch (Exception e) {         // ...     }     // ... } 

当UDP端口大于0,且agent参数定义的客户端声援UDP,则将对答的客户端新闻封装到InetSocketAddress对象中,然后放入NamingSubscriberServiceV1Impl中(该类已经被废舍,望后续如何调整该手段实现)。

在NamingSubscriberServiceV1Impl中,会将对答的参数封装为PushClient,存放在Map当中。

public void addClient(String namespaceId, String serviceName, String clusters, String agent,         InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) {          PushClient client = new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant,             app);     addClient(client); } 

addClient手段会将PushClient新闻存放到ConcurrentMap

private final ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap<>();  public void addClient(PushClient client) {         // client is stored by key 'serviceName' because notify event is driven by serviceName change         String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());         ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);         if (clients == null) {             clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));             clients = clientMap.get(serviceKey);         }                  PushClient oldClient = clients.get(client.toString());         if (oldClient != null) {             oldClient.refresh();         } else {             PushClient res = clients.putIfAbsent(client.toString(), client);            // ...         }     } 

此时,UDP的IP、端口新闻已经封装到PushClient当中,并存储在NamingSubscriberServiceV1Impl的成员变量当中。

注册中央的UDP报告

当服务端发现某个实例发生了转折,比如主动刊出了,会发布一个ServiceChangeEvent事件,UdpPushService会监听到该事件,并进走营业处理。

在UdpPushService的onApplicationEvent手段中,会按照PushClient的详细情况进走移除或发送UDP报告。onApplicationEvent中中央逻辑代码如下:

ConcurrentMap<String, PushClient> clients = subscriberServiceV1.getClientMap()         .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); if (MapUtils.isEmpty(clients)) {     return; }  Map<String, Object> cache = new HashMap<>(16); long lastRefTime = System.nanoTime(); for (PushClient client : clients.values()) {     // 移除僵尸客户端     if (client.zombie()) {         Loggers.PUSH.debug("client is zombie: " + client);         clients.remove(client.toString());         Loggers.PUSH.debug("client is zombie: " + client);         continue;     }          AckEntry ackEntry;     String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());     byte[] compressData = null;     Map<String, Object> data = null;     if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {         org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);         compressData = (byte[]) (pair.getValue0());         data = (Map<String, Object>) pair.getValue1();     }          // 封装AckEntry对象     if (compressData != null) {         ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);     } else {         ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);         if (ackEntry != null) {             cache.put(key, new org.javatuples.Pair<>(ackEntry.getOrigin().getData(), ackEntry.getData()));         }     }     // 始末UDP报告其他客户端     udpPush(ackEntry); } 

事件处理的中央逻辑是就是先判定PushClient的状态新闻,倘若已经是僵尸客户端,则移除。然后将发送UDP的报文新闻和授与客户端的新闻封装为AckEntry对象,然后调用udpPush手段,进走UDP新闻的发送。

注册中央的UDP授与

在望客户端源码的时候,吾们望到客户端不光会授与UDP乞求,而且还会进走答答。那么注册中央怎么授与答答呢?也在UdpPushService类中,该类内部的静态代码块初首化一个UDP的DatagramSocket,用来授与新闻:

static {     try {         udpSocket = new DatagramSocket();         Receiver receiver = new Receiver();         Thread inThread = new Thread(receiver);         inThread.setDaemon(true);         inThread.setName("com.alibaba.nacos.naming.push.receiver");         inThread.start();     } catch (SocketException e) {         Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");     } } 

Receiver是一个内部类,实现了Runnable接口,在其run手段中主要就是授与报文新闻,然后进走报文新闻的判定,按照判定效果,操作本地Map中数据。

UDP设计不能

文章最最先就写到,UDP的设计专门棒,即弥补了HTTP准时拉取的不能,又不至于太影响性能。但现在Nacos在UDP方面有一些不能,也能够是幼我的吹毛求疵吧。

第一,文档中异国清晰表明UDP的功能如何行使,这导致许多行使者在行使时并不清新UDP功能的存在,以及行使的限定条件。

第二,对云服务不友益。客户端的UDP端口能够自定义,但服务器端的UDP端口是随机获取到。在云服务中,即便是内网服务,UDP端口也是被防火墙限定的。倘若服务端的UDP端口是随机获取(客户端默认也是),那么UDP的通信将直接被防火墙阻截失踪,而用户根本望不到任何变态(UDP制定不关注客户端是否收到新闻)。

至于这两点,说首来算是瑕不掩瑜,读完源码或读过吾这篇文章的友人也许已经清新怎么用了。后续能够给官方挑一个Issue,望望是否能够改进。

幼结

本文重点从三个方面讲解的Nacos基于UDP的服务实例变更报告:

第一,客户端监听UDP端口,当授与注册中央发来的服务实例转折,能够及时的更新本地的实例缓存;

第二,客户端始末订阅接口,将自己的UDP新闻发送给注册中央,注册中央进走存储;

第三,注册中央中实例发生了转折,始末事件机制,将变更新闻始末UDP制定发送给客户端。

经过本篇文章,想必你不光晓畅了Nacos中UDP制定的报告机制。同时,也开拓了一个新的思路,即如何行使UDP,在什么场景下行使UDP,以及在云服务中行使UDP能够会存在的题目。倘若这篇文章对你有协助,关注或点赞都能够。

【编辑选举】英超超级联赛直播

鸿蒙官方战略配相符共建——HarmonyOS技术社区 Linux 编辑器之神 vim 的 IO 存储原理 Longhorn 企业级云原生容器存储解决方案-安放篇 携程持久化KV存储挑衅Redis,狂省90%成本…… 边缘计算存储是物联网的下一个前沿 Web3.0时代区块链分布式存储的作用和影响

Powered by 英超超级联赛直播 @2018 RSS地图 HTML地图