Skip to content
Snippets Groups Projects
Commit e7343568 authored by Ashwin Ramesh's avatar Ashwin Ramesh
Browse files

Merge pull request #45 from dgraph-io/rocksGo

RocksDB merge in go
parents 9982c02e 4269f674
No related branches found
No related tags found
No related merge requests found
/main
/a.out /a.out
/merge /merge
/rocksmerge
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package main
import (
"bytes"
"container/heap"
"flag"
"io/ioutil"
"math"
"path"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/posting/types"
"github.com/dgraph-io/dgraph/store/rocksdb"
"github.com/dgraph-io/dgraph/x"
)
// Item is a single key/value entry pulled from one of the source stores,
// tagged with the index of the store it came from so the merge loop can
// advance that store's iterator after the item is consumed.
type Item struct {
key, value []byte
storeIdx int // index of the store among the K stores
}

// PriorityQueue is a heap of *Item ordered by key (see Less); it drives the
// k-way merge across the source stores via container/heap.
type PriorityQueue []*Item

// Command-line flags: the root directory holding the rocksDB stores to
// merge, and the directory the merged store is written to.
var stores = flag.String("stores", "",
"Root directory containing rocksDB directories")
var destinationDB = flag.String("dest", "",
"Folder to store merged rocksDB")

var glog = x.Log("rocksmerge")

// pq is the shared merge heap used by mergeFolders.
var pq PriorityQueue
// Len returns the number of queued items (heap.Interface).
func (pq PriorityQueue) Len() int { return len(pq) }

