Skip to content
Snippets Groups Projects
Commit ba584b68 authored by Ashwin's avatar Ashwin
Browse files

change in logic for xid equality

parent ffcb50d3
No related branches found
No related tags found
No related merge requests found
File added
...@@ -78,7 +78,7 @@ func equalValue(a, b interface{}) bool { ...@@ -78,7 +78,7 @@ func equalValue(a, b interface{}) bool {
aUid := x.Uid() aUid := x.Uid()
bUid := y.Uid() bUid := y.Uid()
return aUid == bUid || return (x.ValueBytes() == nil && y.ValueBytes() == nil && (aUid == bUid)) ||
((x.Uid() == math.MaxUint64 && y.Uid() == math.MaxUint64) && ((x.Uid() == math.MaxUint64 && y.Uid() == math.MaxUint64) &&
bytes.Compare(x.ValueBytes(), y.ValueBytes()) == 0) bytes.Compare(x.ValueBytes(), y.ValueBytes()) == 0)
} }
...@@ -98,12 +98,8 @@ func MergeFolders(mergePath, destPath string) { ...@@ -98,12 +98,8 @@ func MergeFolders(mergePath, destPath string) {
wopt = rocksdb.NewWriteOptions() wopt = rocksdb.NewWriteOptions()
wopt.SetSync(true) wopt.SetSync(true)
count := 0 pq = make(PriorityQueue, 0)
for range dirList { heap.Init(&pq)
count++
}
pq = make(PriorityQueue, count)
var storeIters []*rocksdb.Iterator var storeIters []*rocksdb.Iterator
for i, dir := range dirList { for i, dir := range dirList {
mPath := path.Join(mergePath, dir.Name()) mPath := path.Join(mergePath, dir.Name())
...@@ -120,33 +116,24 @@ func MergeFolders(mergePath, destPath string) { ...@@ -120,33 +116,24 @@ func MergeFolders(mergePath, destPath string) {
glog.WithField("path", mPath).Info("Store empty") glog.WithField("path", mPath).Info("Store empty")
continue continue
} }
pq[i] = &Item{ item := &Item{
key: it.Key(), key: it.Key(),
value: it.Value(), value: it.Value(),
storeIdx: i, storeIdx: i,
} }
heap.Push(&pq, item)
storeIters = append(storeIters, it) storeIters = append(storeIters, it)
} }
heap.Init(&pq)
mergeUsingHeap(destPath, storeIters)
}
func mergeUsingHeap(destPath string, storeIters []*rocksdb.Iterator) {
var opt *rocksdb.Options
var wopt *rocksdb.WriteOptions
opt = rocksdb.NewOptions()
opt.SetCreateIfMissing(true)
wopt = rocksdb.NewWriteOptions()
wopt.SetSync(true)
var db *rocksdb.DB var db *rocksdb.DB
db, err := rocksdb.Open(destPath, opt) db, err = rocksdb.Open(destPath, opt)
defer db.Close() defer db.Close()
if err != nil { if err != nil {
glog.WithField("filepath", destPath). glog.WithField("filepath", destPath).
Fatal("While opening store") Fatal("While opening store")
} }
heap.Init(&pq)
var lastKey, lastValue []byte var lastKey, lastValue []byte
for pq.Len() > 0 { for pq.Len() > 0 {
top := heap.Pop(&pq).(*Item) top := heap.Pop(&pq).(*Item)
......
...@@ -11,12 +11,11 @@ import ( ...@@ -11,12 +11,11 @@ import (
"github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/store/rocksdb" "github.com/dgraph-io/dgraph/store/rocksdb"
"github.com/dgraph-io/dgraph/tools/merge"
"github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/uid"
"github.com/dgryski/go-farm" "github.com/dgryski/go-farm"
) )
func TestQuery(t *testing.T) { func TestMergeFolders(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel) logrus.SetLevel(logrus.DebugLevel)
rootDir, err := ioutil.TempDir("", "storetest_") rootDir, err := ioutil.TempDir("", "storetest_")
if err != nil { if err != nil {
...@@ -82,7 +81,7 @@ func TestQuery(t *testing.T) { ...@@ -82,7 +81,7 @@ func TestQuery(t *testing.T) {
ps1.Close() ps1.Close()
ps2.Close() ps2.Close()
main.MergeFolders(rootDir, destDir) MergeFolders(rootDir, destDir)
var opt *rocksdb.Options var opt *rocksdb.Options
var ropt *rocksdb.ReadOptions var ropt *rocksdb.ReadOptions
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment