聊聊rocketmq的KVConfigManager
序
本文主要研究一下rocketmq的KVConfigManager
KVConfigManager
org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
/**
 * Namespaced key/value configuration store backed by a JSON file.
 *
 * <p>The table maps a namespace to a map of key/value strings. Mutating methods
 * ({@link #putKVConfig}, {@link #deleteKVConfig}) take the write lock of a
 * {@link ReentrantReadWriteLock}; reading methods ({@link #persist},
 * {@link #getKVListByNamespace}, {@link #getKVConfig}, {@link #printAllPeriodically})
 * take the read lock. {@link #load()} takes no lock at all.
 *
 * <p>All lock acquisitions are interruptible. When a thread is interrupted while
 * waiting for a lock, the operation is abandoned, the error is logged and the
 * thread's interrupt status is restored so that callers can still observe it.
 */
public class KVConfigManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvController namesrvController;

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
        new HashMap<String, HashMap<String, String>>();

    /**
     * @param namesrvController controller used to look up the path of the KV config file
     */
    public KVConfigManager(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    /**
     * Reads the KV config file and merges its content into the in-memory table.
     *
     * <p>An {@link IOException} while reading is logged and treated as "nothing to load".
     * This method does not acquire the lock.
     * NOTE(review): presumably it is only called during start-up, before other threads
     * use this instance - confirm against the caller.
     */
    public void load() {
        String content = null;
        try {
            content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
        } catch (IOException e) {
            log.warn("Load KV config table exception", e);
        }

        if (content != null) {
            KVConfigSerializeWrapper kvConfigSerializeWrapper =
                KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
            // HashMap.putAll(null) throws NullPointerException, so skip the merge when the
            // wrapper carries no table (e.g. the file parsed but had no config table field).
            if (null != kvConfigSerializeWrapper && null != kvConfigSerializeWrapper.getConfigTable()) {
                this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
                log.info("load KV config table OK");
            }
        }
    }

    /**
     * Creates or updates one config item, creating the namespace on first use, and then
     * persists the whole table.
     *
     * <p>If the thread is interrupted while waiting for the write lock, nothing is changed,
     * nothing is persisted and the interrupt status is restored.
     *
     * @param namespace namespace the item belongs to
     * @param key       item key
     * @param value     item value
     */
    public void putKVConfig(final String namespace, final String key, final String value) {
        try {
            this.lock.writeLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null == kvTable) {
                    kvTable = new HashMap<String, String>();
                    this.configTable.put(namespace, kvTable);
                    log.info("putKVConfig create new Namespace {}", namespace);
                }

                final String prev = kvTable.put(key, value);
                if (null != prev) {
                    log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}",
                        namespace, key, value);
                } else {
                    log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}",
                        namespace, key, value);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("putKVConfig InterruptedException", e);
            // Keep the interrupt visible to the caller; the table is unchanged, so there is
            // nothing new to persist.
            Thread.currentThread().interrupt();
            return;
        }

        this.persist();
    }

    /**
     * Serializes the whole table to JSON and writes it to the KV config file.
     *
     * <p>Runs under the read lock. An {@link IOException} while writing is logged, not
     * rethrown. If the thread is interrupted while waiting for the lock, nothing is written
     * and the interrupt status is restored.
     */
    public void persist() {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper();
                kvConfigSerializeWrapper.setConfigTable(this.configTable);

                String content = kvConfigSerializeWrapper.toJson();
                if (null != content) {
                    MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath());
                }
            } catch (IOException e) {
                log.error("persist kvconfig Exception, "
                    + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("persist InterruptedException", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes one config item if its namespace exists, and then persists the whole table.
     *
     * <p>If the thread is interrupted while waiting for the write lock, nothing is changed,
     * nothing is persisted and the interrupt status is restored.
     *
     * @param namespace namespace the item belongs to
     * @param key       item key
     */
    public void deleteKVConfig(final String namespace, final String key) {
        try {
            this.lock.writeLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null != kvTable) {
                    String value = kvTable.remove(key);
                    log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}",
                        namespace, key, value);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("deleteKVConfig InterruptedException", e);
            // Table is unchanged, so skip persisting and just propagate the interrupt status.
            Thread.currentThread().interrupt();
            return;
        }

        this.persist();
    }

    /**
     * Encodes every item of one namespace.
     *
     * @param namespace namespace to look up
     * @return the encoded {@code KVTable} for the namespace, or {@code null} when the
     *         namespace does not exist or the thread was interrupted while waiting for the lock
     */
    public byte[] getKVListByNamespace(final String namespace) {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null != kvTable) {
                    KVTable table = new KVTable();
                    table.setTable(kvTable);
                    return table.encode();
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getKVListByNamespace InterruptedException", e);
            Thread.currentThread().interrupt();
        }

        return null;
    }

    /**
     * Looks up a single config value.
     *
     * @param namespace namespace to look up
     * @param key       item key
     * @return the value, or {@code null} when the namespace or key does not exist or the
     *         thread was interrupted while waiting for the lock
     */
    public String getKVConfig(final String namespace, final String key) {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null != kvTable) {
                    return kvTable.get(key);
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getKVConfig InterruptedException", e);
            Thread.currentThread().interrupt();
        }

        return null;
    }

    /**
     * Logs the number of namespaces followed by every namespace/key/value triple,
     * under the read lock.
     */
    public void printAllPeriodically() {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                log.info("--------------------------------------------------------");

                log.info("configTable SIZE: {}", this.configTable.size());
                for (Entry<String, HashMap<String, String>> namespaceEntry : this.configTable.entrySet()) {
                    for (Entry<String, String> item : namespaceEntry.getValue().entrySet()) {
                        log.info("configTable NS: {} Key: {} Value: {}",
                            namespaceEntry.getKey(), item.getKey(), item.getValue());
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("printAllPeriodically InterruptedException", e);
            Thread.currentThread().interrupt();
        }
    }
}
- 这里使用HashMap,然后通过ReentrantReadWriteLock进行并发控制,map的key是namespace,而value是一个HashMap
- putKVConfig及deleteKVConfig使用的是写锁
- persist、getKVListByNamespace、getKVConfig、printAllPeriodically使用的是读锁
MixAll.string2File
org/apache/rocketmq/common/MixAll.java
/**
 * Writes {@code str} to {@code fileName} via a temporary file, keeping the previous
 * content as a backup.
 *
 * <p>Steps: write the new content to {@code fileName + ".tmp"}; copy the current content
 * of {@code fileName} (if {@code file2String} returns non-null) to {@code fileName + ".bak"};
 * delete {@code fileName}; rename the temporary file to {@code fileName}.
 *
 * @param str      content to write
 * @param fileName path of the target file
 * @throws IOException if writing fails, or if the temporary file cannot be renamed to
 *                     {@code fileName} (the new content then remains in the {@code .tmp} file)
 */
public static void string2File(final String str, final String fileName) throws IOException {
    String tmpFile = fileName + ".tmp";
    string2FileNotSafe(str, tmpFile);

    String bakFile = fileName + ".bak";
    String prevContent = file2String(fileName);
    if (prevContent != null) {
        string2FileNotSafe(prevContent, bakFile);
    }

    File file = new File(fileName);
    // The result is intentionally not checked: delete() also returns false when there is
    // simply no previous file. A delete that really failed surfaces in the rename below.
    file.delete();

    file = new File(tmpFile);
    // renameTo reports failure only through its return value; ignoring it would make the
    // caller believe the new content was stored when it is still sitting in the .tmp file.
    if (!file.renameTo(new File(fileName))) {
        throw new IOException("Failed to rename " + tmpFile + " to " + fileName);
    }
}

/**
 * Writes {@code str} directly to {@code fileName}, creating parent directories if needed
 * and overwriting any existing content. No temporary file or backup is used.
 *
 * @param str      content to write
 * @param fileName path of the file to write
 * @throws IOException if the file cannot be opened, written or closed
 */
public static void string2FileNotSafe(final String str, final String fileName) throws IOException {
    File file = new File(fileName);
    File fileParent = file.getParentFile();
    if (fileParent != null) {
        fileParent.mkdirs();
    }

    FileWriter fileWriter = null;
    try {
        fileWriter = new FileWriter(file);
        fileWriter.write(str);
    } finally {
        if (fileWriter != null) {
            fileWriter.close();
        }
    }
}
- 将文本内容写到指定路径的文件
- 这里先写到.tmp文件,然后备份上一个版本的内容,再删除上一个版本的文件,最后将tmp文件重命名为正式的文件名
RemotingSerializable
org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
/**
 * Base class giving subclasses JSON (de)serialization helpers.
 *
 * <p>Text conversion is delegated to {@code JSON.toJSONString} / {@code JSON.parseObject};
 * conversion between text and bytes always uses UTF-8.
 */
public abstract class RemotingSerializable {
    private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");

    /**
     * Serializes an arbitrary object to compact JSON and encodes it as UTF-8.
     *
     * @param obj object to serialize
     * @return UTF-8 bytes of the JSON text, or {@code null} when the JSON text is {@code null}
     */
    public static byte[] encode(final Object obj) {
        final String text = toJson(obj, false);
        return text == null ? null : text.getBytes(CHARSET_UTF8);
    }

    /**
     * Serializes an arbitrary object to JSON text.
     *
     * @param obj          object to serialize
     * @param prettyFormat passed through to {@code JSON.toJSONString}
     * @return the JSON text
     */
    public static String toJson(final Object obj, boolean prettyFormat) {
        return JSON.toJSONString(obj, prettyFormat);
    }

    /**
     * Decodes UTF-8 bytes into text and parses that text as an instance of {@code classOfT}.
     *
     * @param data     UTF-8 encoded JSON
     * @param classOfT target type
     * @return the parsed object
     */
    public static <T> T decode(final byte[] data, Class<T> classOfT) {
        return fromJson(new String(data, CHARSET_UTF8), classOfT);
    }

    /**
     * Parses JSON text as an instance of {@code classOfT}.
     *
     * @param json     JSON text
     * @param classOfT target type
     * @return the parsed object
     */
    public static <T> T fromJson(String json, Class<T> classOfT) {
        return JSON.parseObject(json, classOfT);
    }

    /**
     * Serializes this object to compact JSON and encodes it as UTF-8.
     *
     * @return UTF-8 bytes of the JSON text, or {@code null} when the JSON text is {@code null}
     */
    public byte[] encode() {
        final String text = this.toJson();
        if (text == null) {
            return null;
        }
        return text.getBytes(CHARSET_UTF8);
    }

    /**
     * @return this object as JSON text, with {@code prettyFormat} set to {@code false}
     */
    public String toJson() {
        return toJson(false);
    }

    /**
     * @param prettyFormat passed through to {@code JSON.toJSONString}
     * @return this object as JSON text
     */
    public String toJson(final boolean prettyFormat) {
        return toJson(this, prettyFormat);
    }
}
- 这里toJson使用的是fastjson的方法
小结
- rocketmq的KVConfigManager采用的是HashMap来存配置项,key为namespace,value为HashMap,存储的值采用的是String
- 采用ReentrantReadWriteLock进行并发控制,支持序列化为JSON并持久化到磁盘,也支持从磁盘文件加载到内存
doc
相关推荐
Lzs 2020-10-23
聚合室 2020-11-16
零 2020-09-18
Justhavefun 2020-10-22
jacktangj 2020-10-14
ChaITSimpleLove 2020-10-06
Andrea0 2020-09-18
周游列国之仕子 2020-09-15
afanti 2020-09-16
88234852 2020-09-15
YClimb 2020-09-15
风雨断肠人 2020-09-04
卖口粥湛蓝的天空 2020-09-15
stulen 2020-09-15
pythonxuexi 2020-09-06
abfdada 2020-08-26
梦的天空 2020-08-25