// Swap exchanges the items at indices i and j (heap.Interface).
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}
// Less orders items by ascending key so heap.Pop yields the smallest key
// first. The comparison is strict (< 0, not the original <= 0): a reflexive
// Less — where Less(i, j) and Less(j, i) are both true for equal keys — is
// not a valid ordering for container/heap. Equal keys still surface
// adjacently from the heap, which the duplicate check in mergeFolders
// relies on.
func (pq PriorityQueue) Less(i, j int) bool {
	return bytes.Compare(pq[i].key, pq[j].key) < 0
}
// Push appends a new element to the queue. This is heap.Interface plumbing;
// callers go through heap.Push rather than invoking it directly.
func (pq *PriorityQueue) Push(y interface{}) {
	item := y.(*Item)
	*pq = append(*pq, item)
}
// Pop removes and returns the last element of the slice; heap.Pop has
// already swapped the minimum into that position. The vacated slot is
// nilled out so the popped *Item is not kept reachable through the backing
// array (per the container/heap PriorityQueue example).
func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	old[n-1] = nil // release the reference so the GC can reclaim the Item
	*pq = old[0 : n-1]
	return item
}
// equalValue reports whether two posting lists agree on their first
// posting: either both are uid-postings (no value bytes) with equal uids,
// or both are value-postings (signalled by the math.MaxUint64 sentinel uid)
// with byte-identical values. It aborts the process if either list has no
// posting at index 0.
//
// NOTE(review): mergeFolders hands this function raw []byte values, but the
// type assertions below expect *posting.List — the first duplicate key
// would panic at runtime. Confirm the intended value representation.
func equalValue(a, b interface{}) bool {
	var x, y types.Posting
	p1 := a.(*posting.List)
	if ok := p1.Get(&x, 0); !ok {
		glog.Fatal("While retrieving entry from posting list")
	}
	p2 := b.(*posting.List)
	if ok := p2.Get(&y, 0); !ok {
		glog.Fatal("While retrieving entry from posting list")
	}
	aUid := x.Uid()
	bUid := y.Uid()
	// Uid postings: neither side carries value bytes and the uids match.
	// Value postings: both carry the sentinel uid and equal value bytes.
	return (x.ValueBytes() == nil && y.ValueBytes() == nil && (aUid == bUid)) ||
		((x.Uid() == math.MaxUint64 && y.Uid() == math.MaxUint64) &&
			bytes.Equal(x.ValueBytes(), y.ValueBytes()))
}
// mergeFolders merges every rocksDB store found directly under mergePath
// into a single store at destPath. It runs a k-way merge: one iterator per
// source store feeds the priority queue, and entries are written to the
// destination in key order. A key appearing in multiple stores must carry
// an equal value in each; otherwise the program aborts.
func mergeFolders(mergePath, destPath string) {
	dirList, err := ioutil.ReadDir(mergePath)
	if err != nil {
		glog.Fatal("Cannot open stores directory")
	}

	opt := rocksdb.NewOptions()
	opt.SetCreateIfMissing(true)
	ropt := rocksdb.NewReadOptions()
	wopt := rocksdb.NewWriteOptions()
	wopt.SetSync(true) // synchronous writes: the merged store must be durable

	pq = make(PriorityQueue, 0)
	heap.Init(&pq)

	// One iterator per source store; index i matches Item.storeIdx, so we
	// append to storeIters even for empty stores to keep indices aligned.
	var storeIters []*rocksdb.Iterator
	for i, dir := range dirList {
		mPath := path.Join(mergePath, dir.Name())
		curDb, err := rocksdb.Open(mPath, opt)
		if err != nil {
			glog.WithField("filepath", mPath).
				Fatal("While opening store")
		}
		// Deferred only after the error check: the original deferred
		// curDb.Close() before checking err, which would have called Close
		// on a nil DB when an open failed. The defers accumulate until the
		// function returns, which is required — the iterators stay in use
		// for the whole merge.
		defer curDb.Close()

		it := curDb.NewIterator(ropt)
		it.SeekToFirst()
		storeIters = append(storeIters, it)
		if !it.Valid() {
			glog.WithField("path", mPath).Info("Store empty")
			continue
		}
		heap.Push(&pq, &Item{
			key:      it.Key(),
			value:    it.Value(),
			storeIdx: i,
		})
	}

	db, err := rocksdb.Open(destPath, opt)
	if err != nil {
		glog.WithField("filepath", destPath).
			Fatal("While opening store")
	}
	defer db.Close()

	// lastKey/lastValue hold the most recently written entry so duplicate
	// keys (which the heap yields adjacently) can be validated.
	var lastKey, lastValue []byte
	for pq.Len() > 0 {
		top := heap.Pop(&pq).(*Item)
		if bytes.Equal(top.key, lastKey) {
			// NOTE(review): equalValue type-asserts *posting.List but is
			// handed raw []byte here; the first duplicate key would panic.
			// Confirm the intended value representation.
			if !equalValue(top.value, lastValue) {
				glog.WithField("key", lastKey).
					Fatal("different value for same key")
			}
		}
		// The original discarded Put's error; a failed write must abort.
		if err := db.Put(wopt, top.key, top.value); err != nil {
			glog.WithField("key", top.key).
				Fatal("While writing to destination store")
		}
		lastKey = top.key
		lastValue = top.value

		// Advance the iterator this item came from; if the store still has
		// entries, push its next key/value onto the heap.
		it := storeIters[top.storeIdx]
		it.Next()
		if !it.Valid() {
			continue
		}
		heap.Push(&pq, &Item{
			key:      it.Key(),
			value:    it.Value(),
			storeIdx: top.storeIdx,
		})
	}
}
// main validates the command-line flags and runs the merge.
func main() {
	flag.Parse()
	if len(*stores) == 0 {
		glog.Fatal("No Directory specified")
	}
	// The original silently accepted an empty -dest flag and handed "" to
	// rocksdb.Open; fail fast with a clear message instead.
	if len(*destinationDB) == 0 {
		glog.Fatal("No destination directory specified")
	}
	mergeFolders(*stores, *destinationDB)
}
package main
import (
"fmt"
"io/ioutil"
"os"
"testing"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/loader"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/store/rocksdb"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgryski/go-farm"
)
// TestMergeFolders builds two rocksDB stores, splits uid assignment for a
// list of names across them by fingerprint, merges both stores into a
// fresh destination directory, and verifies the merged store holds exactly
// one entry per unique name.
func TestMergeFolders(t *testing.T) {
	logrus.SetLevel(logrus.DebugLevel)
	rootDir, err := ioutil.TempDir("", "storetest_")
	if err != nil {
		t.Error(err)
		return
	}
	defer os.RemoveAll(rootDir)
	dir1, err := ioutil.TempDir(rootDir, "dir_")
	if err != nil {
		t.Error(err)
		return
	}
	defer os.RemoveAll(dir1)
	dir2, err := ioutil.TempDir(rootDir, "dir_")
	if err != nil {
		t.Error(err)
		return
	}
	defer os.RemoveAll(dir2)
	destDir, err := ioutil.TempDir("", "dest_")
	if err != nil {
		t.Error(err)
		return
	}
	defer os.RemoveAll(destDir)

	ps1 := new(store.Store)
	ps1.Init(dir1)
	ps2 := new(store.Store)
	ps2.Init(dir2)

	// "ash" and "alice" repeat on purpose: 6 unique names in total.
	list := []string{"alice", "bob", "mallory", "ash", "man", "dgraph",
		"ash", "alice"}
	var numInstances uint64 = 2

	posting.Init(nil)
	uid.Init(ps1)
	loader.Init(nil, ps1)
	for _, str := range list {
		if farm.Fingerprint64([]byte(str))%numInstances == 0 {
			// The original built an error with fmt.Errorf and discarded
			// it; report assignment failures through the test instead.
			if _, err := uid.GetOrAssign(str, 0, numInstances); err != nil {
				t.Error(fmt.Errorf("while assigning uid for %q: %v", str, err))
			}
		}
	}

	uid.Init(ps2)
	loader.Init(nil, ps2)
	for _, str := range list {
		if farm.Fingerprint64([]byte(str))%numInstances == 1 {
			// Dropped the original's redundant uid.Init(ps2) inside this
			// loop; it is already initialized just above.
			if _, err := uid.GetOrAssign(str, 1, numInstances); err != nil {
				t.Error(fmt.Errorf("while assigning uid for %q: %v", str, err))
			}
		}
	}
	posting.MergeLists(100)
	ps1.Close()
	ps2.Close()

	mergeFolders(rootDir, destDir)

	opt := rocksdb.NewOptions()
	ropt := rocksdb.NewReadOptions()
	db, err := rocksdb.Open(destDir, opt)
	if err != nil { // the original ignored this error
		t.Fatal(err)
	}
	defer db.Close()
	it := db.NewIterator(ropt)
	count := 0
	for it.SeekToFirst(); it.Valid(); it.Next() {
		count++
	}
	if count != 6 { // there are 6 unique strings in list
		t.Errorf("expected 6 unique keys in merged store, got %d", count)
	}
}
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* compile : g++ merge.cc <path_to_rocksDB_installation>/librocksdb.so.4.1 --std=c++11 -lstdc++fs
* usage : ./<executable> <folder_having_rocksDB_directories_to_be_merged> <destination_folder>
*
* find the rocksdb headers here https://github.com/facebook/rocksdb/tree/master/include/rocksdb
*
*/
#include <fstream>
#include <cstdio>
#include <iostream>
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include <experimental/filesystem>
using namespace rocksdb;
namespace fs = std::experimental::filesystem;
/*
 * Sequentially merges every rocksDB store found under argv[1] into the
 * store at argv[2]. A key already present in the destination must carry
 * the same value in the source being merged; a mismatch aborts via assert.
 */
