diff --git a/tools/merge/merge.cc b/tools/merge/rocks_c++/merge.cc similarity index 100% rename from tools/merge/merge.cc rename to tools/merge/rocks_c++/merge.cc diff --git a/tools/merge/merge_heap.cc b/tools/merge/rocks_c++/merge_heap.cc similarity index 100% rename from tools/merge/merge_heap.cc rename to tools/merge/rocks_c++/merge_heap.cc diff --git a/tools/merge/rocksmerge.go b/tools/merge/rocksmerge.go index 45db281c30227f321ea961181b8fc05b91999875..cee8a37bd00abab5b2a8b77d0539a64d8963441b 100644 --- a/tools/merge/rocksmerge.go +++ b/tools/merge/rocksmerge.go @@ -75,12 +75,8 @@ func compareValue(a, b interface{}) bool { return x.Uid() == y.Uid() } -func main() { - flag.Parse() - if len(*stores) == 0 { - glog.Fatal("No Directory specified") - } - files, err := ioutil.ReadDir(*stores) +func MergeFolders(mergePath, destPath string) { + files, err := ioutil.ReadDir(mergePath) if err != nil { glog.Fatal("Cannot open stores directory") } @@ -99,20 +95,24 @@ func main() { count++ } + if mergePath[len(mergePath)-1] == '/' { + mergePath = mergePath[:len(mergePath)-1] + } + pq := make(PriorityQueue, count) var itVec []*rocksdb.Iterator for i, f := range files { - curDb, err := rocksdb.Open(*stores+f.Name(), opt) + curDb, err := rocksdb.Open(mergePath+"/"+f.Name(), opt) defer curDb.Close() if err != nil { - glog.WithField("filepath", *stores+f.Name()). + glog.WithField("filepath", mergePath+"/"+f.Name()). Fatal("While opening store") } it := curDb.NewIterator(ropt) it.SeekToFirst() if !it.Valid() { itVec = append(itVec, it) - glog.Infof("Store empty() %v", *stores+f.Name()) + glog.WithField("path", mergePath+"/"+f.Name()).Info("Store empty") continue } pq[i] = &Item{ @@ -125,10 +125,10 @@ func main() { heap.Init(&pq) var db *rocksdb.DB - db, err = rocksdb.Open(*destinationDB, opt) + db, err = rocksdb.Open(destPath, opt) defer db.Close() if err != nil { - glog.WithField("filepath", *destinationDB). + glog.WithField("filepath", destPath). Fatal("While opening store") } @@ -137,14 +137,14 @@ func main() { top := heap.Pop(&pq).(*Item) if bytes.Compare(top.key, lastKey) == 0 { - if compareValue(top.value, lastValue) == false { - glog.Fatalf("different value for same key %s", lastKey) + if !compareValue(top.value, lastValue) { + glog.WithField("key", lastKey). + Fatal("different value for same key") } - } else { - db.Put(wopt, top.key, top.value) - lastKey = top.key - lastValue = top.value } + db.Put(wopt, top.key, top.value) + lastKey = top.key + lastValue = top.value itVec[top.storeIdx].Next() if !itVec[top.storeIdx].Valid() { @@ -158,3 +158,12 @@ func main() { heap.Push(&pq, item) } } + +func main() { + flag.Parse() + if len(*stores) == 0 { + glog.Fatal("No Directory specified") + } + + MergeFolders(*stores, *destinationDB) +} diff --git a/tools/merge/rocksmerge_test.go b/tools/merge/rocksmerge_test.go new file mode 100644 index 0000000000000000000000000000000000000000..611c234d3cc469b13e5445e1da77bd542bd3ec31 --- /dev/null +++ b/tools/merge/rocksmerge_test.go @@ -0,0 +1,102 @@ +package main + +import ( + "fmt" + "io/ioutil" + "os" + "testing" + + "github.com/Sirupsen/logrus" + "github.com/dgraph-io/dgraph/loader" + "github.com/dgraph-io/dgraph/posting" + "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/store/rocksdb" + "github.com/dgraph-io/dgraph/tools/merge" + "github.com/dgraph-io/dgraph/uid" + "github.com/dgryski/go-farm" +) + +func TestQuery(t *testing.T) { + logrus.SetLevel(logrus.DebugLevel) + rootDir, err := ioutil.TempDir("", "storetest_") + if err != nil { + t.Error(err) + return + } + defer os.RemoveAll(rootDir) + + dir1, err := ioutil.TempDir(rootDir, "dir_") + if err != nil { + t.Error(err) + return + } + defer os.RemoveAll(dir1) + + dir2, err := ioutil.TempDir(rootDir, "dir_") + if err != nil { + t.Error(err) + return + } + defer os.RemoveAll(dir2) + + destDir, err := ioutil.TempDir("", "dest_") + if err != nil { + t.Error(err) + return + } + defer os.RemoveAll(destDir) + + ps1 := new(store.Store) + ps1.Init(dir1) + + ps2 := new(store.Store) + ps2.Init(dir2) + + list := []string{"alice", "bob", "mallory", "ash", "man", "dgraph", + "ash", "alice"} + var numInstances uint64 = 2 + posting.Init(nil) + uid.Init(ps1) + loader.Init(nil, ps1) + for _, str := range list { + if farm.Fingerprint64([]byte(str))%numInstances == 0 { + _, err := uid.GetOrAssign(str, 0, numInstances) + if err != nil { + fmt.Errorf("error while assigning uid") + } + } + } + uid.Init(ps2) + loader.Init(nil, ps2) + for _, str := range list { + if farm.Fingerprint64([]byte(str))%numInstances == 1 { + uid.Init(ps2) + _, err := uid.GetOrAssign(str, 1, numInstances) + if err != nil { + fmt.Errorf("error while assigning uid") + } + + } + } + posting.MergeLists(100) + ps1.Close() + ps2.Close() + + main.MergeFolders(rootDir, destDir) + + var opt *rocksdb.Options + var ropt *rocksdb.ReadOptions + opt = rocksdb.NewOptions() + ropt = rocksdb.NewReadOptions() + db, err := rocksdb.Open(destDir, opt) + it := db.NewIterator(ropt) + + count := 0 + for it.SeekToFirst(); it.Valid(); it.Next() { + count++ + } + + if count != 6 { // There are totally 6 unique strings + fmt.Errorf("Not all the items have been assigned uid") + } +}