rocketMQ 的 msgId
producer 在发送消息的时候,会生成一个 "唯一" 的 msgId,broker 会为这个 msgId 创建哈希索引
UNIQ_KEY 由客户端生成
org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqID
msgId 由 前缀 + 内容 组成:
前缀
ip 地址,进程号,classLoader 的 hashcode
内容
时间差(当前时间减去当月一日),计数器
static { byte[] ip; try { // 获取 ip 地址,字节数组 ip = UtilAll.getIP(); } catch (Exception e) { ip = createFakeIP(); } LEN = ip.length + 4 + 4 + 4 + 2; ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 4 + 4); tempBuffer.position(0); // 写入 ip tempBuffer.put(ip); tempBuffer.position(ip.length); // 写入进程号 tempBuffer.putInt(UtilAll.getPid()); tempBuffer.position(ip.length + 4); // 写入 hashcode tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); // 二进制转十六进制字符串 FIX_STRING = UtilAll.bytes2string(tempBuffer.array()); // 设置起始时间为当月1日 setStartTime(System.currentTimeMillis()); COUNTER = new AtomicInteger(0); }
byte 数组转十六进制字符串
// org.apache.rocketmq.common.UtilAll#bytes2string// final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();public static String bytes2string(byte[] src) { char[] hexChars = new char[src.length * 2]; for (int j = 0; j < src.length; j++) { // & 过之后,byte 转成 int int v = src[j] & 0xFF; // 无符号右移 4 位,高位补 0 ,即取字节的高 4 位 hexChars[j * 2] = HEX_ARRAY[v >>> 4]; // 取字节低 4 位 hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F]; } return new String(hexChars); }
设置开始时间
// org.apache.rocketmq.common.message.MessageClientIDSetter#setStartTime private synchronized static void setStartTime(long millis) { Calendar cal = Calendar.getInstance(); cal.setTimeInMillis(millis); cal.set(Calendar.DAY_OF_MONTH, 1); cal.set(Calendar.HOUR_OF_DAY, 0); cal.set(Calendar.MINUTE, 0); cal.set(Calendar.SECOND, 0); cal.set(Calendar.MILLISECOND, 0); // 当月1日为开始时间 startTime = cal.getTimeInMillis(); // 下月1日 cal.add(Calendar.MONTH, 1); nextStartTime = cal.getTimeInMillis(); }
// org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqIDBuffer private static byte[] createUniqIDBuffer() { ByteBuffer buffer = ByteBuffer.allocate(4 + 2); long current = System.currentTimeMillis(); if (current >= nextStartTime) { setStartTime(current); } buffer.position(0); // 当前时间减去当月一日 buffer.putInt((int) (System.currentTimeMillis() - startTime)); // 计数器 buffer.putShort((short) COUNTER.getAndIncrement()); return buffer.array(); }
public static String createUniqID() { // 1 个字节,8 位,每 4 位一个十六进制字符 StringBuilder sb = new StringBuilder(LEN * 2); // 前缀:ip,pid,hashcode sb.append(FIX_STRING); // 时间差 + 计数器 sb.append(UtilAll.bytes2string(createUniqIDBuffer())); return sb.toString(); }
相关推荐
LCFlxfldy 2020-08-17
IT农场 2020-11-13
ljcsdn 2020-07-27
LCFlxfldy 2020-07-05
lypgcs 2020-06-27
陈晨软件五千言 2020-06-17
qingyuerji 2020-06-14
MojitoBlogs 2020-06-14
lypgcs 2020-06-14
陈晨软件五千言 2020-06-14
meilongwhpu 2020-06-13
陈晨软件五千言 2020-06-11
qingyuerji 2020-06-09
MojitoBlogs 2020-06-09
meilongwhpu 2020-06-08
meilongwhpu 2020-06-08
lypgcs 2020-06-07
MojitoBlogs 2020-06-04
meilongwhpu 2020-05-30