int main(int argc, char* argv[]) {
  if (argc != 3) {
    std::cerr << "Wrong number of arguments\nusage : ./<executable>\
<folder_having_rocksDB_directories_to_be_merged> <destination_folder>\n";
    // A usage error must not report success (the original exited with 0).
    exit(1);
  }
  std::string destinationDB = argv[2], mergeDir = argv[1];
  DB* db;
  Options options;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well.
  options.IncreaseParallelism();
  options.OptimizeLevelStyleCompaction();
  // Create the destination DB if it's not already present.
  options.create_if_missing = true;
  Status s = DB::Open(options, destinationDB, &db);
  // NOTE: all assert-based checks vanish under NDEBUG; errors would then
  // go entirely unchecked.
  assert(s.ok());
  for (auto& dirEntry : fs::directory_iterator(mergeDir)) {
    std::cout << dirEntry << std::endl;
    DB* cur_db;
    Options options;
    options.IncreaseParallelism();
    options.OptimizeLevelStyleCompaction();
    // Source stores must already exist; don't create them.
    options.create_if_missing = false;
    Status s1 = DB::Open(options, dirEntry.path().c_str(), &cur_db);
    assert(s1.ok());
    rocksdb::Iterator* it = cur_db->NewIterator(rocksdb::ReadOptions());
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      Slice key_s = it->key();
      Slice val_s = it->value();
      std::string val_t;
      // Copy the entry only if the destination doesn't already hold the
      // key; otherwise verify both stores agree on the value.
      Status s = db->Get(ReadOptions(), key_s, &val_t);
      if (s.ok()) {
        assert(val_t == val_s.ToString() && "Same key has different value");
      } else {
        s = db->Put(WriteOptions(), key_s, val_s);
        assert(s.ok());
      }
    }
    assert(it->status().ok());  // check for any errors found during the scan
    delete it;
    delete cur_db;
  }
  delete db;
  return 0;
}
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* compile : g++ merge_heap.cc <path_to_rocksDB_installation>/librocksdb.so.4.1 --std=c++11 -lstdc++fs
* usage : ./<executable> <folder_having_rocksDB_directories_to_be_merged> <destination_folder>
*
* rocksdb headers can be found here https://github.com/facebook/rocksdb/tree/master/include/rocksdb
*/
#include <fstream>
#include <cstdio>
#include <iostream>
#include <string>
#include <queue>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include <experimental/filesystem>
using namespace rocksdb;
namespace fs = std::experimental::filesystem;
// A single heap entry: one key/value pair plus the index (into itVec) of
// the source-store iterator it was read from.
class node {
 public:
  Slice key;
  Slice value;
  int idx;

