PushService in the nacos server: how it works and how it is used

Published: 2021-06-29 14:01:07 | Source: 亿速云 | Views: 194 | Author: chen | Category: Big Data

This article walks through the source code of PushService in the nacos server (version 1.1.3) to explain how it works and how it is used.

PushService

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {

    @Autowired
    private SwitchDomain switchDomain;

    private ApplicationContext applicationContext;

    private static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L);

    private static final int MAX_RETRY_TIMES = 1;

    private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap
        = new ConcurrentHashMap<String, Receiver.AckEntry>();

    private static ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap
        = new ConcurrentHashMap<String, ConcurrentMap<String, PushClient>>();

    private static volatile ConcurrentHashMap<String, Long> udpSendTimeMap = new ConcurrentHashMap<String, Long>();

    public static volatile ConcurrentHashMap<String, Long> pushCostMap = new ConcurrentHashMap<String, Long>();

    private static int totalPush = 0;

    private static int failedPush = 0;

    private static ConcurrentHashMap<String, Long> lastPushMillisMap = new ConcurrentHashMap<>();

    private static DatagramSocket udpSocket;

    private static Map<String, Future> futureMap = new ConcurrentHashMap<>();

    private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.push.retransmitter");
            return t;
        }
    });

    private static ScheduledExecutorService udpSender = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.push.udpSender");
            return t;
        }
    });

    static {
        try {
            udpSocket = new DatagramSocket();

            Receiver receiver = new Receiver();

            Thread inThread = new Thread(receiver);
            inThread.setDaemon(true);
            inThread.setName("com.alibaba.nacos.naming.push.receiver");
            inThread.start();

            executorService.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        removeClientIfZombie();
                    } catch (Throwable e) {
                        Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");
                    }
                }
            }, 0, 20, TimeUnit.SECONDS);

        } catch (SocketException e) {
            Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    //......

    public static void removeClientIfZombie() {

        int size = 0;
        for (Map.Entry<String, ConcurrentMap<String, PushClient>> entry : clientMap.entrySet()) {
            ConcurrentMap<String, PushClient> clientConcurrentMap = entry.getValue();
            for (Map.Entry<String, PushClient> entry1 : clientConcurrentMap.entrySet()) {
                PushClient client = entry1.getValue();
                if (client.zombie()) {
                    clientConcurrentMap.remove(entry1.getKey());
                }
            }

            size += clientConcurrentMap.size();
        }

        if (Loggers.PUSH.isDebugEnabled()) {
            Loggers.PUSH.debug("[NACOS-PUSH] clientMap size: {}", size);
        }
    }

    //......
}
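  • PushService implements the ApplicationContextAware and ApplicationListener<ServiceChangeEvent> interfaces. It holds two ScheduledExecutorService instances, one for the retransmitter and one for udpSender. Its static block starts a daemon thread that runs Receiver and schedules removeClientIfZombie to run every 20 seconds; that task walks clientMap and removes every zombie client.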
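The sweep over the nested clientMap can be illustrated in isolation. The following is a minimal sketch, not the nacos code itself: the class name ZombieSweepSketch, the sweep helper, the sample keys, and the use of a plain Long timestamp in place of PushClient are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

public class ZombieSweepSketch {

    // Hypothetical helper: removes every inner entry whose value matches isZombie
    // and returns how many entries remain across all inner maps.
    public static <V> int sweep(ConcurrentMap<String, ConcurrentMap<String, V>> clientMap,
                                Predicate<V> isZombie) {
        int size = 0;
        for (ConcurrentMap<String, V> clients : clientMap.values()) {
            for (Map.Entry<String, V> entry : clients.entrySet()) {
                if (isZombie.test(entry.getValue())) {
                    // ConcurrentHashMap iterators are weakly consistent, so removing
                    // while iterating does not throw ConcurrentModificationException.
                    clients.remove(entry.getKey());
                }
            }
            size += clients.size();
        }
        return size;
    }

    public static void main(String[] args) {
        // Values stand in for a client's lastRefTime in milliseconds (illustrative only).
        ConcurrentMap<String, Long> clients = new ConcurrentHashMap<>();
        clients.put("10.0.0.1:5001", 1_000L);
        clients.put("10.0.0.2:5002", 14_000L);

        ConcurrentMap<String, ConcurrentMap<String, Long>> clientMap = new ConcurrentHashMap<>();
        clientMap.put("demo-namespace##demo-service", clients); // illustrative key

        long now = 15_000L;
        long ttl = 10_000L;
        int remaining = sweep(clientMap, lastRef -> now - lastRef > ttl);
        System.out.println("remaining clients: " + remaining); // prints "remaining clients: 1"
    }
}
```

The sketch relies on the same property the original loop does: removing entries from a ConcurrentHashMap during iteration is safe, which would not hold for a plain HashMap.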

Receiver

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

    public static class Receiver implements Runnable {
        @Override
        public void run() {
            while (true) {
                byte[] buffer = new byte[1024 * 64];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

                try {
                    udpSocket.receive(packet);

                    String json = new String(packet.getData(), 0, packet.getLength(), Charset.forName("UTF-8")).trim();
                    AckPacket ackPacket = JSON.parseObject(json, AckPacket.class);

                    InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();
                    String ip = socketAddress.getAddress().getHostAddress();
                    int port = socketAddress.getPort();

                    if (System.nanoTime() - ackPacket.lastRefTime > ACK_TIMEOUT_NANOS) {
                        Loggers.PUSH.warn("ack takes too long from {} ack json: {}", packet.getSocketAddress(), json);
                    }

                    String ackKey = getACKKey(ip, port, ackPacket.lastRefTime);
                    AckEntry ackEntry = ackMap.remove(ackKey);
                    if (ackEntry == null) {
                        throw new IllegalStateException("unable to find ackEntry for key: " + ackKey
                            + ", ack json: " + json);
                    }

                    long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);

                    Loggers.PUSH.info("received ack: {} from: {}:, cost: {} ms, unacked: {}, total push: {}",
                        json, ip, port, pushCost, ackMap.size(), totalPush);

                    pushCostMap.put(ackKey, pushCost);

                    udpSendTimeMap.remove(ackKey);

                } catch (Throwable e) {
                    Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);
                }
            }
        }

        //......

        public static class AckPacket {
            public String type;
            public long lastRefTime;
            public String data;
        }
    }
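  • Receiver implements the Runnable interface. Its run method loops forever on udpSocket.receive, parses each datagram into an AckPacket, removes the matching ackKey from ackMap, records the push cost in pushCostMap, and removes the ackKey from udpSendTimeMap.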
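The bookkeeping that Receiver performs when an ack arrives can be sketched without any sockets. This is a simplified illustration under stated assumptions: the class AckCorrelationSketch and its methods are hypothetical, and the key layout in ackKey is an assumption, because the body of getACKKey is elided in the excerpt above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AckCorrelationSketch {

    // Pending pushes waiting for an ack, and the wall-clock time each one was sent.
    private final ConcurrentMap<String, String> pending = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Long> sendTimeMillis = new ConcurrentHashMap<>();

    // Assumed key layout for illustration; the real getACKKey is not shown in the excerpt.
    public static String ackKey(String ip, int port, long lastRefTime) {
        return ip + "," + port + "," + lastRefTime;
    }

    public void recordSend(String ip, int port, long lastRefTime, String payload, long nowMillis) {
        String key = ackKey(ip, port, lastRefTime);
        pending.put(key, payload);
        sendTimeMillis.put(key, nowMillis);
    }

    // Returns the push cost in milliseconds, or -1 when no pending push matches the ack.
    public long onAck(String ip, int port, long lastRefTime, long nowMillis) {
        String key = ackKey(ip, port, lastRefTime);
        if (pending.remove(key) == null) {
            return -1L; // unknown or duplicate ack
        }
        Long sentAt = sendTimeMillis.remove(key);
        return sentAt == null ? -1L : nowMillis - sentAt;
    }

    public int unacked() {
        return pending.size();
    }

    public static void main(String[] args) {
        AckCorrelationSketch sketch = new AckCorrelationSketch();
        sketch.recordSend("10.0.0.1", 5001, 42L, "payload", 1_000L);
        System.out.println(sketch.onAck("10.0.0.1", 5001, 42L, 1_030L)); // prints 30
        System.out.println(sketch.onAck("10.0.0.1", 5001, 42L, 1_040L)); // prints -1
        System.out.println(sketch.unacked());                            // prints 0
    }
}
```

The sketch returns -1 for an unmatched ack where the excerpt throws IllegalStateException (which its own catch block then logs). That difference is a choice made to keep the example easy to test, not a description of nacos behavior.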

PushClient

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

    public class PushClient {
        private String namespaceId;
        private String serviceName;
        private String clusters;
        private String agent;
        private String tenant;
        private String app;
        private InetSocketAddress socketAddr;
        private DataSource dataSource;
        private Map<String, String[]> params;

        public Map<String, String[]> getParams() {
            return params;
        }

        public void setParams(Map<String, String[]> params) {
            this.params = params;
        }

        public long lastRefTime = System.currentTimeMillis();

        public PushClient(String namespaceId,
                          String serviceName,
                          String clusters,
                          String agent,
                          InetSocketAddress socketAddr,
                          DataSource dataSource,
                          String tenant,
                          String app) {
            this.namespaceId = namespaceId;
            this.serviceName = serviceName;
            this.clusters = clusters;
            this.agent = agent;
            this.socketAddr = socketAddr;
            this.dataSource = dataSource;
            this.tenant = tenant;
            this.app = app;
        }

        public DataSource getDataSource() {
            return dataSource;
        }

        public PushClient(InetSocketAddress socketAddr) {
            this.socketAddr = socketAddr;
        }

        public boolean zombie() {
            return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName);
        }

        @Override
        public String toString() {
            return "serviceName: " + serviceName
                + ", clusters: " + clusters
                + ", ip: " + socketAddr.getAddress().getHostAddress()
                + ", port: " + socketAddr.getPort()
                + ", agent: " + agent;
        }

        public String getAgent() {
            return agent;
        }

        public String getAddrStr() {
            return socketAddr.getAddress().getHostAddress() + ":" + socketAddr.getPort();
        }

        public String getIp() {
            return socketAddr.getAddress().getHostAddress();
        }

        @Override
        public int hashCode() {
            return Objects.hash(serviceName, clusters, socketAddr);
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof PushClient)) {
                return false;
            }

            PushClient other = (PushClient) obj;

            return serviceName.equals(other.serviceName) && clusters.equals(other.clusters) && socketAddr.equals(other.socketAddr);
        }

        public String getClusters() {
            return clusters;
        }

        public void setClusters(String clusters) {
            this.clusters = clusters;
        }

        public String getNamespaceId() {
            return namespaceId;
        }

        public void setNamespaceId(String namespaceId) {
            this.namespaceId = namespaceId;
        }

        public String getServiceName() {
            return serviceName;
        }

        public void setServiceName(String serviceName) {
            this.serviceName = serviceName;
        }

        public String getTenant() {
            return tenant;
        }

        public void setTenant(String tenant) {
            this.tenant = tenant;
        }

        public String getApp() {
            return app;
        }

        public void setApp(String app) {
            this.app = app;
        }

        public InetSocketAddress getSocketAddr() {
            return socketAddr;
        }

        public void refresh() {
            lastRefTime = System.currentTimeMillis();
        }
    }
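  • PushClient wraps the information about a push target, such as its socket address. Its zombie method reports whether the target has gone stale: it checks whether the time elapsed since lastRefTime exceeds the PushCacheMillis that switchDomain returns for the serviceName (10 seconds by default), and if so the client counts as a zombie.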
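The zombie test is a strict "greater than" comparison on elapsed time, which a small standalone sketch makes easy to see. The class ZombieCheckSketch is hypothetical, and it takes the current time as a parameter (instead of calling System.currentTimeMillis() as PushClient does) purely so the behavior is deterministic.

```java
public class ZombieCheckSketch {

    private final long pushCacheMillis; // stands in for switchDomain.getPushCacheMillis(serviceName)
    private volatile long lastRefTime;

    public ZombieCheckSketch(long pushCacheMillis, long nowMillis) {
        this.pushCacheMillis = pushCacheMillis;
        this.lastRefTime = nowMillis;
    }

    // Mirrors PushClient.refresh(): the client was seen again at nowMillis.
    public void refresh(long nowMillis) {
        lastRefTime = nowMillis;
    }

    // Mirrors PushClient.zombie(): stale only when strictly more than the TTL has elapsed.
    public boolean zombie(long nowMillis) {
        return nowMillis - lastRefTime > pushCacheMillis;
    }

    public static void main(String[] args) {
        ZombieCheckSketch client = new ZombieCheckSketch(10_000L, 0L);
        System.out.println(client.zombie(10_000L)); // prints false: exactly at the limit
        System.out.println(client.zombie(10_001L)); // prints true: one millisecond past it
        client.refresh(10_001L);
        System.out.println(client.zombie(15_000L)); // prints false: refresh reset the clock
    }
}
```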

PushService.onApplicationEvent

@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
    //......

    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();

        Future future = udpSender.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                    ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                    if (MapUtils.isEmpty(clients)) {
                        return;
                    }

                    Map<String, Object> cache = new HashMap<>(16);
                    long lastRefTime = System.nanoTime();
                    for (PushClient client : clients.values()) {
                        if (client.zombie()) {
                            Loggers.PUSH.debug("client is zombie: " + client.toString());
                            clients.remove(client.toString());
                            Loggers.PUSH.debug("client is zombie: " + client.toString());
                            continue;
                        }

                        Receiver.AckEntry ackEntry;
                        Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                        String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                        byte[] compressData = null;
                        Map<String, Object> data = null;
                        if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                            org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                            compressData = (byte[]) (pair.getValue0());
                            data = (Map<String, Object>) pair.getValue1();

                            Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                        }

                        if (compressData != null) {
                            ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                        } else {
                            ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                            if (ackEntry != null) {
                                cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                            }
                        }

                        Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                            client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key));

                        udpPush(ackEntry);
                    }
                } catch (Exception e) {
                    Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

                } finally {
                    futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                }

            }
        }, 1000, TimeUnit.MILLISECONDS);

        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

    }

    //......

    public void serviceChanged(Service service) {
        // merge some change events to reduce the push frequency:
        if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
            return;
        }

        this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
    }

    //......
}
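  • onApplicationEvent handles ServiceChangeEvent. It schedules a task on udpSender with a 1000 ms delay and stores the returned future in futureMap. When the task runs, it looks up the clients for the given namespaceId and serviceName in clientMap and iterates over them: a zombie client is removed, and for every other client it builds a Receiver.AckEntry and calls udpPush(ackEntry). Finally it removes the future from futureMap. serviceChanged is the entry point for external callers; it publishes a ServiceChangeEvent unless futureMap already holds a pending push for that service.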
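The way serviceChanged and futureMap merge bursts of change events can be sketched with a plain queue standing in for the 1000 ms delayed task. This is an illustration only: ChangeMergeSketch, runDelayedTasks, and the pushes counter are hypothetical, and the sketch uses putIfAbsent where the excerpt uses a separate containsKey check followed by a later put.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

public class ChangeMergeSketch {

    // Plays the role of futureMap: one marker per service with a push still pending.
    private final Map<String, Boolean> pending = new ConcurrentHashMap<>();
    // Stands in for the delayed tasks scheduled on udpSender.
    private final Queue<String> delayed = new ArrayDeque<>();
    private int pushes = 0;

    // Returns true when a new push was scheduled, false when the change was merged.
    public boolean serviceChanged(String fullServiceName) {
        if (pending.putIfAbsent(fullServiceName, Boolean.TRUE) != null) {
            return false; // a push for this service is already waiting
        }
        delayed.add(fullServiceName);
        return true;
    }

    // Simulates the delay elapsing: run every queued push, then clear its marker.
    public void runDelayedTasks() {
        String name;
        while ((name = delayed.poll()) != null) {
            try {
                pushes++; // the real task would push to every non-zombie client here
            } finally {
                pending.remove(name); // same role as futureMap.remove(...) in the finally block
            }
        }
    }

    public int pushes() {
        return pushes;
    }

    public static void main(String[] args) {
        ChangeMergeSketch sketch = new ChangeMergeSketch();
        sketch.serviceChanged("demo-service");
        sketch.serviceChanged("demo-service"); // merged
        sketch.serviceChanged("demo-service"); // merged
        sketch.runDelayedTasks();
        System.out.println(sketch.pushes()); // prints 1

        sketch.serviceChanged("demo-service"); // marker was cleared, so this schedules again
        sketch.runDelayedTasks();
        System.out.println(sketch.pushes()); // prints 2
    }
}
```

Because the marker is cleared only after the task has run, any number of changes that arrive during the delay window results in a single push.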

PushService.udpPush

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
    //......

    public static class Receiver implements Runnable {
        //......

        public static class AckEntry {

            public AckEntry(String key, DatagramPacket packet) {
                this.key = key;
                this.origin = packet;
            }

            public void increaseRetryTime() {
                retryTimes.incrementAndGet();
            }

            public int getRetryTimes() {
                return retryTimes.get();
            }

            public String key;
            public DatagramPacket origin;
            private AtomicInteger retryTimes = new AtomicInteger(0);
            public Map<String, Object> data;
        }

        //......
    }

    private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }

        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;
            return ackEntry;
        }

        try {
            if (!ackMap.containsKey(ackEntry.key)) {
                totalPush++;
            }
            ackMap.put(ackEntry.key, ackEntry);
            udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

            Loggers.PUSH.info("send udp packet: " + ackEntry.key);
            udpSocket.send(ackEntry.origin);

            ackEntry.increaseRetryTime();

            executorService.schedule(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS),
                TimeUnit.MILLISECONDS);

            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}",
                ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;

            return null;
        }
    }

    //......
}
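  • udpPush first inspects the Receiver.AckEntry. If its retry count is greater than MAX_RETRY_TIMES, the push is abandoned: the entry is removed from ackMap and udpSendTimeMap and failedPush is incremented. Otherwise it puts ackEntry.key into ackMap and udpSendTimeMap, calls udpSocket.send(ackEntry.origin) and ackEntry.increaseRetryTime(), and schedules a Retransmitter to run after ACK_TIMEOUT_NANOS (10 seconds). If an exception is thrown, the entry is removed from ackMap and udpSendTimeMap and failedPush is incremented.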
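Because the counter is checked with "greater than" before each send and incremented after it, it is worth counting how many datagrams an unacknowledged entry actually produces. The sketch below replays only that counter logic; RetryBudgetSketch and sendsBeforeGivingUp are hypothetical names, and no sockets or timers are involved.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RetryBudgetSketch {

    static final int MAX_RETRY_TIMES = 1; // same value as in the excerpt

    // Replays udpPush's counter logic for an entry that never receives an ack
    // and returns how many times the packet would be sent before giving up.
    public static int sendsBeforeGivingUp(int maxRetryTimes) {
        AtomicInteger retryTimes = new AtomicInteger(0);
        int sends = 0;
        while (true) {
            if (retryTimes.get() > maxRetryTimes) {
                return sends; // "max re-push times reached" branch
            }
            sends++;                      // udpSocket.send(...)
            retryTimes.incrementAndGet(); // ackEntry.increaseRetryTime()
            // the real code would now wait ACK_TIMEOUT_NANOS before the Retransmitter runs
        }
    }

    public static void main(String[] args) {
        System.out.println(sendsBeforeGivingUp(MAX_RETRY_TIMES)); // prints 2
    }
}
```

Under this reading, with MAX_RETRY_TIMES = 1 a client that never acks receives the original packet plus one retransmission, and the entry is dropped on the third call, roughly two ack timeouts after the first send.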

Retransmitter

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

    public static class Retransmitter implements Runnable {
        Receiver.AckEntry ackEntry;

        public Retransmitter(Receiver.AckEntry ackEntry) {
            this.ackEntry = ackEntry;
        }

        @Override
        public void run() {
            if (ackMap.containsKey(ackEntry.key)) {
                Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
                udpPush(ackEntry);
            }
        }
    }
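  • Retransmitter implements the Runnable interface. Its run method calls udpPush again only if ackMap still contains ackEntry.key, that is, only if no ack has arrived for that push.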

Summary

  • PushService implements the ApplicationContextAware and ApplicationListener<ServiceChangeEvent> interfaces.

  • Its static block starts a daemon thread that runs Receiver and schedules removeClientIfZombie, which walks clientMap and removes every zombie client.

  • Its onApplicationEvent handles ServiceChangeEvent by scheduling a delayed task and storing the future in futureMap. The task looks up the clients for the given namespaceId and serviceName in clientMap, removes zombie clients, builds a Receiver.AckEntry for each remaining client and calls udpPush(ackEntry), then removes the future from futureMap. serviceChanged lets external callers publish a ServiceChangeEvent.

Thanks for reading. That concludes this look at how PushService works in the nacos server and how it is used; the details of actual usage still need to be verified in practice.
