Skip to content
Snippets Groups Projects
rocksmerge.go 4.27 KiB
Newer Older
  • Learn to ignore specific revisions
  • Ashwin's avatar
    Ashwin committed
    /*
     * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     *
     */
    
    package main
    
    import (
    	"bytes"
    	"container/heap"
    	"flag"
    	"io/ioutil"
    	"math"
    	"path"

    	"github.com/dgraph-io/dgraph/posting"
    	"github.com/dgraph-io/dgraph/posting/types"
    	"github.com/dgraph-io/dgraph/store/rocksdb"
    	"github.com/dgraph-io/dgraph/x"
    )
    
    // Item is one key/value pair read from a source store, together with the
    // position of that store's iterator so the merge loop can advance the
    // iterator the pair came from.
    type Item struct {
    	key, value []byte
    	storeIdx   int // index of the store among the K stores
    }
    
    // PriorityQueue is a slice of *Item. Its Len, Less, Swap, Push and Pop
    // methods let it be driven by container/heap, ordered by Item key.
    type PriorityQueue []*Item
    
    
    Ashwin's avatar
    Ashwin committed
    // stores is the -stores flag: the root directory whose entries are the
    // rocksDB directories to merge.
    var stores = flag.String("stores", "",
    	"Root directory containing rocksDB directories")

    // destinationDB is the -dest flag: the folder the merged rocksDB is
    // written to.
    var destinationDB = flag.String("dest", "",
    	"Folder to store merged rocksDB")
    
    // glog is the logger used throughout this tool, obtained from x.Log with
    // the name "rocksmerge".
    var glog = x.Log("rocksmerge")

    // pq is the package-level priority queue shared by the merge:
    // MergeFolders fills it and mergeUsingHeap drains it.
    var pq PriorityQueue
    
    Ashwin's avatar
    Ashwin committed
    
    // Len reports how many items are currently held in the queue.
    func (pq PriorityQueue) Len() int {
    	size := len(pq)
    	return size
    }
    // Swap exchanges the items stored at positions i and j.
    func (pq PriorityQueue) Swap(i, j int) {
    	held := pq[i]
    	pq[i] = pq[j]
    	pq[j] = held
    }
    func (pq PriorityQueue) Less(i, j int) bool {
    
    Ashwin's avatar
    Ashwin committed
    	return bytes.Compare(pq[i].key, pq[j].key) <= 0
    
    Ashwin's avatar
    Ashwin committed
    }
    func (pq *PriorityQueue) Push(y interface{}) {
    
    Ashwin's avatar
    Ashwin committed
    	*pq = append(*pq, y.(*Item))
    
    Ashwin's avatar
    Ashwin committed
    }
    // Pop removes the last item of the queue and returns it as a *Item wrapped
    // in interface{}. It panics on an empty queue. It does not pick the
    // minimum by itself; callers in this file go through heap.Pop.
    func (pq *PriorityQueue) Pop() interface{} {
    	old := *pq
    	n := len(old)
    	item := old[n-1]
    	// Clear the vacated slot so the backing array no longer references the
    	// returned item once the caller is done with it.
    	old[n-1] = nil
    	*pq = old[:n-1]
    	return item
    }
    
    
    func equalValue(a, b interface{}) bool {
    
    	var x, y types.Posting
    	p1 := a.(*posting.List)
    	if ok := p1.Get(&x, 0); !ok {
    		glog.Fatal("While retrieving entry from posting list")
    	}
    	p2 := b.(*posting.List)
    	if ok := p2.Get(&y, 0); !ok {
    		glog.Fatal("While retrieving entry from posting list")
    	}
    
    
    	aUid := x.Uid()
    	bUid := y.Uid()
    
    	return aUid == bUid ||
    		((x.Uid() == math.MaxUint64 && y.Uid() == math.MaxUint64) &&
    			bytes.Compare(x.ValueBytes(), y.ValueBytes()) == 0)
    
    func MergeFolders(mergePath, destPath string) {
    
    	dirList, err := ioutil.ReadDir(mergePath)
    
    Ashwin's avatar
    Ashwin committed
    	if err != nil {
    		glog.Fatal("Cannot open stores directory")
    	}
    
    	var opt *rocksdb.Options
    	var ropt *rocksdb.ReadOptions
    	var wopt *rocksdb.WriteOptions
    	opt = rocksdb.NewOptions()
    	opt.SetCreateIfMissing(true)
    	ropt = rocksdb.NewReadOptions()
    	wopt = rocksdb.NewWriteOptions()
    	wopt.SetSync(true)
    
    	count := 0
    
    	for range dirList {
    
    Ashwin's avatar
    Ashwin committed
    		count++
    	}
    
    
    	pq = make(PriorityQueue, count)
    	var storeIters []*rocksdb.Iterator
    	for i, dir := range dirList {
    		mPath := path.Join(mergePath, dir.Name())
    		curDb, err := rocksdb.Open(mPath, opt)
    
    Ashwin's avatar
    Ashwin committed
    		defer curDb.Close()
    
    Ashwin's avatar
    Ashwin committed
    		if err != nil {
    
    			glog.WithField("filepath", mPath).
    
    Ashwin's avatar
    Ashwin committed
    				Fatal("While opening store")
    		}
    		it := curDb.NewIterator(ropt)
    		it.SeekToFirst()
    		if !it.Valid() {
    
    			storeIters = append(storeIters, it)
    			glog.WithField("path", mPath).Info("Store empty")
    
    Ashwin's avatar
    Ashwin committed
    			continue
    		}
    		pq[i] = &Item{
    
    Ashwin's avatar
    Ashwin committed
    			key:      it.Key(),
    			value:    it.Value(),
    			storeIdx: i,
    
    Ashwin's avatar
    Ashwin committed
    		}
    
    		storeIters = append(storeIters, it)
    
    Ashwin's avatar
    Ashwin committed
    	}
    	heap.Init(&pq)
    
    	mergeUsingHeap(destPath, storeIters)
    }
    
    func mergeUsingHeap(destPath string, storeIters []*rocksdb.Iterator) {
    	var opt *rocksdb.Options
    	var wopt *rocksdb.WriteOptions
    	opt = rocksdb.NewOptions()
    	opt.SetCreateIfMissing(true)
    	wopt = rocksdb.NewWriteOptions()
    	wopt.SetSync(true)
    
    Ashwin's avatar
    Ashwin committed
    
    
    Ashwin's avatar
    Ashwin committed
    	var db *rocksdb.DB
    
    	db, err := rocksdb.Open(destPath, opt)
    
    Ashwin's avatar
    Ashwin committed
    	defer db.Close()
    
    Ashwin's avatar
    Ashwin committed
    	if err != nil {
    
    		glog.WithField("filepath", destPath).
    
    Ashwin's avatar
    Ashwin committed
    			Fatal("While opening store")
    	}
    
    
    Ashwin's avatar
    Ashwin committed
    	var lastKey, lastValue []byte
    
    Ashwin's avatar
    Ashwin committed
    	for pq.Len() > 0 {
    		top := heap.Pop(&pq).(*Item)
    
    		if bytes.Compare(top.key, lastKey) == 0 {
    
    			if !equalValue(top.value, lastValue) {
    
    				glog.WithField("key", lastKey).
    					Fatal("different value for same key")
    
    		db.Put(wopt, top.key, top.value)
    		lastKey = top.key
    		lastValue = top.value
    
    Ashwin's avatar
    Ashwin committed
    
    
    		storeIters[top.storeIdx].Next()
    		if !storeIters[top.storeIdx].Valid() {
    
    Ashwin's avatar
    Ashwin committed
    			continue
    		}
    		item := &Item{
    
    			key:      storeIters[top.storeIdx].Key(),
    			value:    storeIters[top.storeIdx].Value(),
    
    Ashwin's avatar
    Ashwin committed
    			storeIdx: top.storeIdx,
    
    Ashwin's avatar
    Ashwin committed
    		}
    		heap.Push(&pq, item)
    	}
    }
    
    
    // main parses the command-line flags, requires both -stores and -dest to be
    // set, and merges the stores under -stores into the store at -dest.
    func main() {
    	flag.Parse()
    	if len(*stores) == 0 {
    		glog.Fatal("No Directory specified")
    	}
    	// Reject an empty destination up front instead of handing an empty path
    	// to the store layer.
    	if len(*destinationDB) == 0 {
    		glog.Fatal("No destination directory specified")
    	}

    	MergeFolders(*stores, *destinationDB)
    }