Skip to content
Snippets Groups Projects
Commit d15c1f56 authored by Ashwin's avatar Ashwin
Browse files

removed index field in heap

parent dfae5ca1
No related branches found
No related tags found
No related merge requests found
File deleted
......@@ -29,14 +29,14 @@ import (
type Item struct {
key, value []byte
itIdx int
index int
storeIdx int
}
type PriorityQueue []*Item
var stores = flag.String("stores", "", "Folder containing rocksDB directories")
var destinationDB = flag.String("destination", "",
var stores = flag.String("stores", "",
"Root directory containing rocksDB directories")
var destinationDB = flag.String("dest", "",
"Folder to store merged rocksDB")
var glog = x.Log("rocksmerge")
......@@ -44,23 +44,17 @@ var glog = x.Log("rocksmerge")
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq PriorityQueue) Less(i, j int) bool {
return (bytes.Compare(pq[i].key, pq[j].key) <= 0)
return bytes.Compare(pq[i].key, pq[j].key) <= 0
}
func (pq *PriorityQueue) Push(y interface{}) {
n := len(*pq)
item := y.(*Item)
item.index = n
*pq = append(*pq, item)
*pq = append(*pq, y.(*Item))
}
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
......@@ -80,25 +74,20 @@ func main() {
var wopt *rocksdb.WriteOptions
opt = rocksdb.NewOptions()
opt.SetCreateIfMissing(true)
fp := rocksdb.NewBloomFilter(16)
opt.SetFilterPolicy(fp)
ropt = rocksdb.NewReadOptions()
wopt = rocksdb.NewWriteOptions()
wopt.SetSync(true)
var itVec []*rocksdb.Iterator
var db *rocksdb.DB
var lastKey, lastValue []byte
count := 0
for range files {
count++
}
pq := make(PriorityQueue, count)
i := 0
for _, f := range files {
pq := make(PriorityQueue, count)
var itVec []*rocksdb.Iterator
for i, f := range files {
curDb, err := rocksdb.Open(*stores+f.Name(), opt)
defer curDb.Close()
if err != nil {
glog.WithField("filepath", *stores+f.Name()).
Fatal("While opening store")
......@@ -109,27 +98,29 @@ func main() {
continue
}
pq[i] = &Item{
key: it.Key(),
value: it.Value(),
itIdx: i,
index: i,
key: it.Key(),
value: it.Value(),
storeIdx: i,
}
i++
itVec = append(itVec, it)
}
heap.Init(&pq)
var db *rocksdb.DB
db, err = rocksdb.Open(*destinationDB, opt)
defer db.Close()
if err != nil {
glog.WithField("filepath", *destinationDB).
Fatal("While opening store")
}
var lastKey, lastValue []byte
for pq.Len() > 0 {
top := heap.Pop(&pq).(*Item)
if bytes.Compare(top.key, lastKey) == 0 {
if bytes.Compare(top.value, lastValue) != 0 {
// TODO::value comparison considering timestamps
glog.Fatalf("different value for same key %s", lastKey)
}
} else {
......@@ -138,16 +129,15 @@ func main() {
lastValue = top.value
}
itVec[top.itIdx].Next()
if !itVec[top.itIdx].Valid() {
itVec[top.storeIdx].Next()
if !itVec[top.storeIdx].Valid() {
continue
}
item := &Item{
key: itVec[top.itIdx].Key(),
value: itVec[top.itIdx].Value(),
itIdx: top.itIdx,
key: itVec[top.storeIdx].Key(),
value: itVec[top.storeIdx].Value(),
storeIdx: top.storeIdx,
}
heap.Push(&pq, item)
}
db.Close()
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment