一个线程安全的 lrucache 实现 --- 读 leveldb 源码
在阅读 leveldb 的源代码的时候,发现其中的 cache 类正是一个线程安全的 lru-cache 实现,代码非常优雅。笔者读完之后受益良多,希望借助这篇博客,一来可以加深自己的理解,而来可以与大家共享这些优秀代码(PS:因为这个 cache 类主要是为了支持 leveldb 的实现,所以其实现的接口可能与其他常见缓存有所差别)。
leveldb 中的 cache 提供的并不是一个类似于 std::map 的模板类,其有一下几个特点:
- 键值必须为 string-like (代码中为 Slice 类).
- 存储的内容为 void *.所以支持存储任何类型的数据,因为存储的是指针,所以 cache 类负责管理其所有 entries 的生命周期以及资源回收。在插入的时候必须明确传入 deleter.
- 对 entries 的查询需要先获取 Handle(像pthread_t一样的一个对客户端透明的结构,如果客户端不再使用,必4. 须要显示的调用Cache::Release(handle).不直接提供访问 entries 的接口,防止了客户端修改,但引入了一次额外的函数调用(Cache::Value(hanle)).
- 使用两个双向链表来分别存储被客户端使用的 entries 和未被使用的 entries.
- NewId() 接口可以生成一个唯一的 id,多线程环境下可以使用这个 id 与自己的键值拼接起来,防止不同线程之间互相覆写.
- 使用很直观的 mutex 类提供线程安全性.
class Cache { public: Cache() { } // 需要调用插入时的 deleter 销毁所有 entries virtual ~Cache(); // 每个 entry 对应的 handle,客户端只能通过调用 this->Value(handle) 来获取缓存的内容,不可以直接访问缓存 struct Handle { }; // 插入一个 key->value 的映射,支持通过 charge 为不同的缓存设定不同的权重。 // 返回一个 entry 对应的 handle 指针到客户端,客户端不需要使用后必须显示的调用 this->Release(handle), // 当缓存要被换出的时候,会调用传入的 deleter virtual Handle *Insert(const Slice &key, void *value, size_t charge, void (*deleter)(const Slice &key, void *value)) = 0; // 查询接口,如果缓存不存在,返回 NULL 指针。否则返回 entry 对应的 handle 指针,同样必须调用 this->Release(handle) virtual Handle *Lookup(const Slice &key) = 0; // 释放由 Insert 或者 Lokkup 返回的 handle。Release a mapping returned by a previous Lookup(). // NOTE: 1.仅能在一个 handle 上调用一次 this->Release(handle) // NOTE: 2.handle 在调用该方法后不能再使用(handle 指针会被 free 掉) virtual void Release(Handle *handle) = 0; // 获取缓存内容 virtual void *Value(Handle *hanle) = 0; // 见上面第6点 virtual uint64_t NewId() = 0; // 清楚所有的未使用缓存 virtual void Prune() { } // 查看缓存使用总量 virtual size_t TotalCharge() const = 0; private: // No copying allowed Cache(const Cache &); void operator=(const Cache &); };因为 leveldb 中支持对定制自己的缓存类,所以其头文件中的 Cache 是一个虚拟基类,更像是一个接口定义,《effective c++》中称之为 Interface class。每个接口的含义我都已经加上了注释,为了保证除了 Cache 类,没有人可以操作缓存数据(这是非常重要的一点,因为 Cache 中存储的都是 void *,Cache 类必须要确保它自己唯一管理这些缓存对象生命周期),这里使用了 Handle(句柄)。虽然在查询的时候额外引入了一层间接层,但是可以充分保证数据安全。
下面让我们看一下实现代码:
struct LRUHandle { // 这便是返回的 Handle,在返回的时候会 return reinterpret_cast<Cache::Handle*>(handle) void *value; void (*deleter)(const Slice &, void *); LRUHandle *next_hash; // 给自己实现的 hahs_table 使用; LRUHandle *next; // 双向链表 LRUHandle *prev; // 双向链表 size_t charge; // 权重 size_t key_length; // key 长度 bool in_cache; // 是否被缓存(我们的类可以通过将 capacity 设置为0来关闭缓存) uint32_t refs; // 引用计数 uint32_t hash; // 我们要将 hash 值缓存起来,防止需要重复 hash 计算 char key_data[1]; // 柔性数组,存储键 Slice key() const { assert(next != this); // Hold true iff this is the head of an empty list return Slice(key_data, key_length); } };代码中 Slice 是 google 项目常见的一个工具类,其实现非常简单,可以认为它仅仅是对一个字符串的引用(因为它自己不管理内存,咱们在这里可以简单的把它当做 std::string 也是可以的,它可以使用的地方都可以使用 std::string(反过来不成立))。
可以看到我们这个 LRUHandle 类中的数据还是不少的,在后面的实现代码中我们能看到它们的用武之地。下面要看的代码是 levedb 自己实现的一个很短小精悍的 hash_table,其使用开链法解决冲突,代码真的很优雅,很值得一读:
class HandleTable { public: HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); } ~HandleTable() { delete[] list_; } LRUHandle *Lookup(const Slice &key, uint32_t hash) { return *FindPointer(key, hash); } // Insert into the last position LRUHandle *Insert(LRUHandle *h) { LRUHandle **ptr = FindPointer(h->key(), h->hash); LRUHandle *old = *ptr; h->next_hash = (old == NULL ? NULL : old->next_hash); *ptr = h; if (old == NULL) { ++elems_; if (elems_ > length_) { Resize(); } } return old; } LRUHandle *Remove(const Slice &key, uint32_t hash) { LRUHandle **ptr = FindPointer(key, hash); LRUHandle *result = *ptr; if (result != NULL) { *ptr = result->next_hash; --elems_; } return result; } private: uint32_t length_; // hash table 的 bucket 数 uint32_t elems_; // hash table 中 entry 数 LRUHandle **list_; // hash bucket 数组 LRUHandle **FindPointer(const Slice &key, uint32_t hash) { assert(list_ != NULL); LRUHandle **ptr = &list_[hash & (length_ - 1)]; // 找到对应键的桶,取第一个 entry while (*ptr != NULL && ((*ptr)->hash != hash || (*ptr)->key() != key)) { ptr = &(*ptr)->next_hash; // 注意,我们返回的不是一个 LRUHandle *,而是一个 LRUHandle **(&(LRUHandle::next_hash)) } return ptr; } void Resize() { uint32_t new_length = 4; // length_ 一定是 2 的幂次,这样可以使用 hash & (length_ - 1) 来算出 hash % length_ while (new_length < elems_) new_length <<= 1; // 算出新 length,因为 length > elems_; 所以装载因子<1 LRUHandle **new_list = new LRUHandle *[new_length]; memset(new_list, 0, sizeof(new_list[0]) * new_length); uint32_t count = 0; for (uint32_t i = 0; i < length_; i++) { // 遍历所有的桶,为桶内元素搬家 LRUHandle *h = list_[i]; // 每个桶的第一个 entry while (h != NULL) { LRUHandle *next = h->next_hash; // 先保存起来 next uint32_t hash = h->hash; LRUHandle **ptr = &new_list[hash & (new_length-1)]; // 计算在新表中的桶下标,取桶内第一个 entry h->next_hash = *ptr; // 头插法 *ptr = h; h = next; ++count; } } assert(count == elems_); delete[] list_; // 更新 list_ 和 length_ list_ = new_list; length_ = new_length; } };上面的代码真的写的太好了,笔者都有怎么读都读不够的感觉啊,谷歌工程师,真的太强了。简要的说下最重要的两个接口:Resize() 和 FindPointer()。
先说 Resize()。在 hash_table 中扩容的时候,往往有两个常见的选择,一是 bucket 的数量选用一个素数,这样可以保证 hash mod 之后的结果最均匀,还有一种是使用2的幂次结果作为 bucket 数,后者是通过优化 hash % #bucket 这个运算来优化(因为 #bucket = 2^n,hash % #bucket 等价于 hash & (length - 1))。在 leveldb 中选用的是后者来优化。
然后是 FindPointer(),在读这份源码之前,我对链表内元素的插入和删除之后使用两个指针,一前一后,来操作链表,但是作者使用一个 LRUHandle **指针,一个指针就解决了对链表元素的插入和删除,真的太巧妙了。我觉得这个代码都值得单独拿出来讲,所以这里并不准备全部展开,但是推荐大家好好品读,我也会学习这个写法,自己写一个 list 实现,到时候也会发到博客,和大家一起交流。
有了 hash_table,我们可以开始实现了 LRUCache 了:
class LRUCache { public: LRUCache(); ~LRUCache(); void SetCapacity(size_t cap) { capacity_ = cap; } Cache::Handle *Insert(const Slice &key, uint32_t hash, void *value, size_t charge, void (*deleter)(const Slice &, void *)); Cache::Handle *Lookup(const Slice &key, uint32_t hash); void Release(Cache::Handle *handle); void Erase(const Slice &key, uint32_t hash); void Prune(); size_t TotalCharge() const { MutexLock l(&mutex_); return usage_; } private: void LRU_Remove(LRUHandle *e); void LRU_Append(LRUHandle *list, LRUHandle *e); void Ref(LRUHandle *e); void Unref(LRUHandle *e); bool FinishErase(LRUHandle *e); // Initialized before use. size_t capacity_; // Capacity usage guarded by mutex mutable Mutex mutex_; size_t usage_; // 所有未被使用的 entries 都被存储于 lru_ 这个双向链表中,这里使用 dummy head 的方法实现双向链表 // lru_.prev 是最新的 entry,lru_.next 是最早的 entry(也就是说换出缓存的时候就从 lru_.next 开刀) // lru_ 中的所有 entries 都有 refs == 1 && in_cache == true (仅被 LRUCache 类引用,而且被缓存) LRUHandle lru_; // 所有现在被使用(客户端持有至少一个尚未 Release() 的 handle)的 entries 存储在 in_use_ 双向链表中 // in_use_ 中 entries 有 refs >= 2 && in_cache = true LRUHandle in_use_; HandleTable table_; // 用于进行 key->handle 的 mapping }; LRUCache::LRUCache() : usage_(0) { // 初始化两个双向链表的 dummy head lru_.next = &lru_; lru_.prev = &lru_; in_use_.next = &in_use_; in_use_.prev = &in_use_; } LRUCache::~LRUCache() { assert(in_use_.next == &in_use_); // 如果在 LRUCache 在析构的时候,还有客户端持有尚未 Release() 的 handle,那么就会出问题 LRUHandle *e = lru_.next; while (e != &lru_) { // 释放所有 entries LRUHandle *next = e->next; assert(e->in_cache); e->in_cache = false; assert(e->refs == 1); Unref(e); e = next; } } void LRUCache::Ref(LRUHandle *e) { // 增加引用计数,必要的话将其从 lru_. 挪到 in_use_ if (e->refs == 1 && e->in_cache) { // ++ 后 refs 大于1,代表有客户端使用 LRU_Remove(e); // 从 lru_ 中删除 LRU_Append(&in_use_, e); // 插入 in_use_ 中 } e->refs++; // 增加引用计数 } void LRUCache::Unref(LRUHandle *e) { assert(e->refs > 0); // Unref() 调用前,这个 handle 应该是被引用的 if (--e->refs == 0) { // 无人引用(包括 LRUCache 类),需要调用其构造时传入的删除器,管理资源 assert(!e->in_cache); (*e->deleter)(e->key(), e->value); free(e); // 注意所有的 handle 都是 heap-allocated,必须要free } else if (e->in_cache && e->refs == 1) { // refs==1 代表仅有 LRUCache 类引用 LRU_Remove(e); // 从 in_use_ 中删除 LRU_Append(&lru_, e); // 插入 lru_ 中 } } // 将元素从双向链表中删除(删除并不需要知道其所属的 list) void LRUCache::LRU_Remove(LRUHandle *e) { e->prev->next = e->next; e->next->prev = e->prev; } // 将元素插入到双向链表 head.prev void LRUCache::LRU_Append(LRUHandle *list, LRUHandle *e) { list->prev->next = e; e->prev = list->prev; e->next = list; list->prev = e; } Cache::Handle *LRUCache::Lookup(const Slice &key, uint32_t hash) { MutexLock l(&mutex_); LRUHandle *e = table_.Lookup(key, hash); // 在 hash_table 中查找 if (e != NULL) Ref(e); // 如果有被缓存起来,增加引用计数等 return reinterpret_cast<Cache::Handle*>(e); // 将其 cast 为一个 Handle * 返回,还记得 Cache 类中的接口吧? } void LRUCache::Release(Cache::Handle *e) { MutexLock l(&mutex_); Unref(reinterpret_cast<LRUHandle*>(e)); } Cache::Handle *LRUCache::Insert( const Slice &key, uint32_t hash, void *value, size_t charge, void (*deleter)(const Slice &, void *)) { MutexLock l(&mutex_); LRUHandle *e = reinterpret_cast<LRUHandle*>( malloc(sizeof(LRUHandle) + (key.size() - 1))); // 这里注意,因为 LRUHandle 中保存了 key_length,所以这里在申请内存的时候,并没有为 '\0'申请内存,如果在 LRUHandle::key_data 上调用 strlen() 会有想不到的后果 e->value = value; e->charge = charge; e->deleter = deleter; e->key_length = key.size(); e->in_cache = false; e->refs = 1; // 这里将引用计数设为1是代表被 this->Insert() 的调用者引用,并不是代表 LRUCache 引用,所以如果不保存 this->Insert() 的返回结果会有很大的问题 memcpy(e->key_data, key.data(), key.size()); if (capacity_ > 0) { // 需要缓存起来 e->refs++; // LRUCache 类也要引用这个 handle 了 e->in_cache = true; LRU_Append(&in_use_, e); // 因为这时候 refs == 2,需要放到 in_use_ 中 usage_ += charge; FinishErase(table_.Insert(e)); // 如果相同键插入两次,会覆盖之前的缓存,并且需要将之前的缓存删除 } else { // 如果将 capacity 设置为0,代表的是关闭缓存 // 如果 handle->next == handle,会无法通过 LRUHandle::key() 中的断言 e->next = NULL; } // 如果本次插入导致缓存超出上限,需要换出旧缓存 while (usage_ > capacity_) { LRUHandle *oldest = lru_.next; // 还记得 lru_.next 存的是最早未使用的缓存吧 assert(oldest->refs == 1); bool erased = FinishErase(table_.Remove(oldest->key(), oldest->hash)); // 删除调用缓存 if (!erased) { // 如果 erased 是 false,代表 lru_ 中保存有没有被缓存起来的 entry,这是一个严重的问题 assert(erased); // 一条一定会失败的断言 } } return reinterpret_cast<Cache::Handle*>(e); } // 该接口仅应该在用与 FinishErase(handle_table_.Remove()) 一起使用,因为它做的是删除缓存后的后续工作 bool LRUCache::FinishErase(LRUHandle *e) { if (e != NULL) { assert(e->in_cache); LRU_Remove(e); // 从 lru_ 中删除 e->in_cache = false; // 设置状态 usage_ -= e->charge; Unref(e); // 一定会进入调用删除器的分支 } return e != NULL; } void LRUCache::Erase(const Slice &key, uint32_t hash) { MutexLock l(&mutex_); FinishErase(table_.Remove(key, hash)); } void LRUCache::Prune() { MutexLock l(&mutex_); while (lru_.next != &lru_) { // 遍历 lru_ 中保存的 entries,将其全部删除掉 LRUHandle *e = lru_.next; assert(e->refs == 1); bool erased = FinishErase(table_.Remove(e->key(), e->hash)); if (!erased) { assert(erased); } } }代码的解释都在注释中,这里我们从设计角度看看,这个 LRUCache 是如何实现的。首先,数据结构上,使用的是双向链表+hash_table,LRU 中要做到可以迅速找到最早未使用的缓存,所以使用了两个双向链表,分别用于保存“在使用(in_use_)”和“未使用(lru_)”的缓存,这样,lru_.next 便指向了最早未使用缓存。而 hash_table 则用于对 key->value 进行检索。而对于线程安全的实现,则是选用直观的 mutex,mutex 不代表慢,大家应该要记住,慢的不是锁,而是锁的竞争。
至此一个线程安全的 LRUCache 类就已经全部实现,在 leveldb 中使用的缓存是 ShardedLRUCache,其实就是聚合多个 LRUCache 实例,真正的逻辑都在 LRUCache 类中。
鉴于自身水平有限,文章中有难免有些错误,欢迎大家指出。也希望大家可以积极留言,与笔者一起讨论编程的那些事。