Home > programming > leveldb阅读笔记—数据写入

leveldb阅读笔记—数据写入

October 24th, 2019 Leave a comment Go to comments

leveldb的写入可以简单的调用方法.

Status Put(const WriteOptions& options, const Slice& key,
                   const Slice& value) = 0;

其中key和value都可以是任何byte数据. 且可以是任意长度. 但是key的长度尽量越短越好, 这样有助于提供效率.

向leveldb的写入过程是比较简单的. 下图是一个 简化以后的流程, 大致上描述了写入的全过程. 去掉了诸如后台压缩线程的启动等内容.

write flow

首先, 不管是调动DB::Put 还是DB::Write最终都会走到Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates)这个函数来.

首先, leveldb会把updates写入一个deque变量writers_中.

Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;

MutexLock l(&mutex_);
writers_.push_back(&w);

然后, leveldb会去检查当前的memory table是否还有足够的空间可以容纳下当前的这一次写入. 不行的话就flush当前的数据到disk, 然后启动新的memory table.

Status DBImpl::MakeRoomForWrite(bool force) {
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // 后台压缩线程如果出现了错误, 就在这里把错误返回出去
      s = bg_error_; 
      break;
    } else if (allow_delay && versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
      // level0的文件数量太多, 此时后台压缩线程应该还在工作中. 
      // 所以这里暂停一下, 不要让大量的写入让level0的文件数量越来越多.
      // 这里因为释放了mutex_这把锁. 所以可能会造成writers_中有多个writeBatch
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // 当前memory table还可用.
      break;
    } else if (imm_ != nullptr) {
      // imm还存在着, 表明后台的压缩线程还未执行完毕
      background_work_finished_signal_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // level0的文件实在是太多, 强行等待压缩线程完成其工作.
      background_work_finished_signal_.Wait();
    } else {
      // memory table使用的内存已经超标了, 而且imm也已经被flush到disk
      // 在这里将当前的memory table设置为imm(也就是imutable memory table)
      // 然后再产生一个新的memory table. writes_里面包含的数据都往这个心的
      // memory table中写入了.
      ...
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.store(true, std::memory_order_release);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;
      // 这里尝试着启动一下后台压缩线程, 因为产生了一个新的imm.
      MaybeScheduleCompaction();
    }
  }

上述代码描述了MakeRoomForWrite所做的事情

  • 如果后台压缩线程出错了, 那么将这个错误通过write这个线程返回回去
  • 如果level 0的文件数量超过了8个, 那么稍微等一下后台压缩线程, 免得写入太快导致后台压缩线程来不及工作. 不过这里只是稍微等待1ms时间.
  • 如果immutable memory table(imm)还没有被flush到disk, 那么等待后台线程完成这个工作
  • 如果文件实在是太多了, 超过了12个, 那么久强行等待后台压缩线程工作结束
  • 上述条件都不满足, 并且当前的memory table(mm)使用的空间已经超过预设的4M, 那么使用当前mm替换imm, 然后产生一个新的mm供后续写入

运行到这里, 表明mm有足够的空间可以容纳当前的数据. leveldb开始对writers_里面的数据进行打包. 上面MakeRoomForWrite里面如果level 0的文件数量超过8, leveldb会等待1ms时间, 在这个等待的时间里面, leveldb暂时会释放mutex锁. 这就可能导致在这个等待期间内.有新的线程插入新的数据. 所以这时writers里面可能会有多个batch形成的一个batchlist. 打包的任务就是将这些batch打包在一起形成一个新的batch. 但是也有一些限制. 新的batch的size不能超过1M. 并且如果batchlist[0]的大小不超过128K的话, 那么新的batch size不超过batchlist[0].size() + 128K.

数据打包好了以后, leveldb会暂时释放开mutex, 然后向log文件中写入打包好的数据. 然后把这个打包的数据写入memory table. memory table的主体数据结构是一个SkipList. 所以写入的时候实际上又会把刚才打包的数据一个一个拆开成key-value pair. 实际上写入到memory table的时候的key并不是用户传入的key, 而是InternalKey(参见数据结构篇). InternalKey有一个sequenceNumber和一个Type. type很简单, 如果是插入数据,type=1(kTypeValue). 如果是删除数据, type=0(kTypeDeletetion). 而sequence number在每次向skip list插入一个数据以后sequence number都会加一. 这样可以保证在skip list里面的数据是没有重复key的. 我猜这也是为何leveldb的skiplist不允许有重复key的理由.当后台线程被启动以后, 会将imm的数据flush到level0的sst文件中去, 同时该文件的一些metadata如最小的key和最大的key以及文件的size会同version记录到manifest文件中去. 至此, write的流程结束. 后续leveldb会在压缩(整理)后台线程中将imm的数据flush到level0的文件中去.

Categories: programming Tags: ,
  1. No comments yet.
  1. No trackbacks yet.