diff --git a/tools/merge/rocks_c++/merge.cc b/tools/merge/rocks_c++/merge.cc deleted file mode 100644 index d2e2a5be3a5f253dd60656f184a985e5eae7a4de..0000000000000000000000000000000000000000 --- a/tools/merge/rocks_c++/merge.cc +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2015 Manish R Jain <manishrjain@gmail.com> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * compile : g++ merge.cc <path_to_rocksDB_installation>/librocksdb.so.4.1 --std=c++11 -lstdc++fs - * usage : ./<executable> <folder_having_rocksDB_directories_to_be_merged> <destination_folder> - * - * find the rocksdb headers here https://github.com/facebook/rocksdb/tree/master/include/rocksdb - * - */ - -#include <fstream> -#include <cstdio> -#include <iostream> -#include <string> -#include "rocksdb/db.h" -#include "rocksdb/options.h" -#include <experimental/filesystem> - -using namespace rocksdb; -namespace fs = std::experimental::filesystem; - -int main(int argc, char* argv[]) { - if(argc != 3) { - std::cerr << "Wrong number of arguments\nusage : ./<executable>\ - <folder_having_rocksDB_directories_to_be_merged> <destination_folder>\n"; - exit(0); - } - - std::string destinationDB = argv[2], mergeDir = argv[1]; - DB* db; - Options options; - // Optimize RocksDB. This is the easiest way to get RocksDB to perform well - options.IncreaseParallelism(); - options.OptimizeLevelStyleCompaction(); - // create the DB if it's not already present - options.create_if_missing = true; - - // open DB - Status s = DB::Open(options, destinationDB, &db); - assert(s.ok()); - - for (auto& dirEntry : fs::directory_iterator(mergeDir)) { - std::cout << dirEntry << std::endl; - DB* cur_db; - Options options; - options.IncreaseParallelism(); - options.OptimizeLevelStyleCompaction(); - // Don't create the DB if it's not already present - options.create_if_missing = false; - - // open DB - Status s1 = DB::Open(options, dirEntry.path().c_str(), &cur_db); - assert(s1.ok()); - - rocksdb::Iterator* it = cur_db->NewIterator(rocksdb::ReadOptions()); - for (it->SeekToFirst(); it->Valid(); it->Next()) { - Slice key_s = it->key(); - Slice val_s = it->value(); - std::string val_t; - Status s = db->Get(ReadOptions(), key_s, &val_t); - if(s.ok()) { - assert(val_t == val_s.ToString() && "Same key has different value"); - } else { - s = db->Put(WriteOptions(), key_s, val_s); - assert(s.ok()); - } - } - assert(it->status().ok()); // Check for any errors found during the scan - delete it; - delete cur_db; - } - - delete db; - return 0; -} diff --git a/tools/merge/rocks_c++/merge_heap.cc b/tools/merge/rocks_c++/merge_heap.cc deleted file mode 100644 index 34e755d54d524896bc8e21d4257b98b66196d603..0000000000000000000000000000000000000000 --- a/tools/merge/rocks_c++/merge_heap.cc +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright 2015 Manish R Jain <manishrjain@gmail.com> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * compile : g++ merge_heap.cc <path_to_rocksDB_installation>/librocksdb.so.4.1 --std=c++11 -lstdc++fs - * usage : ./<executable> <folder_having_rocksDB_directories_to_be_merged> <destination_folder> - * - * rocksdb headers can be found here https://github.com/facebook/rocksdb/tree/master/include/rocksdb - */ - -#include <fstream> -#include <cstdio> -#include <iostream> -#include <string> -#include <queue> -#include <vector> -#include "rocksdb/db.h" -#include "rocksdb/options.h" -#include <experimental/filesystem> - -using namespace rocksdb; -namespace fs = std::experimental::filesystem; - -class node { -public: - Slice key; - Slice value; - int idx; - node(Slice k, Slice v, int id) { - key = k; - value = v; - idx = id; - } -}; - -class compare { - public: - bool operator()(node &a, node &b) { - return a.key.compare(b.key) <= 0; - } -}; - -int main(int argc, char* argv[]) { - if(argc != 3) { - std::cerr << "Wrong number of arguments\nusage : ./<executable>\ - <folder_having_rocksDB_directories_to_be_merged> <destination_folder>\n"; - exit(0); - } - - int counter = 0; - std::priority_queue<struct node, std::vector<node>, compare> pq; - std::vector<rocksdb::Iterator*> itVec; - std::string destinationDB = argv[2], mergeDir = argv[1]; - DB* db; - Options options; - // Optimize RocksDB. This is the easiest way to get RocksDB to perform well - options.IncreaseParallelism(); - options.OptimizeLevelStyleCompaction(); - // create the DB if it's not already present - options.create_if_missing = true; - - // open DB - Status s = DB::Open(options, destinationDB, &db); - assert(s.ok()); - - for (auto& dirEntry : fs::directory_iterator(mergeDir)) { - std::cout << dirEntry << std::endl; - DB* cur_db; - Options options; - options.IncreaseParallelism(); - options.OptimizeLevelStyleCompaction(); - // Don't create the DB if it's not already present - options.create_if_missing = false; - - // open DB - Status s1 = DB::Open(options, dirEntry.path().c_str(), &cur_db); - assert(s1.ok()); - - rocksdb::Iterator *it = cur_db->NewIterator(rocksdb::ReadOptions()); - it->SeekToFirst(); - if(!it->Valid()) { - continue; - } - struct node tnode(it->key(), it->value(), counter++); - itVec.push_back(it); - pq.push(tnode); - } - - Slice lastKey, lastValue; - - while(!pq.empty()) { - const struct node &top = pq.top(); - pq.pop(); - - if(top.key == lastKey) { - assert(top.value == lastValue); - } else { - s = db->Put(WriteOptions(), top.key, top.value); - assert(s.ok()); - lastKey = top.key; - lastValue = top.value; - } - - itVec[top.idx]->Next(); - if(!itVec[top.idx]->Valid()) { - continue; - } - struct node tnode(itVec[top.idx]->key(), itVec[top.idx]->value(), top.idx); - pq.push(tnode); - } - - delete db; - return 0; -} diff --git a/tools/merge/rocksmerge.go b/tools/merge/rocksmerge.go index cee8a37bd00abab5b2a8b77d0539a64d8963441b..6000ede8a3af569dab0769deaa8dc18652d3c3b4 100644 --- a/tools/merge/rocksmerge.go +++ b/tools/merge/rocksmerge.go @@ -22,6 +22,8 @@ import ( "container/heap" "flag" "io/ioutil" + "math" + "path" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/posting/types" @@ -42,6 +44,7 @@ var destinationDB = flag.String("dest", "", "Folder to store merged rocksDB") var glog = x.Log("rocksmerge") +var pq PriorityQueue func (pq PriorityQueue) Len() int { return len(pq) } func (pq PriorityQueue) Swap(i, j int) { @@ -61,7 +64,7 @@ func (pq *PriorityQueue) Pop() interface{} { return item } -func compareValue(a, b interface{}) bool { +func equalValue(a, b interface{}) bool { var x, y types.Posting p1 := a.(*posting.List) if ok := p1.Get(&x, 0); !ok { @@ -72,11 +75,16 @@ func compareValue(a, b interface{}) bool { glog.Fatal("While retrieving entry from posting list") } - return x.Uid() == y.Uid() + aUid := x.Uid() + bUid := y.Uid() + + return aUid == bUid || + ((x.Uid() == math.MaxUint64 && y.Uid() == math.MaxUint64) && + bytes.Compare(x.ValueBytes(), y.ValueBytes()) == 0) } func MergeFolders(mergePath, destPath string) { - files, err := ioutil.ReadDir(mergePath) + dirList, err := ioutil.ReadDir(mergePath) if err != nil { glog.Fatal("Cannot open stores directory") } @@ -91,28 +99,25 @@ func MergeFolders(mergePath, destPath string) { wopt.SetSync(true) count := 0 - for range files { + for range dirList { count++ } - if mergePath[len(mergePath)-1] == '/' { - mergePath = mergePath[:len(mergePath)-1] - } - - pq := make(PriorityQueue, count) - var itVec []*rocksdb.Iterator - for i, f := range files { - curDb, err := rocksdb.Open(mergePath+"/"+f.Name(), opt) + pq = make(PriorityQueue, count) + var storeIters []*rocksdb.Iterator + for i, dir := range dirList { + mPath := path.Join(mergePath, dir.Name()) + curDb, err := rocksdb.Open(mPath, opt) defer curDb.Close() if err != nil { - glog.WithField("filepath", mergePath+"/"+f.Name()). + glog.WithField("filepath", mPath). Fatal("While opening store") } it := curDb.NewIterator(ropt) it.SeekToFirst() if !it.Valid() { - itVec = append(itVec, it) - glog.WithField("path", mergePath+"/"+f.Name()).Info("Store empty") + storeIters = append(storeIters, it) + glog.WithField("path", mPath).Info("Store empty") continue } pq[i] = &Item{ @@ -120,12 +125,22 @@ func MergeFolders(mergePath, destPath string) { value: it.Value(), storeIdx: i, } - itVec = append(itVec, it) + storeIters = append(storeIters, it) } heap.Init(&pq) + mergeUsingHeap(destPath, storeIters) +} + +func mergeUsingHeap(destPath string, storeIters []*rocksdb.Iterator) { + var opt *rocksdb.Options + var wopt *rocksdb.WriteOptions + opt = rocksdb.NewOptions() + opt.SetCreateIfMissing(true) + wopt = rocksdb.NewWriteOptions() + wopt.SetSync(true) var db *rocksdb.DB - db, err = rocksdb.Open(destPath, opt) + db, err := rocksdb.Open(destPath, opt) defer db.Close() if err != nil { glog.WithField("filepath", destPath). @@ -137,7 +152,7 @@ func MergeFolders(mergePath, destPath string) { top := heap.Pop(&pq).(*Item) if bytes.Compare(top.key, lastKey) == 0 { - if !compareValue(top.value, lastValue) { + if !equalValue(top.value, lastValue) { glog.WithField("key", lastKey). Fatal("different value for same key") } @@ -146,13 +161,13 @@ func MergeFolders(mergePath, destPath string) { lastKey = top.key lastValue = top.value - itVec[top.storeIdx].Next() - if !itVec[top.storeIdx].Valid() { + storeIters[top.storeIdx].Next() + if !storeIters[top.storeIdx].Valid() { continue } item := &Item{ - key: itVec[top.storeIdx].Key(), - value: itVec[top.storeIdx].Value(), + key: storeIters[top.storeIdx].Key(), + value: storeIters[top.storeIdx].Value(), storeIdx: top.storeIdx, } heap.Push(&pq, item)