Skip to content
Snippets Groups Projects
Commit 36cf856b authored by Ashwin's avatar Ashwin
Browse files

RocksDB merge test and shifted merge code to a function

parent 2bc103dc
No related branches found
No related tags found
No related merge requests found
File moved
File moved
......@@ -75,12 +75,8 @@ func compareValue(a, b interface{}) bool {
return x.Uid() == y.Uid()
}
func main() {
flag.Parse()
if len(*stores) == 0 {
glog.Fatal("No Directory specified")
}
files, err := ioutil.ReadDir(*stores)
func MergeFolders(mergePath, destPath string) {
files, err := ioutil.ReadDir(mergePath)
if err != nil {
glog.Fatal("Cannot open stores directory")
}
......@@ -99,20 +95,24 @@ func main() {
count++
}
if mergePath[len(mergePath)-1] == '/' {
mergePath = mergePath[:len(mergePath)-1]
}
pq := make(PriorityQueue, count)
var itVec []*rocksdb.Iterator
for i, f := range files {
curDb, err := rocksdb.Open(*stores+f.Name(), opt)
curDb, err := rocksdb.Open(mergePath+"/"+f.Name(), opt)
defer curDb.Close()
if err != nil {
glog.WithField("filepath", *stores+f.Name()).
glog.WithField("filepath", mergePath+"/"+f.Name()).
Fatal("While opening store")
}
it := curDb.NewIterator(ropt)
it.SeekToFirst()
if !it.Valid() {
itVec = append(itVec, it)
glog.Infof("Store empty() %v", *stores+f.Name())
glog.WithField("path", mergePath+"/"+f.Name()).Info("Store empty")
continue
}
pq[i] = &Item{
......@@ -125,10 +125,10 @@ func main() {
heap.Init(&pq)
var db *rocksdb.DB
db, err = rocksdb.Open(*destinationDB, opt)
db, err = rocksdb.Open(destPath, opt)
defer db.Close()
if err != nil {
glog.WithField("filepath", *destinationDB).
glog.WithField("filepath", destPath).
Fatal("While opening store")
}
......@@ -137,14 +137,14 @@ func main() {
top := heap.Pop(&pq).(*Item)
if bytes.Compare(top.key, lastKey) == 0 {
if compareValue(top.value, lastValue) == false {
glog.Fatalf("different value for same key %s", lastKey)
if !compareValue(top.value, lastValue) {
glog.WithField("key", lastKey).
Fatal("different value for same key")
}
} else {
db.Put(wopt, top.key, top.value)
lastKey = top.key
lastValue = top.value
}
db.Put(wopt, top.key, top.value)
lastKey = top.key
lastValue = top.value
itVec[top.storeIdx].Next()
if !itVec[top.storeIdx].Valid() {
......@@ -158,3 +158,12 @@ func main() {
heap.Push(&pq, item)
}
}
func main() {
flag.Parse()
if len(*stores) == 0 {
glog.Fatal("No Directory specified")
}
MergeFolders(*stores, *destinationDB)
}
package main
import (
"fmt"
"io/ioutil"
"os"
"testing"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/loader"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/store/rocksdb"
"github.com/dgraph-io/dgraph/tools/merge"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgryski/go-farm"
)
func TestQuery(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
rootDir, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(rootDir)
dir1, err := ioutil.TempDir(rootDir, "dir_")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dir1)
dir2, err := ioutil.TempDir(rootDir, "dir_")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dir2)
destDir, err := ioutil.TempDir("", "dest_")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(destDir)
ps1 := new(store.Store)
ps1.Init(dir1)
ps2 := new(store.Store)
ps2.Init(dir2)
list := []string{"alice", "bob", "mallory", "ash", "man", "dgraph",
"ash", "alice"}
var numInstances uint64 = 2
posting.Init(nil)
uid.Init(ps1)
loader.Init(nil, ps1)
for _, str := range list {
if farm.Fingerprint64([]byte(str))%numInstances == 0 {
_, err := uid.GetOrAssign(str, 0, numInstances)
if err != nil {
fmt.Errorf("error while assigning uid")
}
}
}
uid.Init(ps2)
loader.Init(nil, ps2)
for _, str := range list {
if farm.Fingerprint64([]byte(str))%numInstances == 1 {
uid.Init(ps2)
_, err := uid.GetOrAssign(str, 1, numInstances)
if err != nil {
fmt.Errorf("error while assigning uid")
}
}
}
posting.MergeLists(100)
ps1.Close()
ps2.Close()
main.MergeFolders(rootDir, destDir)
var opt *rocksdb.Options
var ropt *rocksdb.ReadOptions
opt = rocksdb.NewOptions()
ropt = rocksdb.NewReadOptions()
db, err := rocksdb.Open(destDir, opt)
it := db.NewIterator(ropt)
count := 0
for it.SeekToFirst(); it.Valid(); it.Next() {
count++
}
if count != 6 { // There are totally 6 unique strings
fmt.Errorf("Not all the items have been assigned uid")
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment