diff --git a/tools/merge/.gitignore b/tools/merge/.gitignore
index 6a027f5d5ab8e459e013ce3c67e6ab65dbbda1fc..960e49ffce06a6a6e92f8b2995ef39544cc1e743 100644
--- a/tools/merge/.gitignore
+++ b/tools/merge/.gitignore
@@ -1,2 +1,4 @@
+/main
 /a.out
 /merge
+/rocksmerge
diff --git a/tools/merge/main.go b/tools/merge/main.go
new file mode 100644
index 0000000000000000000000000000000000000000..8312b2f18a72d292ffa5b8f086853d794bede3ce
--- /dev/null
+++ b/tools/merge/main.go
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package main
+
+import (
+	"bytes"
+	"container/heap"
+	"flag"
+	"io/ioutil"
+	"math"
+	"path"
+
+	"github.com/dgraph-io/dgraph/posting"
+	"github.com/dgraph-io/dgraph/posting/types"
+	"github.com/dgraph-io/dgraph/store/rocksdb"
+	"github.com/dgraph-io/dgraph/x"
+)
+
+// Item is one key/value pair drawn from one of the K source stores.
+type Item struct {
+	key, value []byte
+	storeIdx   int // index of the store among the K stores
+}
+
+// PriorityQueue is a min-heap of Items ordered by key (see Less).
+type PriorityQueue []*Item
+
+var stores = flag.String("stores", "",
+	"Root directory containing rocksDB directories")
+var destinationDB = flag.String("dest", "",
+	"Folder to store merged rocksDB")
+
+var glog = x.Log("rocksmerge")
+var pq PriorityQueue
+
+func (pq PriorityQueue) Len() int { return len(pq) }
+func (pq PriorityQueue) Swap(i, j int) {
+	pq[i], pq[j] = pq[j], pq[i]
+}
+func (pq PriorityQueue) Less(i, j int) bool {
+	return bytes.Compare(pq[i].key, pq[j].key) < 0 // strict ordering, as container/heap expects
+}
+func (pq *PriorityQueue) Push(y interface{}) {
+	*pq = append(*pq, y.(*Item))
+}
+func (pq *PriorityQueue) Pop() interface{} {
+	old := *pq
+	n := len(old)
+	item := old[n-1]
+	*pq = old[0 : n-1]
+	return item
+}
+
+// equalValue reports whether two posting lists hold the same single entry:
+// either both are UID edges with equal UIDs, or both are value edges
+// (Uid == math.MaxUint64) with byte-equal values.
+func equalValue(a, b interface{}) bool {
+	var x, y types.Posting
+	p1 := a.(*posting.List) // NOTE(review): callers pass raw []byte here — verify this assertion
+	if ok := p1.Get(&x, 0); !ok {
+		glog.Fatal("While retrieving entry from posting list")
+	}
+	p2 := b.(*posting.List)
+	if ok := p2.Get(&y, 0); !ok {
+		glog.Fatal("While retrieving entry from posting list")
+	}
+
+	aUid := x.Uid()
+	bUid := y.Uid()
+
+	return (x.ValueBytes() == nil && y.ValueBytes() == nil && (aUid == bUid)) ||
+		((x.Uid() == math.MaxUint64 && y.Uid() == math.MaxUint64) &&
+			bytes.Compare(x.ValueBytes(), y.ValueBytes()) == 0)
+}
+
+// mergeFolders performs a K-way merge of every rocksDB directory found
+// directly under mergePath into a single rocksDB at destPath, failing
+// fatally if the same key carries different values in two stores.
+func mergeFolders(mergePath, destPath string) {
+	dirList, err := ioutil.ReadDir(mergePath)
+	if err != nil {
+		glog.Fatal("Cannot open stores directory")
+	}
+
+	var opt *rocksdb.Options
+	var ropt *rocksdb.ReadOptions
+	var wopt *rocksdb.WriteOptions
+	opt = rocksdb.NewOptions()
+	opt.SetCreateIfMissing(true)
+	ropt = rocksdb.NewReadOptions()
+	wopt = rocksdb.NewWriteOptions()
+	wopt.SetSync(true)
+
+	pq = make(PriorityQueue, 0)
+	heap.Init(&pq)
+	var storeIters []*rocksdb.Iterator
+	for i, dir := range dirList {
+		mPath := path.Join(mergePath, dir.Name())
+		curDb, err := rocksdb.Open(mPath, opt)
+		if err != nil { // check before deferring Close: Open may return a nil DB
+			glog.WithField("filepath", mPath).
+				Fatal("While opening store")
+		}
+		defer curDb.Close()
+		it := curDb.NewIterator(ropt)
+		it.SeekToFirst()
+		if !it.Valid() {
+			storeIters = append(storeIters, it) // keep slice aligned with store index i
+			glog.WithField("path", mPath).Info("Store empty")
+			continue
+		}
+		item := &Item{
+			key:      it.Key(),
+			value:    it.Value(),
+			storeIdx: i,
+		}
+		heap.Push(&pq, item)
+		storeIters = append(storeIters, it)
+	}
+
+	var db *rocksdb.DB
+	db, err = rocksdb.Open(destPath, opt)
+	if err != nil { // check before deferring Close: Open may return a nil DB
+		glog.WithField("filepath", destPath).
+			Fatal("While opening store")
+	}
+	defer db.Close()
+
+	var lastKey, lastValue []byte
+	for pq.Len() > 0 {
+		top := heap.Pop(&pq).(*Item)
+
+		if bytes.Compare(top.key, lastKey) == 0 {
+			if !equalValue(top.value, lastValue) {
+				glog.WithField("key", lastKey).
+					Fatal("different value for same key")
+			}
+		}
+		if err = db.Put(wopt, top.key, top.value); err != nil { glog.Fatal(err) }
+		lastKey = top.key
+		lastValue = top.value
+
+		storeIters[top.storeIdx].Next()
+		if !storeIters[top.storeIdx].Valid() {
+			continue
+		}
+		item := &Item{
+			key:      storeIters[top.storeIdx].Key(),
+			value:    storeIters[top.storeIdx].Value(),
+			storeIdx: top.storeIdx,
+		}
+		heap.Push(&pq, item)
+	}
+}
+
+func main() {
+	flag.Parse()
+	if len(*stores) == 0 || len(*destinationDB) == 0 {
+		glog.Fatal("Both -stores and -dest directories must be specified")
+	}
+
+	mergeFolders(*stores, *destinationDB)
+}
diff --git a/tools/merge/main_test.go b/tools/merge/main_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..a52c36e9d675f34a961ee96f394121728094c416
--- /dev/null
+++ b/tools/merge/main_test.go
@@ -0,0 +1,101 @@
+package main
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/dgraph-io/dgraph/loader"
+	"github.com/dgraph-io/dgraph/posting"
+	"github.com/dgraph-io/dgraph/store"
+	"github.com/dgraph-io/dgraph/store/rocksdb"
+	"github.com/dgraph-io/dgraph/uid"
+	"github.com/dgryski/go-farm"
+)
+
+func TestMergeFolders(t *testing.T) {
+	logrus.SetLevel(logrus.DebugLevel)
+	rootDir, err := ioutil.TempDir("", "storetest_")
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer os.RemoveAll(rootDir)
+
+	dir1, err := ioutil.TempDir(rootDir, "dir_")
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer os.RemoveAll(dir1)
+
+	dir2, err := ioutil.TempDir(rootDir, "dir_")
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer os.RemoveAll(dir2)
+
+	destDir, err := ioutil.TempDir("", "dest_")
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer os.RemoveAll(destDir)
+
+	ps1 := new(store.Store)
+	ps1.Init(dir1)
+
+	ps2 := new(store.Store)
+	ps2.Init(dir2)
+
+	list := []string{"alice", "bob", "mallory", "ash", "man", "dgraph",
+		"ash", "alice"}
+	var numInstances uint64 = 2
+	posting.Init(nil)
+	uid.Init(ps1)
+	loader.Init(nil, ps1)
+	for _, str := range list {
+		if farm.Fingerprint64([]byte(str))%numInstances == 0 {
+			_, err := uid.GetOrAssign(str, 0, numInstances)
+			if err != nil {
+				t.Errorf("error while assigning uid")
+			}
+		}
+	}
+	uid.Init(ps2)
+	loader.Init(nil, ps2)
+	for _, str := range list {
+		if farm.Fingerprint64([]byte(str))%numInstances == 1 {
+			_, err := uid.GetOrAssign(str, 1, numInstances)
+			if err != nil {
+				t.Errorf("error while assigning uid")
+			}
+
+		}
+	}
+	posting.MergeLists(100)
+	ps1.Close()
+	ps2.Close()
+
+	mergeFolders(rootDir, destDir)
+
+	var opt *rocksdb.Options
+	var ropt *rocksdb.ReadOptions
+	opt = rocksdb.NewOptions()
+	ropt = rocksdb.NewReadOptions()
+	db, err := rocksdb.Open(destDir, opt)
+	if err != nil { t.Fatal(err) }
+	defer db.Close()
+	it := db.NewIterator(ropt)
+
+	count := 0
+	for it.SeekToFirst(); it.Valid(); it.Next() {
+		count++
+	}
+
+	if count != 6 { // There are totally 6 unique strings
+		t.Errorf("Not all the items have been assigned uid")
+	}
+}
diff --git a/tools/merge/merge.cc b/tools/merge/merge.cc
deleted file mode 100644
index d2e2a5be3a5f253dd60656f184a985e5eae7a4de..0000000000000000000000000000000000000000
--- a/tools/merge/merge.cc
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * compile : g++ merge.cc <path_to_rocksDB_installation>/librocksdb.so.4.1 --std=c++11 -lstdc++fs
- * usage : ./<executable> <folder_having_rocksDB_directories_to_be_merged> <destination_folder>
- *
- * find the rocksdb headers here https://github.com/facebook/rocksdb/tree/master/include/rocksdb
- *
- */
-
-#include <fstream>
-#include <cstdio>
-#include <iostream>
-#include <string>
-#include "rocksdb/db.h"
-#include "rocksdb/options.h"
-#include <experimental/filesystem>
-
-using namespace rocksdb;
-namespace fs = std::experimental::filesystem;
-
-int main(int argc, char* argv[]) {
-  if(argc != 3) {
-    std::cerr << "Wrong number of arguments\nusage : ./<executable>\
-    <folder_having_rocksDB_directories_to_be_merged> <destination_folder>\n";
-    exit(0);
-  }
-
-  std::string destinationDB = argv[2], mergeDir = argv[1];
-  DB* db;
-  Options options;
-  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
-  options.IncreaseParallelism();
-  options.OptimizeLevelStyleCompaction();
-  // create the DB if it's not already present
-  options.create_if_missing = true;
-
-  // open DB
-  Status s = DB::Open(options, destinationDB, &db);
-  assert(s.ok());
-
-  for (auto& dirEntry : fs::directory_iterator(mergeDir)) {
-    std::cout << dirEntry << std::endl;
-    DB* cur_db;
-    Options options;
-    options.IncreaseParallelism();
-    options.OptimizeLevelStyleCompaction();
-    // Don't create the DB if it's not already present
-    options.create_if_missing = false;
-
-    // open DB
-    Status s1 = DB::Open(options, dirEntry.path().c_str(), &cur_db);
-    assert(s1.ok());
-
-    rocksdb::Iterator* it = cur_db->NewIterator(rocksdb::ReadOptions());
-    for (it->SeekToFirst(); it->Valid(); it->Next()) {
-      Slice key_s = it->key();
-      Slice val_s = it->value();
-      std::string val_t;
-      Status s = db->Get(ReadOptions(), key_s, &val_t);
-      if(s.ok()) {
-        assert(val_t == val_s.ToString() && "Same key has different value");
-      } else {
-        s = db->Put(WriteOptions(), key_s, val_s);
-        assert(s.ok());
-      }
-    }
-    assert(it->status().ok()); // Check for any errors found during the scan
-    delete it;
-    delete cur_db;
-  }
-
-  delete db;
-  return 0;
-}
diff --git a/tools/merge/merge_heap.cc b/tools/merge/merge_heap.cc
deleted file mode 100644
index 34e755d54d524896bc8e21d4257b98b66196d603..0000000000000000000000000000000000000000
--- a/tools/merge/merge_heap.cc
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * compile : g++ merge_heap.cc <path_to_rocksDB_installation>/librocksdb.so.4.1 --std=c++11 -lstdc++fs
- * usage : ./<executable> <folder_having_rocksDB_directories_to_be_merged> <destination_folder>
- *
- * rocksdb headers can be found here https://github.com/facebook/rocksdb/tree/master/include/rocksdb
- */
-
-#include <fstream>
-#include <cstdio>
-#include <iostream>
-#include <string>
-#include <queue>
-#include <vector>
-#include "rocksdb/db.h"
-#include "rocksdb/options.h"
-#include <experimental/filesystem>
-
-using namespace rocksdb;
-namespace fs = std::experimental::filesystem;
-
-class node {
-public:
-  Slice key;
-  Slice value;
-  int idx;
-  node(Slice k, Slice v, int id) {
-    key = k;
-    value = v;
-    idx = id;
-  }
-};
-
-class compare {
-  public:
-  bool operator()(node &a, node &b) {
-    return a.key.compare(b.key) <= 0;
-  }
-};
-
-int main(int argc, char* argv[]) {
-  if(argc != 3) {
-    std::cerr << "Wrong number of arguments\nusage : ./<executable>\
-    <folder_having_rocksDB_directories_to_be_merged> <destination_folder>\n";
-    exit(0);
-  }
-
-  int counter = 0;
-  std::priority_queue<struct node, std::vector<node>, compare> pq;
-  std::vector<rocksdb::Iterator*> itVec;
-  std::string destinationDB = argv[2], mergeDir = argv[1];
-  DB* db;
-  Options options;
-  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
-  options.IncreaseParallelism();
-  options.OptimizeLevelStyleCompaction();
-  // create the DB if it's not already present
-  options.create_if_missing = true;
-
-  // open DB
-  Status s = DB::Open(options, destinationDB, &db);
-  assert(s.ok());
-
-  for (auto& dirEntry : fs::directory_iterator(mergeDir)) {
-    std::cout << dirEntry << std::endl;
-    DB* cur_db;
-    Options options;
-    options.IncreaseParallelism();
-    options.OptimizeLevelStyleCompaction();
-    // Don't create the DB if it's not already present
-    options.create_if_missing = false;
-
-    // open DB
-    Status s1 = DB::Open(options, dirEntry.path().c_str(), &cur_db);
-    assert(s1.ok());
-
-    rocksdb::Iterator *it = cur_db->NewIterator(rocksdb::ReadOptions());
-    it->SeekToFirst();
-    if(!it->Valid()) {
-      continue;
-    }
-    struct node tnode(it->key(), it->value(), counter++);
-    itVec.push_back(it);
-    pq.push(tnode);
-  }
-
-  Slice lastKey, lastValue;
-
-  while(!pq.empty()) {
-    const struct node &top = pq.top();
-    pq.pop();
-
-    if(top.key == lastKey) {
-      assert(top.value == lastValue);
-    } else {
-      s = db->Put(WriteOptions(), top.key, top.value);
-      assert(s.ok());
-      lastKey = top.key;
-      lastValue = top.value;
-    }
-
-    itVec[top.idx]->Next();
-    if(!itVec[top.idx]->Valid()) {
-      continue;
-    }
-    struct node tnode(itVec[top.idx]->key(), itVec[top.idx]->value(), top.idx);
-    pq.push(tnode);
-  }
-
-  delete db;
-  return 0;
-}