Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* compile : g++ rocks_merge.cc <path_to_rocksDB_installation>/librocksdb.so.4.1 --std=c++11 -lstdc++fs
* usage : ./<executable> <folder_having_rocksDB_directories_to_be_merged> <destination_folder>
*
*/
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <memory>
#include <queue>
#include <string>
#include <system_error>
#include <utility>
#include <vector>
#include <experimental/filesystem>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
using namespace rocksdb;
namespace fs = std::experimental::filesystem;
/**
 * One key/value entry read from a source database, tagged with an integer.
 *
 * key   - copy of the entry's key
 * value - copy of the entry's value
 * idx   - caller-supplied tag; main() uses it as the position of the
 *         originating iterator in its iterator vector
 */
class node {
public:
    std::string key;
    std::string value;
    int idx;

    // The strings are taken by value and moved into place, so an rvalue
    // argument is never copied and the members are initialised exactly once
    // (no default-construct followed by copy-assign).
    node(std::string k, std::string v, int id)
        : key(std::move(k)), value(std::move(v)), idx(id) {}
};
class compare {
public:
bool operator()(node &a, node &b) {
return a.key < b.key;
}
};
int main(int argc, char* argv[]) {
if(argc != 3) {
std::cerr << "Wrong number of arguments\nusage : ./<executable>\
<folder_having_rocksDB_directories_to_be_merged> <destination_folder>\n";
exit(0);
}
int counter = 0;
std::priority_queue<struct node, std::vector<node>, compare> pq;
std::vector<rocksdb::Iterator*> itVec;
std::string destinationDB = argv[2], mergeDir = argv[1];
DB* db;
Options options;
// Optimize RocksDB. This is the easiest way to get RocksDB to perform well
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// create the DB if it's not already present
options.create_if_missing = true;
// open DB
Status s = DB::Open(options, destinationDB, &db);
assert(s.ok());
for (auto& dirEntry : fs::directory_iterator(mergeDir)) {
std::cout << dirEntry << std::endl;
DB* cur_db;
Options options;
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// Don't create the DB if it's not already present
options.create_if_missing = false;
// open DB
Status s1 = DB::Open(options, dirEntry.path().c_str(), &cur_db);
assert(s1.ok());
rocksdb::Iterator *it = cur_db->NewIterator(rocksdb::ReadOptions());
it->SeekToFirst();
if(!it->Valid()) {
continue;
}
struct node tnode(it->key().ToString(), it->value().ToString(), counter++);
itVec.push_back(it);
pq.push(tnode);
}
std::string lastKey = "", lastValue = "";
while(!pq.empty()) {
struct node top = pq.top();
pq.pop();
if(top.key == lastKey) {
assert(top.value == lastValue);
} else {
s = db->Put(WriteOptions(), top.key, top.value);
assert(s.ok());
lastKey = top.key;
lastValue = top.value;
}
itVec[top.idx]->Next();
if(!itVec[top.idx]->Valid()) {
continue;
}
struct node tnode(itVec[top.idx]->key().ToString(), itVec[top.idx]->value().ToString(), top.idx);
pq.push(tnode);
}
delete db;
return 0;
}