Commit ffcb50d3 authored by Ashwin

Rename variables, check equivalence of XIDs

parent 36cf856b
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* compile : g++ merge.cc <path_to_rocksDB_installation>/librocksdb.so.4.1 --std=c++11 -lstdc++fs
* usage : ./<executable> <folder_having_rocksDB_directories_to_be_merged> <destination_folder>
*
* find the rocksdb headers here https://github.com/facebook/rocksdb/tree/master/include/rocksdb
*
*/
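/*
 * Example invocation (illustrative only; the paths below are assumptions, and
 * your RocksDB build may need extra libraries such as -lpthread or -lz):
 *
 *   g++ --std=c++11 merge.cc /usr/local/lib/librocksdb.so.4.1 -lstdc++fs -o merge
 *   ./merge /data/rocksdb_stores /data/merged_store
 */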
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <string>
#include <experimental/filesystem>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

using namespace rocksdb;
namespace fs = std::experimental::filesystem;

int main(int argc, char* argv[]) {
  if (argc != 3) {
    std::cerr << "Wrong number of arguments\nusage : ./<executable>"
                 " <folder_having_rocksDB_directories_to_be_merged> <destination_folder>\n";
    exit(0);
  }
  std::string destinationDB = argv[2], mergeDir = argv[1];

  DB* db;
  Options options;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well.
  options.IncreaseParallelism();
  options.OptimizeLevelStyleCompaction();
  // Create the destination DB if it's not already present.
  options.create_if_missing = true;
  Status s = DB::Open(options, destinationDB, &db);
  assert(s.ok());

  // Open every store under mergeDir and copy its entries into the destination,
  // verifying that a key already present carries the same value.
  for (auto& dirEntry : fs::directory_iterator(mergeDir)) {
    std::cout << dirEntry.path() << std::endl;
    DB* cur_db;
    Options options;
    options.IncreaseParallelism();
    options.OptimizeLevelStyleCompaction();
    // Don't create the DB if it's not already present.
    options.create_if_missing = false;
    Status s1 = DB::Open(options, dirEntry.path().string(), &cur_db);
    assert(s1.ok());

    rocksdb::Iterator* it = cur_db->NewIterator(rocksdb::ReadOptions());
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      Slice key_s = it->key();
      Slice val_s = it->value();
      std::string val_t;
      Status s = db->Get(ReadOptions(), key_s, &val_t);
      if (s.ok()) {
        // Key already present in the destination: the values must agree.
        assert(val_t == val_s.ToString() && "Same key has different value");
      } else {
        s = db->Put(WriteOptions(), key_s, val_s);
        assert(s.ok());
      }
    }
    assert(it->status().ok());  // Check for any errors found during the scan.
    delete it;
    delete cur_db;
  }
  delete db;
  return 0;
}
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* compile : g++ merge_heap.cc <path_to_rocksDB_installation>/librocksdb.so.4.1 --std=c++11 -lstdc++fs
* usage : ./<executable> <folder_having_rocksDB_directories_to_be_merged> <destination_folder>
*
* rocksdb headers can be found here https://github.com/facebook/rocksdb/tree/master/include/rocksdb
*/
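/*
 * Approach: open an iterator on every store found under the source folder,
 * seed a min-heap with each iterator's first (key, value) pair, then
 * repeatedly pop the smallest key, write it to the destination store once
 * (checking that duplicate keys carry the same value), and advance the
 * iterator that entry came from.
 */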
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <queue>
#include <string>
#include <vector>
#include <experimental/filesystem>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

using namespace rocksdb;
namespace fs = std::experimental::filesystem;

// One heap entry per source store: the iterator's current key/value plus the
// index of the iterator it came from.
class node {
 public:
  Slice key;
  Slice value;
  int idx;
  node(Slice k, Slice v, int id) {
    key = k;
    value = v;
    idx = id;
  }
};

// std::priority_queue is a max-heap, so the comparator is inverted to keep the
// smallest key on top; this makes duplicate keys come out of the heap adjacently.
class compare {
 public:
  bool operator()(const node &a, const node &b) {
    return a.key.compare(b.key) > 0;
  }
};

int main(int argc, char* argv[]) {
  if (argc != 3) {
    std::cerr << "Wrong number of arguments\nusage : ./<executable>"
                 " <folder_having_rocksDB_directories_to_be_merged> <destination_folder>\n";
    exit(0);
  }
  int counter = 0;
  std::priority_queue<node, std::vector<node>, compare> pq;
  std::vector<rocksdb::Iterator*> itVec;

  std::string destinationDB = argv[2], mergeDir = argv[1];
  DB* db;
  Options options;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well.
  options.IncreaseParallelism();
  options.OptimizeLevelStyleCompaction();
  // Create the destination DB if it's not already present.
  options.create_if_missing = true;
  Status s = DB::Open(options, destinationDB, &db);
  assert(s.ok());

  // Seed the heap with the first entry of every non-empty store.
  for (auto& dirEntry : fs::directory_iterator(mergeDir)) {
    std::cout << dirEntry.path() << std::endl;
    DB* cur_db;
    Options options;
    options.IncreaseParallelism();
    options.OptimizeLevelStyleCompaction();
    // Don't create the DB if it's not already present.
    options.create_if_missing = false;
    Status s1 = DB::Open(options, dirEntry.path().string(), &cur_db);
    assert(s1.ok());

    rocksdb::Iterator *it = cur_db->NewIterator(rocksdb::ReadOptions());
    it->SeekToFirst();
    if (!it->Valid()) {
      continue;
    }
    node tnode(it->key(), it->value(), counter++);
    itVec.push_back(it);
    pq.push(tnode);
  }

  // k-way merge: pop the smallest key, write it once, and advance the iterator
  // it came from. lastKey/lastValue are copied into std::string because the
  // Slice data they point at is invalidated once that iterator moves on.
  std::string lastKey, lastValue;
  while (!pq.empty()) {
    node top = pq.top();  // copy before pop(); a reference would dangle
    pq.pop();
    if (top.key == Slice(lastKey)) {
      assert(top.value == Slice(lastValue) && "Same key has different value");
    } else {
      s = db->Put(WriteOptions(), top.key, top.value);
      assert(s.ok());
      lastKey = top.key.ToString();
      lastValue = top.value.ToString();
    }
    itVec[top.idx]->Next();
    if (!itVec[top.idx]->Valid()) {
      continue;
    }
    node tnode(itVec[top.idx]->key(), itVec[top.idx]->value(), top.idx);
    pq.push(tnode);
  }
  delete db;
  return 0;
}
@@ -22,6 +22,8 @@ import (
 	"container/heap"
 	"flag"
 	"io/ioutil"
+	"math"
+	"path"
 
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/posting/types"
@@ -42,6 +44,7 @@ var destinationDB = flag.String("dest", "",
 	"Folder to store merged rocksDB")
 var glog = x.Log("rocksmerge")
 
+var pq PriorityQueue
 func (pq PriorityQueue) Len() int { return len(pq) }
 func (pq PriorityQueue) Swap(i, j int) {
@@ -61,7 +64,7 @@ func (pq *PriorityQueue) Pop() interface{} {
 	return item
 }
 
-func compareValue(a, b interface{}) bool {
+func equalValue(a, b interface{}) bool {
 	var x, y types.Posting
 	p1 := a.(*posting.List)
 	if ok := p1.Get(&x, 0); !ok {
@@ -72,11 +75,16 @@ func compareValue(a, b interface{}) bool {
 		glog.Fatal("While retrieving entry from posting list")
 	}
-	return x.Uid() == y.Uid()
+	aUid := x.Uid()
+	bUid := y.Uid()
+	return aUid == bUid ||
+		((x.Uid() == math.MaxUint64 && y.Uid() == math.MaxUint64) &&
+			bytes.Compare(x.ValueBytes(), y.ValueBytes()) == 0)
 }
 
 func MergeFolders(mergePath, destPath string) {
-	files, err := ioutil.ReadDir(mergePath)
+	dirList, err := ioutil.ReadDir(mergePath)
 	if err != nil {
 		glog.Fatal("Cannot open stores directory")
 	}
@@ -91,28 +99,25 @@ func MergeFolders(mergePath, destPath string) {
 	wopt.SetSync(true)
 
 	count := 0
-	for range files {
+	for range dirList {
 		count++
 	}
-	if mergePath[len(mergePath)-1] == '/' {
-		mergePath = mergePath[:len(mergePath)-1]
-	}
-	pq := make(PriorityQueue, count)
-	var itVec []*rocksdb.Iterator
-	for i, f := range files {
-		curDb, err := rocksdb.Open(mergePath+"/"+f.Name(), opt)
+	pq = make(PriorityQueue, count)
+	var storeIters []*rocksdb.Iterator
+	for i, dir := range dirList {
+		mPath := path.Join(mergePath, dir.Name())
+		curDb, err := rocksdb.Open(mPath, opt)
 		defer curDb.Close()
 		if err != nil {
-			glog.WithField("filepath", mergePath+"/"+f.Name()).
+			glog.WithField("filepath", mPath).
 				Fatal("While opening store")
 		}
 		it := curDb.NewIterator(ropt)
 		it.SeekToFirst()
 		if !it.Valid() {
-			itVec = append(itVec, it)
-			glog.WithField("path", mergePath+"/"+f.Name()).Info("Store empty")
+			storeIters = append(storeIters, it)
+			glog.WithField("path", mPath).Info("Store empty")
			continue
 		}
 		pq[i] = &Item{
@@ -120,12 +125,22 @@ func MergeFolders(mergePath, destPath string) {
 			value:    it.Value(),
 			storeIdx: i,
 		}
-		itVec = append(itVec, it)
+		storeIters = append(storeIters, it)
 	}
 	heap.Init(&pq)
+	mergeUsingHeap(destPath, storeIters)
+}
+
+func mergeUsingHeap(destPath string, storeIters []*rocksdb.Iterator) {
+	var opt *rocksdb.Options
+	var wopt *rocksdb.WriteOptions
+	opt = rocksdb.NewOptions()
+	opt.SetCreateIfMissing(true)
+	wopt = rocksdb.NewWriteOptions()
+	wopt.SetSync(true)
 
 	var db *rocksdb.DB
-	db, err = rocksdb.Open(destPath, opt)
+	db, err := rocksdb.Open(destPath, opt)
 	defer db.Close()
 	if err != nil {
 		glog.WithField("filepath", destPath).
@@ -137,7 +152,7 @@ func MergeFolders(mergePath, destPath string) {
 		top := heap.Pop(&pq).(*Item)
 		if bytes.Compare(top.key, lastKey) == 0 {
-			if !compareValue(top.value, lastValue) {
+			if !equalValue(top.value, lastValue) {
 				glog.WithField("key", lastKey).
 					Fatal("different value for same key")
 			}
@@ -146,13 +161,13 @@ func MergeFolders(mergePath, destPath string) {
 		lastKey = top.key
 		lastValue = top.value
 
-		itVec[top.storeIdx].Next()
-		if !itVec[top.storeIdx].Valid() {
+		storeIters[top.storeIdx].Next()
+		if !storeIters[top.storeIdx].Valid() {
 			continue
 		}
 		item := &Item{
-			key:      itVec[top.storeIdx].Key(),
-			value:    itVec[top.storeIdx].Value(),
+			key:      storeIters[top.storeIdx].Key(),
+			value:    storeIters[top.storeIdx].Value(),
 			storeIdx: top.storeIdx,
 		}
 		heap.Push(&pq, item)
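
For reference, a sketch of equalValue as it would read after this commit, assembled from the hunks above. The handling of the second posting list (p2, y) is elided by the diff and is assumed here to mirror p1, x; the imports ("bytes", "math", the posting packages) come from the diff context.

func equalValue(a, b interface{}) bool {
	var x, y types.Posting
	p1 := a.(*posting.List)
	if ok := p1.Get(&x, 0); !ok {
		glog.Fatal("While retrieving entry from posting list")
	}
	// Assumed symmetric handling of b; this part is not shown in the hunks above.
	p2 := b.(*posting.List)
	if ok := p2.Get(&y, 0); !ok {
		glog.Fatal("While retrieving entry from posting list")
	}

	aUid := x.Uid()
	bUid := y.Uid()
	// Per the commit message, postings are also compared on their value bytes
	// (the XID case) when both UIDs are math.MaxUint64.
	return aUid == bUid ||
		((x.Uid() == math.MaxUint64 && y.Uid() == math.MaxUint64) &&
			bytes.Compare(x.ValueBytes(), y.ValueBytes()) == 0)
}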