Newer
Older
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package main
import (
"bytes"
"container/heap"
"flag"
"io/ioutil"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/posting/types"
"github.com/dgraph-io/dgraph/store/rocksdb"
"github.com/dgraph-io/dgraph/x"
)
// Item is one entry of the merge priority queue: a key/value pair together
// with the index of the store it was read from.
type Item struct {
	key, value []byte
	storeIdx   int // index of the store among the K stores
}
// Command-line flags and the logger used throughout this tool.
var (
	// stores is the root directory containing the rocksDB directories.
	stores = flag.String(
		"stores",
		"",
		"Root directory containing rocksDB directories",
	)

	// destinationDB is the folder in which the merged rocksDB is stored.
	destinationDB = flag.String(
		"dest",
		"",
		"Folder to store merged rocksDB",
	)

	// glog is the logger obtained from x.Log under the name "rocksmerge".
	glog = x.Log("rocksmerge")
)
// Len returns the number of items currently held in the queue.
func (pq PriorityQueue) Len() int {
	size := len(pq)
	return size
}
// Swap exchanges the items stored at positions i and j.
func (pq PriorityQueue) Swap(i, j int) {
	first, second := pq[i], pq[j]
	pq[i] = second
	pq[j] = first
}
// Less reports whether the item at index i sorts strictly before the item at
// index j. Items are ordered by ascending key, compared bytewise, so that the
// heap always yields the smallest key first; main relies on this to see equal
// keys from different stores back to back.
func (pq PriorityQueue) Less(i, j int) bool {
	return bytes.Compare(pq[i].key, pq[j].key) < 0
}
// Pop removes the last item of the queue and returns it. container/heap calls
// it after moving the minimum element to the end of the slice.
//
// It panics if the queue is empty.
func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	// Clear the vacated slot so the backing array no longer keeps the
	// popped item reachable.
	old[n-1] = nil
	*pq = old[:n-1]
	return item
}
// compareValue reports whether the postings retrieved at index 0 from the two
// given posting lists carry the same Uid.
//
// Both arguments are asserted to *posting.List without a check, so any other
// dynamic type panics. If a list's Get call reports failure, the error is
// logged through glog.Fatal.
//
// NOTE(review): main passes Item.value (a []byte) to this function, which
// would make the type assertions below panic — confirm the intended argument
// type.
func compareValue(a, b interface{}) bool {
	var first, second types.Posting

	if !a.(*posting.List).Get(&first, 0) {
		glog.Fatal("While retrieving entry from posting list")
	}
	if !b.(*posting.List).Get(&second, 0) {
		glog.Fatal("While retrieving entry from posting list")
	}

	sameUid := first.Uid() == second.Uid()
	return sameUid
}
func main() {
flag.Parse()
if len(*stores) == 0 {
glog.Fatal("No Directory specified")
}
files, err := ioutil.ReadDir(*stores)
if err != nil {
glog.Fatal("Cannot open stores directory")
}
var opt *rocksdb.Options
var ropt *rocksdb.ReadOptions
var wopt *rocksdb.WriteOptions
opt = rocksdb.NewOptions()
opt.SetCreateIfMissing(true)
ropt = rocksdb.NewReadOptions()
wopt = rocksdb.NewWriteOptions()
wopt.SetSync(true)
count := 0
for range files {
count++
}
pq := make(PriorityQueue, count)
var itVec []*rocksdb.Iterator
for i, f := range files {
if err != nil {
glog.WithField("filepath", *stores+f.Name()).
Fatal("While opening store")
}
it := curDb.NewIterator(ropt)
it.SeekToFirst()
if !it.Valid() {
itVec = append(itVec, it)
glog.Infof("Store empty() %v", *stores+f.Name())
key: it.Key(),
value: it.Value(),
storeIdx: i,
}
itVec = append(itVec, it)
}
heap.Init(&pq)
if err != nil {
glog.WithField("filepath", *destinationDB).
Fatal("While opening store")
}
for pq.Len() > 0 {
top := heap.Pop(&pq).(*Item)
if bytes.Compare(top.key, lastKey) == 0 {
if compareValue(top.value, lastValue) == false {
glog.Fatalf("different value for same key %s", lastKey)
}
} else {
db.Put(wopt, top.key, top.value)
lastKey = top.key
lastValue = top.value
}
itVec[top.storeIdx].Next()
if !itVec[top.storeIdx].Valid() {
key: itVec[top.storeIdx].Key(),
value: itVec[top.storeIdx].Value(),
storeIdx: top.storeIdx,