Skip to content
Snippets Groups Projects
rocksmerge.go 3.19 KiB
/*
 * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package main

import (
	"bytes"
	"container/heap"
	"flag"
	"io/ioutil"
	"path/filepath"

	"github.com/dgraph-io/dgraph/store/rocksdb"
	"github.com/dgraph-io/dgraph/x"
)

// Item is a single key/value pair taken from one of the source stores while
// they are being merged.
type Item struct {
	// key is the entry's key; the priority queue orders items by it.
	key []byte
	// value is the payload stored under key.
	value []byte
	// storeIdx identifies the source store the pair was read from; main uses
	// it to select the iterator to advance once the item has been consumed.
	storeIdx int
}

// PriorityQueue is a slice of items that, through the Len, Swap, Less, Push
// and Pop methods defined in this file, is used with container/heap as a
// min-heap ordered by item key.
type PriorityQueue []*Item

// Command-line flags selecting the input and output locations of the merge.
var (
	// stores is the directory whose entries are opened as source stores.
	stores = flag.String("stores", "", "Root directory containing rocksDB directories")

	// destinationDB is the path of the store that receives the merged data.
	destinationDB = flag.String("dest", "", "Folder to store merged rocksDB")
)

// glog is the logger used throughout this tool, obtained from x.Log under the
// name "rocksmerge".
var glog = x.Log("rocksmerge")

// Len reports how many items are currently held in the queue.
func (pq PriorityQueue) Len() int {
	size := len(pq)
	return size
}
// Swap exchanges the items stored at positions i and j.
func (pq PriorityQueue) Swap(i, j int) {
	first, second := pq[i], pq[j]
	pq[i] = second
	pq[j] = first
}
// Less reports whether the item at index i must be ordered before the item at
// index j, comparing their keys bytewise. The comparison is strict: two items
// with equal keys are not "less" than each other in either direction, which
// keeps the ordering consistent for container/heap.
func (pq PriorityQueue) Less(i, j int) bool {
	return bytes.Compare(pq[i].key, pq[j].key) < 0
}
// Push appends y, which must be a *Item, to the end of the queue. It is meant
// to be called through heap.Push, which restores heap order afterwards.
func (pq *PriorityQueue) Push(y interface{}) {
	entry := y.(*Item)
	queue := *pq
	queue = append(queue, entry)
	*pq = queue
}
// Pop removes the last item of the queue and returns it. It is meant to be
// called through heap.Pop, which first moves the minimum item to the end.
func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	// Clear the vacated slot so the backing array no longer references the
	// popped item and it can be garbage collected once the caller drops it.
	old[n-1] = nil
	*pq = old[:n-1]
	return item
}

// main merges every store found directly under the -stores directory into the
// store at -dest.
//
// Each source store is opened and an iterator is positioned at its first
// entry. The current entry of every non-empty store is kept in a min-heap
// ordered by key, so entries are written to the destination in ascending key
// order. A key that appears in several stores is written once; if two stores
// hold different values for the same key the merge aborts.
func main() {
	flag.Parse()
	if len(*stores) == 0 {
		glog.Fatal("No Directory specified")
	}
	files, err := ioutil.ReadDir(*stores)
	if err != nil {
		glog.Fatal("Cannot open stores directory")
	}

	opt := rocksdb.NewOptions()
	opt.SetCreateIfMissing(true)
	ropt := rocksdb.NewReadOptions()
	wopt := rocksdb.NewWriteOptions()
	wopt.SetSync(true)

	// Only non-empty stores contribute an item and an iterator, so both
	// slices start empty and grow together. Pre-filling the queue with
	// len(files) entries would leave nil items behind for empty stores.
	pq := make(PriorityQueue, 0, len(files))
	itVec := make([]*rocksdb.Iterator, 0, len(files))
	for _, f := range files {
		// filepath.Join inserts the separator when -stores is given without
		// a trailing slash.
		storePath := filepath.Join(*stores, f.Name())
		curDb, err := rocksdb.Open(storePath, opt)
		if err != nil {
			glog.WithField("filepath", storePath).
				Fatal("While opening store")
		}
		// Registered only after a successful open; the store has to stay
		// open until the merge loop below has finished with its iterator.
		defer curDb.Close()

		it := curDb.NewIterator(ropt)
		it.SeekToFirst()
		if !it.Valid() {
			continue
		}
		// storeIdx is the iterator's position in itVec, not the position of
		// the store in files, so the two stay in step when stores are skipped.
		pq = append(pq, &Item{
			key:      it.Key(),
			value:    it.Value(),
			storeIdx: len(itVec),
		})
		itVec = append(itVec, it)
	}
	heap.Init(&pq)

	db, err := rocksdb.Open(*destinationDB, opt)
	if err != nil {
		glog.WithField("filepath", *destinationDB).
			Fatal("While opening store")
	}
	defer db.Close()

	// haveLast distinguishes "nothing written yet" from a genuinely empty
	// previous key, which would otherwise compare equal to the nil lastKey.
	var lastKey, lastValue []byte
	haveLast := false
	for pq.Len() > 0 {
		top := heap.Pop(&pq).(*Item)

		if haveLast && bytes.Equal(top.key, lastKey) {
			if !bytes.Equal(top.value, lastValue) {
				// TODO: value comparison considering timestamps
				glog.Fatalf("different value for same key %s", lastKey)
			}
		} else {
			// NOTE(review): assumes Put returns an error — confirm against
			// the store/rocksdb package.
			if err := db.Put(wopt, top.key, top.value); err != nil {
				glog.Fatalf("While writing key %s to destination store: %v",
					top.key, err)
			}
			lastKey, lastValue, haveLast = top.key, top.value, true
		}

		// Advance the store the item came from and queue its next entry.
		it := itVec[top.storeIdx]
		it.Next()
		if !it.Valid() {
			continue
		}
		heap.Push(&pq, &Item{
			key:      it.Key(),
			value:    it.Value(),
			storeIdx: top.storeIdx,
		})
	}
}