diff --git a/tools/merge/rocksmerge b/tools/merge/rocksmerge deleted file mode 100755 index b7a7ff4ee54aebf7c6bddd769a0cb375f8f2a0cc..0000000000000000000000000000000000000000 Binary files a/tools/merge/rocksmerge and /dev/null differ diff --git a/tools/merge/rocksmerge.go b/tools/merge/rocksmerge.go index 96c0a218254969b0d924ac35b6a8700e1943716f..e900cf76e355fd70e124064e3deba44fa78fe5a6 100644 --- a/tools/merge/rocksmerge.go +++ b/tools/merge/rocksmerge.go @@ -29,14 +29,14 @@ import ( type Item struct { key, value []byte - itIdx int - index int + storeIdx int } type PriorityQueue []*Item -var stores = flag.String("stores", "", "Folder containing rocksDB directories") -var destinationDB = flag.String("destination", "", +var stores = flag.String("stores", "", + "Root directory containing rocksDB directories") +var destinationDB = flag.String("dest", "", "Folder to store merged rocksDB") var glog = x.Log("rocksmerge") @@ -44,23 +44,17 @@ var glog = x.Log("rocksmerge") func (pq PriorityQueue) Len() int { return len(pq) } func (pq PriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] - pq[i].index = i - pq[j].index = j } func (pq PriorityQueue) Less(i, j int) bool { - return (bytes.Compare(pq[i].key, pq[j].key) <= 0) + return bytes.Compare(pq[i].key, pq[j].key) <= 0 } func (pq *PriorityQueue) Push(y interface{}) { - n := len(*pq) - item := y.(*Item) - item.index = n - *pq = append(*pq, item) + *pq = append(*pq, y.(*Item)) } func (pq *PriorityQueue) Pop() interface{} { old := *pq n := len(old) item := old[n-1] - item.index = -1 // for safety *pq = old[0 : n-1] return item } @@ -80,25 +74,20 @@ func main() { var wopt *rocksdb.WriteOptions opt = rocksdb.NewOptions() opt.SetCreateIfMissing(true) - fp := rocksdb.NewBloomFilter(16) - opt.SetFilterPolicy(fp) - ropt = rocksdb.NewReadOptions() wopt = rocksdb.NewWriteOptions() wopt.SetSync(true) - var itVec []*rocksdb.Iterator - var db *rocksdb.DB - var lastKey, lastValue []byte count := 0 for range files { count++ } - pq := make(PriorityQueue, count) - i := 0 - for _, f := range files { + pq := make(PriorityQueue, count) + var itVec []*rocksdb.Iterator + for i, f := range files { curDb, err := rocksdb.Open(*stores+f.Name(), opt) + defer curDb.Close() if err != nil { glog.WithField("filepath", *stores+f.Name()). Fatal("While opening store") @@ -109,27 +98,29 @@ func main() { continue } pq[i] = &Item{ - key: it.Key(), - value: it.Value(), - itIdx: i, - index: i, + key: it.Key(), + value: it.Value(), + storeIdx: i, } - i++ itVec = append(itVec, it) } heap.Init(&pq) + var db *rocksdb.DB db, err = rocksdb.Open(*destinationDB, opt) + defer db.Close() if err != nil { glog.WithField("filepath", *destinationDB). Fatal("While opening store") } + var lastKey, lastValue []byte for pq.Len() > 0 { top := heap.Pop(&pq).(*Item) if bytes.Compare(top.key, lastKey) == 0 { if bytes.Compare(top.value, lastValue) != 0 { + // TODO::value comparison considering timestamps glog.Fatalf("different value for same key %s", lastKey) } } else { @@ -138,16 +129,15 @@ func main() { lastValue = top.value } - itVec[top.itIdx].Next() - if !itVec[top.itIdx].Valid() { + itVec[top.storeIdx].Next() + if !itVec[top.storeIdx].Valid() { continue } item := &Item{ - key: itVec[top.itIdx].Key(), - value: itVec[top.itIdx].Value(), - itIdx: top.itIdx, + key: itVec[top.storeIdx].Key(), + value: itVec[top.storeIdx].Value(), + storeIdx: top.storeIdx, } heap.Push(&pq, item) } - db.Close() }