Skip to content
Snippets Groups Projects
merge_heap.cc 3.34 KiB
Newer Older
  • Learn to ignore specific revisions
  • Ashwin's avatar
    Ashwin committed
    /*
     * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     *
     *  compile : g++ merge_heap.cc <path_to_rocksDB_installation>/librocksdb.so.4.1 --std=c++11 -lstdc++fs
     *  usage : ./<executable> <folder_having_rocksDB_directories_to_be_merged> <destination_folder>
     *
     *  rocksdb headers can be found here https://github.com/facebook/rocksdb/tree/master/include/rocksdb
     */
    
    Ashwin's avatar
    Ashwin committed
    
    #include <cassert>
    #include <cstdio>
    #include <experimental/filesystem>
    #include <fstream>
    #include <iostream>
    #include <memory>
    #include <queue>
    #include <string>
    #include <utility>
    #include <vector>

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"
    
    using namespace rocksdb;
    namespace fs = std::experimental::filesystem;
    
    class node {
    public:
    
    Ashwin's avatar
    Ashwin committed
      int idx;
    
      node(Slice k, Slice v, int id) {
    
    Ashwin's avatar
    Ashwin committed
        key = k;
        value = v;
        idx = id;
      }
    };
    
    class compare {
      public:
        bool operator()(node &a, node &b) {
    
          return a.key.compare(b.key) <= 0;
    
    Ashwin's avatar
    Ashwin committed
        }
    };
    
    int main(int argc, char* argv[]) {
      if(argc != 3) {
        std::cerr << "Wrong number of arguments\nusage : ./<executable>\
    
    Ashwin's avatar
    Ashwin committed
            <folder_having_rocksDB_directories_to_be_merged> <destination_folder>\n";
    
    Ashwin's avatar
    Ashwin committed
        exit(0);
      }
    
    Ashwin's avatar
    Ashwin committed
    
    
    Ashwin's avatar
    Ashwin committed
      int counter = 0;
      std::priority_queue<struct node, std::vector<node>, compare> pq;
      std::vector<rocksdb::Iterator*> itVec;
      std::string destinationDB = argv[2], mergeDir = argv[1];
      DB* db;
      Options options;
      // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
      options.IncreaseParallelism();
      options.OptimizeLevelStyleCompaction();
      // create the DB if it's not already present
      options.create_if_missing = true;
    
    Ashwin's avatar
    Ashwin committed
    
    
    Ashwin's avatar
    Ashwin committed
      // open DB
      Status s = DB::Open(options, destinationDB, &db);
      assert(s.ok());
    
      for (auto& dirEntry : fs::directory_iterator(mergeDir)) {
        std::cout << dirEntry << std::endl;
        DB* cur_db;
        Options options;
        options.IncreaseParallelism();
        options.OptimizeLevelStyleCompaction();
        // Don't create the DB if it's not already present
        options.create_if_missing = false;
    
    Ashwin's avatar
    Ashwin committed
    
    
    Ashwin's avatar
    Ashwin committed
        // open DB
        Status s1 = DB::Open(options, dirEntry.path().c_str(), &cur_db);
        assert(s1.ok());
    
        rocksdb::Iterator *it = cur_db->NewIterator(rocksdb::ReadOptions());
        it->SeekToFirst();
        if(!it->Valid()) {
          continue;
    
    Ashwin's avatar
    Ashwin committed
        }
    
        struct node tnode(it->key(), it->value(), counter++);
    
    Ashwin's avatar
    Ashwin committed
        itVec.push_back(it);
        pq.push(tnode);
      }
    
    
      Slice lastKey, lastValue;
    
    Ashwin's avatar
    Ashwin committed
    
      while(!pq.empty()) {
    
        const struct node &top = pq.top();
    
    Ashwin's avatar
    Ashwin committed
        pq.pop();
    
    Ashwin's avatar
    Ashwin committed
    
    
    Ashwin's avatar
    Ashwin committed
        if(top.key == lastKey) {
          assert(top.value == lastValue);
        } else {
          s = db->Put(WriteOptions(), top.key, top.value);
          assert(s.ok());
          lastKey = top.key;
          lastValue = top.value;
        }
    
    Ashwin's avatar
    Ashwin committed
    
    
    Ashwin's avatar
    Ashwin committed
        itVec[top.idx]->Next();
    
    Ashwin's avatar
    Ashwin committed
        if(!itVec[top.idx]->Valid()) {
    
    Ashwin's avatar
    Ashwin committed
          continue;
        }
    
        struct node tnode(itVec[top.idx]->key(), itVec[top.idx]->value(), top.idx);
    
    Ashwin's avatar
    Ashwin committed
        pq.push(tnode);
      }
    
    
    Ashwin's avatar
    Ashwin committed
      delete db;
    
    Ashwin's avatar
    Ashwin committed
      return 0;
    }