  node(Slice k, Slice v, int id) : key(k), value(v), idx(id) {}
};
class compare {
public:
bool operator()(node &a, node &b) {
return a.key.compare(b.key) <= 0;
}
};
int main(int argc, char* argv[]) {
if(argc != 3) {
std::cerr << "Wrong number of arguments\nusage : ./<executable>\
<folder_having_rocksDB_directories_to_be_merged> <destination_folder>\n";
exit(0);
}
int counter = 0;
std::priority_queue<struct node, std::vector<node>, compare> pq;
std::vector<rocksdb::Iterator*> itVec;
std::string destinationDB = argv[2], mergeDir = argv[1];
DB* db;
Options options;
// Optimize RocksDB. This is the easiest way to get RocksDB to perform well
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// create the DB if it's not already present
options.create_if_missing = true;
// open DB
Status s = DB::Open(options, destinationDB, &db);
assert(s.ok());
for (auto& dirEntry : fs::directory_iterator(mergeDir)) {
std::cout << dirEntry << std::endl;
DB* cur_db;
Options options;
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// Don't create the DB if it's not already present
options.create_if_missing = false;
// open DB
Status s1 = DB::Open(options, dirEntry.path().c_str(), &cur_db);
assert(s1.ok());
rocksdb::Iterator *it = cur_db->NewIterator(rocksdb::ReadOptions());
it->SeekToFirst();
if(!it->Valid()) {
continue;
}
struct node tnode(it->key(), it->value(), counter++);
itVec.push_back(it);
pq.push(tnode);
}
Slice lastKey, lastValue;
while(!pq.empty()) {
const struct node &top = pq.top();
pq.pop();
if(top.key == lastKey) {
assert(top.value == lastValue);
} else {
s = db->Put(WriteOptions(), top.key, top.value);
assert(s.ok());
lastKey = top.key;
lastValue = top.value;
}
itVec[top.idx]->Next();
if(!itVec[top.idx]->Valid()) {
continue;
}
struct node tnode(itVec[top.idx]->key(), itVec[top.idx]->value(), top.idx);
pq.push(tnode);
}
delete db;
return 0;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment