rocketMQ 的 msgId

producer 在发送消息的时候,会生成一个 "唯一" 的 msgId,broker 会为这个 msgId 创建哈希索引

UNIQ_KEY 由客户端生成

org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqID

msgId 由 前缀 + 内容 组成:
前缀
ip 地址,进程号,classLoader 的 hashcode
内容
时间差(当前时间减去当月一日),计数器

static {
    byte[] ip;
    try {
        // 获取 ip 地址,字节数组
        ip = UtilAll.getIP();
    } catch (Exception e) {
        ip = createFakeIP();
    }
    LEN = ip.length + 4 + 4 + 4 + 2;
    ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 4 + 4);
    tempBuffer.position(0);
    // 写入 ip
    tempBuffer.put(ip);
    tempBuffer.position(ip.length);
    // 写入进程号
    tempBuffer.putInt(UtilAll.getPid());
    tempBuffer.position(ip.length + 4);
    // 写入 hashcode
    tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
    // 二进制转十六进制字符串
    FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
    // 设置起始时间为当月1日
    setStartTime(System.currentTimeMillis());
    COUNTER = new AtomicInteger(0);
}

byte 数组转十六进制字符串

// org.apache.rocketmq.common.UtilAll#bytes2string// final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();public static String bytes2string(byte[] src) {
    char[] hexChars = new char[src.length * 2];
    for (int j = 0; j < src.length; j++) {
        // & 过之后,byte 转成 int
        int v = src[j] & 0xFF;
        // 无符号右移 4 位,高位补 0 ,即取字节的高 4 位
        hexChars[j * 2] = HEX_ARRAY[v >>> 4];
        // 取字节低 4 位
        hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
    }
    return new String(hexChars);
}

设置开始时间

// org.apache.rocketmq.common.message.MessageClientIDSetter#setStartTime
private synchronized static void setStartTime(long millis) {
    Calendar cal = Calendar.getInstance();
    cal.setTimeInMillis(millis);
    cal.set(Calendar.DAY_OF_MONTH, 1);
    cal.set(Calendar.HOUR_OF_DAY, 0);
    cal.set(Calendar.MINUTE, 0);
    cal.set(Calendar.SECOND, 0);
    cal.set(Calendar.MILLISECOND, 0);
    // 当月1日为开始时间
    startTime = cal.getTimeInMillis();
    // 下月1日
    cal.add(Calendar.MONTH, 1);
    nextStartTime = cal.getTimeInMillis();
}
// org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqIDBuffer
private static byte[] createUniqIDBuffer() {
    ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
    long current = System.currentTimeMillis();
    if (current >= nextStartTime) {
        setStartTime(current);
    }
    buffer.position(0);
    // 当前时间减去当月一日
    buffer.putInt((int) (System.currentTimeMillis() - startTime));
    // 计数器
    buffer.putShort((short) COUNTER.getAndIncrement());
    return buffer.array();
}
public static String createUniqID() {
    // 1 个字节,8 位,每 4 位一个十六进制字符
    StringBuilder sb = new StringBuilder(LEN * 2);
    // 前缀:ip,pid,hashcode
    sb.append(FIX_STRING);
    // 时间差 + 计数器
    sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
    return sb.toString();
}

相关推荐