一个线程安全的 lrucache 实现 --- 读 leveldb 源码

缓存是计算机的每一个层次中都是一个非常重要的概念,缓存的存在可以大大提高软件的运行速度。Least Recently Used(lru) cache 即最近最久未使用的缓存,多见与页面置换算法,lru 缓存算法在缓存的大小达到最大值之后,换出最早未被使用的缓存。

在阅读 leveldb 的源代码的时候,发现其中的 cache 类正是一个线程安全的 lru-cache 实现,代码非常优雅。笔者读完之后受益良多,希望借助这篇博客,一来可以加深自己的理解,而来可以与大家共享这些优秀代码(PS:因为这个 cache 类主要是为了支持 leveldb 的实现,所以其实现的接口可能与其他常见缓存有所差别)。

leveldb 中的 cache 提供的并不是一个类似于 std::map 的模板类,其有一下几个特点:

  1. 键值必须为 string-like (代码中为 Slice 类).
  2. 存储的内容为 void *.所以支持存储任何类型的数据,因为存储的是指针,所以 cache 类负责管理其所有 entries 的生命周期以及资源回收。在插入的时候必须明确传入 deleter.
  3. 对 entries 的查询需要先获取 Handle(像pthread_t一样的一个对客户端透明的结构,如果客户端不再使用,必4. 须要显示的调用Cache::Release(handle).不直接提供访问 entries 的接口,防止了客户端修改,但引入了一次额外的函数调用(Cache::Value(hanle)).
  4. 使用两个双向链表来分别存储被客户端使用的 entries 和未被使用的 entries.
  5. NewId() 接口可以生成一个唯一的 id,多线程环境下可以使用这个 id 与自己的键值拼接起来,防止不同线程之间互相覆写.
  6. 使用很直观的 mutex 类提供线程安全性.
不止巧妙,而且代码的可读性非常之高的,下面让我们来看一下主要的代码(完整代码可以参见 leveldb 源代码或者本人的 github)。
class Cache {
 public:
  Cache() { }

  // 需要调用插入时的 deleter 销毁所有 entries
  virtual ~Cache();

  // 每个 entry 对应的 handle,客户端只能通过调用 this->Value(handle) 来获取缓存的内容,不可以直接访问缓存
  struct Handle { };

  // 插入一个 key->value 的映射,支持通过 charge 为不同的缓存设定不同的权重。
  // 返回一个 entry 对应的 handle 指针到客户端,客户端不需要使用后必须显示的调用 this->Release(handle),
  // 当缓存要被换出的时候,会调用传入的 deleter
  virtual Handle *Insert(const Slice &key, void *value, size_t charge,
                         void (*deleter)(const Slice &key, void *value)) = 0;

  // 查询接口,如果缓存不存在,返回 NULL 指针。否则返回 entry 对应的 handle 指针,同样必须调用 this->Release(handle)
  virtual Handle *Lookup(const Slice &key) = 0;

  // 释放由 Insert 或者 Lokkup 返回的 handle。Release a mapping returned by a previous Lookup().
  // NOTE: 1.仅能在一个 handle 上调用一次 this->Release(handle)
  // NOTE: 2.handle 在调用该方法后不能再使用(handle 指针会被 free 掉)
  virtual void Release(Handle *handle) = 0;

  // 获取缓存内容
  virtual void *Value(Handle *hanle) = 0;

  // 见上面第6点
  virtual uint64_t NewId() = 0;

  // 清楚所有的未使用缓存
  virtual void Prune() { }

  // 查看缓存使用总量
  virtual size_t TotalCharge() const = 0;

 private:
  // No copying allowed
  Cache(const Cache &);
  void operator=(const Cache &);
};
因为 leveldb 中支持对定制自己的缓存类,所以其头文件中的 Cache 是一个虚拟基类,更像是一个接口定义,《effective c++》中称之为 Interface class。每个接口的含义我都已经加上了注释,为了保证除了 Cache 类,没有人可以操作缓存数据(这是非常重要的一点,因为 Cache 中存储的都是 void *,Cache 类必须要确保它自己唯一管理这些缓存对象生命周期),这里使用了 Handle(句柄)。虽然在查询的时候额外引入了一层间接层,但是可以充分保证数据安全。

下面让我们看一下实现代码:

struct LRUHandle { // 这便是返回的 Handle,在返回的时候会 return reinterpret_cast<Cache::Handle*>(handle)
   void *value; 
   void (*deleter)(const Slice &, void *);
   LRUHandle *next_hash; // 给自己实现的 hahs_table 使用;
   LRUHandle *next; // 双向链表
   LRUHandle *prev; // 双向链表
   size_t charge; // 权重
   size_t key_length; // key 长度
   bool in_cache; // 是否被缓存(我们的类可以通过将 capacity 设置为0来关闭缓存)
   uint32_t refs; // 引用计数
   uint32_t hash; // 我们要将 hash 值缓存起来,防止需要重复 hash 计算
   char key_data[1]; // 柔性数组,存储键

   Slice key() const {
     assert(next != this); // Hold true iff this is the head of an empty list
     return Slice(key_data, key_length);
   }
 };
代码中 Slice 是 google 项目常见的一个工具类,其实现非常简单,可以认为它仅仅是对一个字符串的引用(因为它自己不管理内存,咱们在这里可以简单的把它当做 std::string 也是可以的,它可以使用的地方都可以使用 std::string(反过来不成立))。

可以看到我们这个 LRUHandle 类中的数据还是不少的,在后面的实现代码中我们能看到它们的用武之地。下面要看的代码是 levedb 自己实现的一个很短小精悍的 hash_table,其使用开链法解决冲突,代码真的很优雅,很值得一读:

class HandleTable {
 public:
   HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }
   ~HandleTable() { delete[] list_; }

   LRUHandle *Lookup(const Slice &key, uint32_t hash) {
     return *FindPointer(key, hash);
   }

   // Insert into the last position
   LRUHandle *Insert(LRUHandle *h) {
     LRUHandle **ptr = FindPointer(h->key(), h->hash);
     LRUHandle *old = *ptr;
     h->next_hash = (old == NULL ? NULL : old->next_hash);
     *ptr = h;
     if (old == NULL) {
       ++elems_;
       if (elems_ > length_) {
         Resize();
       }
     }
     return old;
   }

   LRUHandle *Remove(const Slice &key, uint32_t hash) {
     LRUHandle **ptr = FindPointer(key, hash);
     LRUHandle *result = *ptr;
     if (result != NULL) {
       *ptr = result->next_hash;
       --elems_;
     }
     return result;
   }
 private:
   uint32_t length_; // hash table 的 bucket 数
   uint32_t elems_; // hash table 中 entry 数
   LRUHandle **list_; // hash bucket 数组

   LRUHandle **FindPointer(const Slice &key, uint32_t hash) { 
     assert(list_ != NULL);
     LRUHandle **ptr = &list_[hash & (length_ - 1)]; // 找到对应键的桶,取第一个 entry
     while (*ptr != NULL &&
            ((*ptr)->hash != hash || (*ptr)->key() != key)) {
       ptr = &(*ptr)->next_hash; // 注意,我们返回的不是一个 LRUHandle *,而是一个 LRUHandle **(&(LRUHandle::next_hash))
     }
     return ptr;
   }

   void Resize() {
     uint32_t new_length = 4; // length_ 一定是 2 的幂次,这样可以使用 hash & (length_ - 1) 来算出 hash % length_
     while (new_length < elems_) new_length <<= 1; // 算出新 length,因为 length > elems_; 所以装载因子<1
     LRUHandle **new_list = new LRUHandle *[new_length]; 
     memset(new_list, 0, sizeof(new_list[0]) * new_length);
     uint32_t count = 0;
     for (uint32_t i = 0; i < length_; i++) { // 遍历所有的桶,为桶内元素搬家
       LRUHandle *h = list_[i]; // 每个桶的第一个 entry
       while (h != NULL) {
         LRUHandle *next = h->next_hash; // 先保存起来 next
         uint32_t hash = h->hash;
         LRUHandle **ptr = &new_list[hash & (new_length-1)];  // 计算在新表中的桶下标,取桶内第一个 entry
         h->next_hash = *ptr; // 头插法
         *ptr = h;
         h = next;
         ++count;
       }
     }
     assert(count == elems_);
     delete[] list_; // 更新 list_ 和 length_
     list_ = new_list;
     length_ = new_length;
   }
};
上面的代码真的写的太好了,笔者都有怎么读都读不够的感觉啊,谷歌工程师,真的太强了。简要的说下最重要的两个接口:Resize() 和 FindPointer()。

先说 Resize()。在 hash_table 中扩容的时候,往往有两个常见的选择,一是 bucket 的数量选用一个素数,这样可以保证 hash mod 之后的结果最均匀,还有一种是使用2的幂次结果作为 bucket 数,后者是通过优化 hash % #bucket 这个运算来优化(因为 #bucket = 2^n,hash % #bucket 等价于 hash & (length - 1))。在 leveldb 中选用的是后者来优化。

然后是 FindPointer(),在读这份源码之前,我对链表内元素的插入和删除之后使用两个指针,一前一后,来操作链表,但是作者使用一个 LRUHandle **指针,一个指针就解决了对链表元素的插入和删除,真的太巧妙了。我觉得这个代码都值得单独拿出来讲,所以这里并不准备全部展开,但是推荐大家好好品读,我也会学习这个写法,自己写一个 list 实现,到时候也会发到博客,和大家一起交流。

有了 hash_table,我们可以开始实现了 LRUCache 了:

class LRUCache {
 public:
  LRUCache();
  ~LRUCache();

  void SetCapacity(size_t cap) { capacity_ = cap; }

  Cache::Handle *Insert(const Slice &key, uint32_t hash,
                        void *value, size_t charge,
                        void (*deleter)(const Slice &, void *));

  Cache::Handle *Lookup(const Slice &key, uint32_t hash);
  void Release(Cache::Handle *handle);
  void Erase(const Slice &key, uint32_t hash);
  void Prune();
  size_t TotalCharge() const {
    MutexLock l(&mutex_);
    return usage_;
  }

 private:
  void LRU_Remove(LRUHandle *e);
  void LRU_Append(LRUHandle *list, LRUHandle *e);
  void Ref(LRUHandle *e);
  void Unref(LRUHandle *e);
  bool FinishErase(LRUHandle *e);
  // Initialized before use.
  size_t capacity_;

  // Capacity usage guarded by mutex
  mutable Mutex mutex_;
  size_t usage_;

  // 所有未被使用的 entries 都被存储于 lru_ 这个双向链表中,这里使用 dummy head 的方法实现双向链表
  // lru_.prev 是最新的 entry,lru_.next 是最早的 entry(也就是说换出缓存的时候就从 lru_.next 开刀)
  // lru_ 中的所有 entries 都有 refs == 1 && in_cache == true (仅被 LRUCache 类引用,而且被缓存)
  LRUHandle lru_;

  // 所有现在被使用(客户端持有至少一个尚未 Release() 的 handle)的 entries 存储在 in_use_ 双向链表中
  // in_use_ 中 entries 有 refs >= 2 && in_cache = true
  LRUHandle in_use_;

  HandleTable table_; // 用于进行 key->handle 的 mapping
};

LRUCache::LRUCache() : usage_(0) { // 初始化两个双向链表的 dummy head
  lru_.next = &lru_; 
  lru_.prev = &lru_;
  in_use_.next = &in_use_;
  in_use_.prev = &in_use_;
}

LRUCache::~LRUCache() {
  assert(in_use_.next == &in_use_); // 如果在 LRUCache 在析构的时候,还有客户端持有尚未 Release() 的 handle,那么就会出问题
  LRUHandle *e = lru_.next;
  while (e != &lru_) { // 释放所有 entries
    LRUHandle *next = e->next;
    assert(e->in_cache); 
    e->in_cache = false;
    assert(e->refs == 1);
    Unref(e);
    e = next;
  }
}

void LRUCache::Ref(LRUHandle *e) { // 增加引用计数,必要的话将其从 lru_. 挪到 in_use_
  if (e->refs == 1 && e->in_cache) { // ++ 后 refs 大于1,代表有客户端使用
    LRU_Remove(e); // 从 lru_ 中删除
    LRU_Append(&in_use_, e); // 插入 in_use_ 中
  }
  e->refs++; // 增加引用计数
}

void LRUCache::Unref(LRUHandle *e) {
  assert(e->refs > 0); // Unref() 调用前,这个 handle 应该是被引用的
  if (--e->refs == 0) { // 无人引用(包括 LRUCache 类),需要调用其构造时传入的删除器,管理资源
    assert(!e->in_cache);
    (*e->deleter)(e->key(), e->value);
    free(e); // 注意所有的 handle 都是 heap-allocated,必须要free
  } else if (e->in_cache && e->refs == 1) { // refs==1 代表仅有 LRUCache 类引用
    LRU_Remove(e);  // 从 in_use_ 中删除
    LRU_Append(&lru_, e); // 插入 lru_ 中
  }
}
// 将元素从双向链表中删除(删除并不需要知道其所属的 list)
void LRUCache::LRU_Remove(LRUHandle *e) {
  e->prev->next = e->next; 
  e->next->prev = e->prev;
}
// 将元素插入到双向链表 head.prev
void LRUCache::LRU_Append(LRUHandle *list, LRUHandle *e) {
  list->prev->next = e;
  e->prev = list->prev;
  e->next = list;
  list->prev = e;
}

Cache::Handle *LRUCache::Lookup(const Slice &key, uint32_t hash) {
  MutexLock l(&mutex_);
  LRUHandle *e = table_.Lookup(key, hash); // 在 hash_table 中查找
  if (e != NULL) Ref(e); // 如果有被缓存起来,增加引用计数等
  return reinterpret_cast<Cache::Handle*>(e); // 将其 cast 为一个 Handle * 返回,还记得 Cache 类中的接口吧?
}

void LRUCache::Release(Cache::Handle *e) {
  MutexLock l(&mutex_);
  Unref(reinterpret_cast<LRUHandle*>(e));
}

Cache::Handle *LRUCache::Insert(
    const Slice &key, uint32_t hash, void *value, size_t charge,
    void (*deleter)(const Slice &, void *)) {
  MutexLock l(&mutex_);

  LRUHandle *e = reinterpret_cast<LRUHandle*>(
      malloc(sizeof(LRUHandle) + (key.size() - 1))); // 这里注意,因为 LRUHandle 中保存了 key_length,所以这里在申请内存的时候,并没有为 '\0'申请内存,如果在 LRUHandle::key_data 上调用 strlen() 会有想不到的后果
  e->value = value; 
  e->charge = charge;
  e->deleter = deleter;
  e->key_length = key.size();
  e->in_cache = false;
  e->refs = 1; // 这里将引用计数设为1是代表被 this->Insert() 的调用者引用,并不是代表 LRUCache 引用,所以如果不保存 this->Insert() 的返回结果会有很大的问题
  memcpy(e->key_data, key.data(), key.size());

  if (capacity_ > 0) { // 需要缓存起来
    e->refs++; // LRUCache 类也要引用这个 handle 了
    e->in_cache = true;
    LRU_Append(&in_use_, e); // 因为这时候 refs == 2,需要放到 in_use_ 中
    usage_ += charge;
    FinishErase(table_.Insert(e)); // 如果相同键插入两次,会覆盖之前的缓存,并且需要将之前的缓存删除
  } else { // 如果将 capacity 设置为0,代表的是关闭缓存
    // 如果 handle->next == handle,会无法通过 LRUHandle::key() 中的断言
    e->next = NULL;
  }
  // 如果本次插入导致缓存超出上限,需要换出旧缓存
  while (usage_ > capacity_) {
    LRUHandle *oldest = lru_.next; // 还记得 lru_.next 存的是最早未使用的缓存吧
    assert(oldest->refs == 1);
    bool erased = FinishErase(table_.Remove(oldest->key(), oldest->hash)); // 删除调用缓存
    if (!erased) { // 如果 erased 是 false,代表 lru_ 中保存有没有被缓存起来的 entry,这是一个严重的问题
      assert(erased); // 一条一定会失败的断言
    }
  }

  return reinterpret_cast<Cache::Handle*>(e);
}
// 该接口仅应该在用与 FinishErase(handle_table_.Remove()) 一起使用,因为它做的是删除缓存后的后续工作
bool LRUCache::FinishErase(LRUHandle *e) {
  if (e != NULL) {
    assert(e->in_cache);
    LRU_Remove(e); // 从 lru_ 中删除
    e->in_cache = false; // 设置状态
    usage_ -= e->charge;
    Unref(e); // 一定会进入调用删除器的分支
  }
  return e != NULL;
}

void LRUCache::Erase(const Slice &key, uint32_t hash) {
  MutexLock l(&mutex_);
  FinishErase(table_.Remove(key, hash));
}

void LRUCache::Prune() {
  MutexLock l(&mutex_);
  while (lru_.next != &lru_) { // 遍历 lru_ 中保存的 entries,将其全部删除掉
    LRUHandle *e = lru_.next;
    assert(e->refs == 1);
    bool erased = FinishErase(table_.Remove(e->key(), e->hash));
    if (!erased) {
      assert(erased);
    }
  }
}
代码的解释都在注释中,这里我们从设计角度看看,这个 LRUCache 是如何实现的。首先,数据结构上,使用的是双向链表+hash_table,LRU 中要做到可以迅速找到最早未使用的缓存,所以使用了两个双向链表,分别用于保存“在使用(in_use_)”和“未使用(lru_)”的缓存,这样,lru_.next 便指向了最早未使用缓存。而 hash_table 则用于对 key->value 进行检索。而对于线程安全的实现,则是选用直观的 mutex,mutex 不代表慢,大家应该要记住,慢的不是锁,而是锁的竞争。

至此一个线程安全的 LRUCache 类就已经全部实现,在 leveldb 中使用的缓存是 ShardedLRUCache,其实就是聚合多个 LRUCache 实例,真正的逻辑都在 LRUCache 类中。

鉴于自身水平有限,文章中有难免有些错误,欢迎大家指出。也希望大家可以积极留言,与笔者一起讨论编程的那些事。

相关推荐