Dubbo AbstractRegistry源码阅读
前言
最近因为工作需要在学习Dubbo的各种机制。其中深入学习了一下AbstractRegistry的实现机制。在此根据Dubbo源码对其实现进行一个总结。
Registry是干啥的
首先看一下dubbo最简单的架构图。架构图中一共有五个元素,而Registry类就是对注册中心的抽象。AbstractRegistry
是对注册中心的一个抽象的实现。
可以看到它主要实现了RegistryService
和Node
接口。这两个接口分别定义了节点属性如Url地址,是否可用,以及注册中心服务的属性如注册,注销,订阅,通知等等。
当服务启动时,会调用注册中心的register
方法将自己的服务通过URL的方式发布到注册中心,而订阅其它服务时,会将订阅的服务通过URL发送给注册中心(URL中通常包含各种配置)。当服务需要优雅关闭时,会将自己从注册中心上解除注册。当服务出现变更时,会调用notify方法触发所有的监听器。
初始化
public AbstractRegistry(URL url) { //1. 设置配置中心的地址 setUrl(url); //2. 配置中心的URL中是否配置了同步保存文件属性,否则默认为false syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false); //3. 配置信息本地缓存的文件名 String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache"); //逐层创建文件目录 File file = null; if (ConfigUtils.isNotEmpty(filename)) { file = new File(filename); if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { if (!file.getParentFile().mkdirs()) { throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!"); } } } this.file = file; //如果现有配置缓存,则从缓存文件中加载属性 loadProperties(); notify(url.getBackupUrls()); }
private void loadProperties() { //当本地存在配置缓存文件时 if (file != null && file.exists()) { InputStream in = null; try { in = new FileInputStream(file); //读取配置文件的内容,并加载为properties的键值对存储 properties.load(in); if (logger.isInfoEnabled()) { logger.info("Load registry store file " + file + ", data: " + properties); } } catch (Throwable e) { logger.warn("Failed to load registry store file " + file, e); } finally { if (in != null) { try { in.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } } }
注册与取消注册
这里采用ConcurrentHashSet<URL>
来记录注册的服务。逻辑相当的简单:
@Override public void register(URL url) { if (url == null) { throw new IllegalArgumentException("register url == null"); } if (logger.isInfoEnabled()) { logger.info("Register: " + url); } registered.add(url); } @Override public void unregister(URL url) { if (url == null) { throw new IllegalArgumentException("unregister url == null"); } if (logger.isInfoEnabled()) { logger.info("Unregister: " + url); } registered.remove(url); }
订阅和取消订阅
订阅关系采用的存储是ConcurrentHashMap<URL, Set<NotifyListener>>
。分别存储订阅的URL和其对应的监听器列表。
@Override public void subscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("subscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("subscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Subscribe: " + url); } Set<NotifyListener> listeners = subscribed.get(url); if (listeners == null) { subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>()); listeners = subscribed.get(url); } listeners.add(listener); } @Override public void unsubscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("unsubscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("unsubscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Unsubscribe: " + url); } Set<NotifyListener> listeners = subscribed.get(url); if (listeners != null) { listeners.remove(listener); } }
服务的恢复
服务的恢复包括注册服务的恢复和订阅服务的恢复。因为内存中保留了注册的服务和订阅的服务。因此在恢复的时候会重新拉取这些数据,分别调用发布和订阅的方法来重新将其录入到注册中心上。
protected void recover() throws Exception { // register Set<URL> recoverRegistered = new HashSet<URL>(getRegistered()); if (!recoverRegistered.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover register url " + recoverRegistered); } for (URL url : recoverRegistered) { register(url); } } // subscribe Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed()); if (!recoverSubscribed.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover subscribe url " + recoverSubscribed.keySet()); } for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) { URL url = entry.getKey(); for (NotifyListener listener : entry.getValue()) { subscribe(url, listener); } } } }
这里可能大家会有点困惑,因为在AbstractRegistry的实现中,订阅和发布就是单纯的将URL添加到相应的集合中去。这里的逻辑不就相当于再添加一次吗?其实在AbstractRegsitry的具体实现中,发布意味着还需要向注册中心真正的通过RPC建立联系。而不仅仅是将地址加入对应的集合中。
通知
notify
是指将一组URL推送给订阅了该URL的订阅端。在推送的时候,会将url根据cateogry分组,之后再分别推送不同的分组。
protected void notify(List<URL> urls) { if (urls == null || urls.isEmpty()) return; for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) { URL url = entry.getKey(); if (!UrlUtils.isMatch(url, urls.get(0))) { continue; } Set<NotifyListener> listeners = entry.getValue(); if (listeners != null) { for (NotifyListener listener : listeners) { try { notify(url, listener, filterEmpty(url, urls)); } catch (Throwable t) { logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t); } } } } } protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((urls == null || urls.isEmpty()) && !Constants.ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); } Map<String, List<URL>> result = new HashMap<String, List<URL>>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList<URL>(); result.put(category, categoryList); } categoryList.add(u); } } if (result.size() == 0) { return; } Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified == null) { notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); categoryNotified = notified.get(url); } for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); saveProperties(url); listener.notify(categoryList); } }
想要了解更多开发技术,面试教程以及互联网公司内推,欢迎关注我的微信公众号!将会不定期的发放福利哦~