Skip to content
Snippets Groups Projects
Commit 09c9d98a authored by Manish R Jain's avatar Manish R Jain
Browse files

Move Mutations from store to commit logs.

parent 24de201f
No related branches found
No related tags found
No related merge requests found
......@@ -92,7 +92,7 @@ func (l *Logger) resetCounters() {
func (l *Logger) periodicSync() {
glog.WithField("dur", l.SyncDur).Debug("Periodic sync.")
if l.SyncDur == 0 {
glog.Info("No Periodic Sync for commit log.")
glog.Debug("No Periodic Sync for commit log.")
return
}
......@@ -169,6 +169,7 @@ func (l *Logger) Init() {
l.Lock()
defer l.Unlock()
glog.Debug("Logger init started.")
{
// First check if we have a current file.
path := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
......@@ -199,6 +200,7 @@ func (l *Logger) Init() {
l.createNew()
}
go l.periodicSync()
glog.Debug("Logger init finished.")
}
func (l *Logger) filepath(ts int64) string {
......@@ -419,6 +421,8 @@ func streamEntriesInFile(path string,
return nil
}
// Always run this method in it's own goroutine. Otherwise, your program
// will just hang waiting on channels.
func (l *Logger) StreamEntries(afterTs int64, hash uint32,
ch chan []byte, done chan error) {
......@@ -433,7 +437,9 @@ func (l *Logger) StreamEntries(afterTs int64, hash uint32,
{
cur := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
paths = append(paths, cur)
if _, err := os.Stat(cur); err == nil {
paths = append(paths, cur)
}
}
for _, path := range paths {
if err := streamEntriesInFile(path, afterTs, hash, ch); err != nil {
......
......@@ -26,15 +26,17 @@ import (
"sort"
"sync"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/posting/types"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/x"
"github.com/dgryski/go-farm"
"github.com/google/flatbuffers/go"
linked "container/list"
)
var log = x.Log("posting")
var glog = x.Log("posting")
const Set = 0x01
const Del = 0x02
......@@ -46,12 +48,14 @@ type MutationLink struct {
}
type List struct {
key []byte
mutex sync.RWMutex
buffer []byte
mbuffer []byte
pstore *store.Store // postinglist store
mstore *store.Store // mutation store
sync.RWMutex
key []byte
hash uint32
buffer []byte
mutations []*types.Posting
pstore *store.Store // postinglist store
clog *commit.Logger
maxMutationTs int64 // Track maximum mutation ts
// mlayer keeps only replace instructions for the posting list.
// This works at the
......@@ -71,22 +75,51 @@ func Key(uid uint64, attr string) []byte {
buf := new(bytes.Buffer)
buf.WriteString(attr)
if err := binary.Write(buf, binary.LittleEndian, uid); err != nil {
log.Fatalf("Error while creating key with attr: %v uid: %v\n", attr, uid)
glog.Fatalf("Error while creating key with attr: %v uid: %v\n", attr, uid)
}
return buf.Bytes()
}
func newPosting(t x.DirectedEdge, op byte) []byte {
b := flatbuffers.NewBuilder(0)
var bo flatbuffers.UOffsetT
if t.Value != nil {
if t.ValueId != math.MaxUint64 {
glog.Fatal("This should have already been set by the caller.")
}
bytes, err := json.Marshal(t.Value)
if err != nil {
glog.WithError(err).Fatal("Unable to marshal value")
return []byte{}
}
bo = b.CreateByteVector(bytes)
}
so := b.CreateString(t.Source)
types.PostingStart(b)
if bo > 0 {
types.PostingAddValue(b, bo)
}
types.PostingAddUid(b, t.ValueId)
types.PostingAddSource(b, so)
types.PostingAddTs(b, t.Timestamp.UnixNano())
types.PostingAddOp(b, op)
vend := types.PostingEnd(b)
b.Finish(vend)
return b.Bytes[b.Head():]
}
func addEdgeToPosting(b *flatbuffers.Builder,
t x.DirectedEdge, op byte) flatbuffers.UOffsetT {
var bo flatbuffers.UOffsetT
if t.Value != nil {
if t.ValueId != math.MaxUint64 {
log.Fatal("This should have already been set by the caller.")
glog.Fatal("This should have already been set by the caller.")
}
bytes, err := json.Marshal(t.Value)
if err != nil {
x.Err(log, err).Fatal("Unable to marshal value")
glog.WithError(err).Fatal("Unable to marshal value")
return 0
}
bo = b.CreateByteVector(bytes)
......@@ -144,7 +177,7 @@ func init() {
emptyPosting = b.Bytes[b.Head():]
}
log.Infof("Empty size: [%d] EmptyPosting size: [%d]",
glog.Infof("Empty size: [%d] EmptyPosting size: [%d]",
len(empty), len(emptyPosting))
}
......@@ -161,32 +194,50 @@ func ParseValue(i *interface{}, value []byte) error {
return json.Unmarshal(value, i)
}
func (l *List) init(key []byte, pstore, mstore *store.Store) {
l.mutex.Lock()
defer l.mutex.Unlock()
func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
l.Lock()
defer l.Unlock()
if len(empty) == 0 {
log.Fatal("empty should have some bytes.")
glog.Fatal("empty should have some bytes.")
}
l.key = key
l.pstore = pstore
l.mstore = mstore
l.clog = clog
var err error
if l.buffer, err = pstore.Get(key); err != nil {
// log.Debugf("While retrieving posting list from db: %v\n", err)
// glog.Debugf("While retrieving posting list from db: %v\n", err)
// Error. Just set to empty.
l.buffer = make([]byte, len(empty))
copy(l.buffer, empty)
}
if l.mbuffer, err = mstore.Get(key); err != nil {
// log.Debugf("While retrieving mutation list from db: %v\n", err)
// Error. Just set to empty.
l.mbuffer = make([]byte, len(empty))
copy(l.mbuffer, empty)
posting := types.GetRootAsPostingList(l.buffer, 0)
l.maxMutationTs = posting.CommitTs()
l.hash = farm.Fingerprint32(key)
ch := make(chan []byte, 100)
done := make(chan error)
glog.Debug("Starting stream entries...")
go clog.StreamEntries(posting.CommitTs()+1, l.hash, ch, done)
for buffer := range ch {
uo := flatbuffers.GetUOffsetT(buffer)
m := new(types.Posting)
m.Init(buffer, uo)
l.mergeWithList(m)
}
if err := <-done; err != nil {
glog.WithError(err).Error("While streaming entries.")
}
glog.Debug("Done streaming entries.")
if len(l.mutations) > 0 {
// Commit Logs are always streamed in increasing ts order.
l.maxMutationTs = l.mutations[len(l.mutations)-1].Ts()
}
l.generateIndex()
l.regenerateIndex()
}
func findIndex(pl *types.PostingList, uid uint64, begin, end int) int {
......@@ -220,14 +271,14 @@ func (l *List) length() int {
}
func (l *List) Length() int {
l.mutex.RLock()
defer l.mutex.RUnlock()
l.RLock()
defer l.RUnlock()
return l.length()
}
func (l *List) Get(p *types.Posting, i int) bool {
l.mutex.RLock()
defer l.mutex.RUnlock()
l.RLock()
defer l.RUnlock()
return l.get(p, i)
}
......@@ -261,7 +312,7 @@ func (l *List) get(p *types.Posting, i int) bool {
// variable.
} else {
log.Fatal("Someone, I mean you, forgot to tackle" +
glog.Fatal("Someone, I mean you, forgot to tackle" +
" this operation. Stop drinking.")
}
}
......@@ -304,7 +355,7 @@ func (l *List) get(p *types.Posting, i int) bool {
// Effective: ADD DEL REP (REP = replace)
//
// ----------------------------------------------------------------------------
// generateIndex would generate these:
// regenerateIndex would generate these:
// mlayer (layer just above posting list contains only replace instructions)
// idx: 4
// value: 13'
......@@ -332,22 +383,15 @@ func (l *List) get(p *types.Posting, i int) bool {
// still ensuring fast lookup access.
//
// NOTE: This function expects the caller to hold a RW Lock.
func (l *List) generateIndex() {
func (l *List) regenerateIndex() {
l.mindex = nil
l.mdelta = 0
l.mlayer = make(map[int]types.Posting)
mlist := types.GetRootAsPostingList(l.mbuffer, 0)
plist := types.GetRootAsPostingList(l.buffer, 0)
if mlist.PostingsLength() == 0 {
if len(l.mutations) == 0 {
return
}
var muts []*types.Posting
for i := 0; i < mlist.PostingsLength(); i++ {
var mp types.Posting
mlist.Postings(&mp, i)
muts = append(muts, &mp)
}
sort.Sort(ByUid(muts))
sort.Sort(ByUid(l.mutations))
mchain := linked.New()
pi := 0
......@@ -357,13 +401,16 @@ func (l *List) generateIndex() {
padding := flatbuffers.GetUOffsetT(emptyPosting)
pp.Init(emptyPosting, padding)
if pp.Uid() != 0 {
log.Fatal("Playing with bytes is like playing with fire." +
glog.Fatal("Playing with bytes is like playing with fire." +
" Someone got burnt today!")
}
}
// The following algorithm is O(m + n), where m = number of mutations, and
// n = number of immutable postings. This could be optimized
// to O(1) with potentially more complexity. TODO: Look into that later.
l.mdelta = 0
for mi, mp := range muts {
for mi, mp := range l.mutations {
// TODO: Consider converting to binary search later.
for ; pi < plist.PostingsLength() && pp.Uid() < mp.Uid(); pi++ {
plist.Postings(pp, pi)
......@@ -384,7 +431,7 @@ func (l *List) generateIndex() {
l.mdelta -= 1
} else {
log.Fatal("This operation isn't being handled.")
glog.Fatal("This operation isn't being handled.")
}
} else if mp.Uid() < pp.Uid() {
// This is an add, so move the plist index backwards.
......@@ -419,9 +466,49 @@ func (l *List) addIfValid(b *flatbuffers.Builder,
}
}
// Assumes a lock has already been acquired.
func (l *List) mergeWithList(mpost *types.Posting) {
// If this mutation is a deletion, then check if there's a valid uid entry
// in the immutable postinglist. If not, we can ignore this mutation.
ignore := false
if mpost.Op() == Del {
if fi := l.find(mpost.Uid()); fi < 0 {
ignore = true
}
}
// Check if we already have a mutation entry with the same uid.
// If so, ignore (in case of del)/replace it. Otherwise, append it.
handled := false
final := l.mutations[:0]
for _, mut := range l.mutations {
// mut := &l.mutations[i]
if mpost.Uid() != mut.Uid() {
final = append(final, mut)
continue
}
handled = true
if ignore {
// Don't add to final.
} else {
final = append(final, mpost) // replaced original.
}
}
if handled {
l.mutations = final
} else {
l.mutations = append(l.mutations, mpost)
}
}
func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
l.mutex.Lock()
defer l.mutex.Unlock()
l.Lock()
defer l.Unlock()
if t.Timestamp.UnixNano() < l.maxMutationTs {
return fmt.Errorf("Mutation ts lower than committed ts.")
}
// Mutation arrives:
// - Check if we had any(SET/DEL) before this, stored in the mutation list.
......@@ -435,48 +522,15 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
if t.Value != nil {
t.ValueId = math.MaxUint64
}
mbuf := newPosting(t, op)
uo := flatbuffers.GetUOffsetT(mbuf)
mpost := new(types.Posting)
mpost.Init(mbuf, uo)
b := flatbuffers.NewBuilder(0)
muts := types.GetRootAsPostingList(l.mbuffer, 0)
var offsets []flatbuffers.UOffsetT
added := false
for i := 0; i < muts.PostingsLength(); i++ {
var p types.Posting
if ok := muts.Postings(&p, i); !ok {
log.Fatal("While reading posting")
return errors.New("Error reading posting")
}
if p.Uid() != t.ValueId {
offsets = append(offsets, addPosting(b, p))
} else {
// An operation on something we already have a mutation for.
// Overwrite the previous one if it came earlier.
if p.Ts() <= t.Timestamp.UnixNano() {
l.addIfValid(b, &offsets, t, op)
} // else keep the previous one.
added = true
}
}
if !added {
l.addIfValid(b, &offsets, t, op)
}
types.PostingListStartPostingsVector(b, len(offsets))
for i := len(offsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(offsets[i])
}
vend := b.EndVector(len(offsets))
types.PostingListStart(b)
types.PostingListAddPostings(b, vend)
end := types.PostingListEnd(b)
b.Finish(end)
l.mbuffer = b.Bytes[b.Head():]
l.generateIndex()
return l.mstore.SetOne(l.key, l.mbuffer)
l.mergeWithList(mpost)
l.regenerateIndex()
l.maxMutationTs = t.Timestamp.UnixNano()
return l.clog.AddLog(t.Timestamp.UnixNano(), l.hash, mbuf)
}
func addOrSet(ll *linked.List, p *types.Posting) {
......@@ -484,7 +538,7 @@ func addOrSet(ll *linked.List, p *types.Posting) {
for e := ll.Front(); e != nil; e = e.Next() {
pe := e.Value.(*types.Posting)
if pe == nil {
log.Fatal("Posting shouldn't be nil!")
glog.Fatal("Posting shouldn't be nil!")
}
if !added && pe.Uid() > p.Uid() {
......@@ -510,22 +564,26 @@ func remove(ll *linked.List, p *types.Posting) {
}
}
func (l *List) generateLinkedList() *linked.List {
func (l *List) generateLinkedList() (*linked.List, int64) {
plist := types.GetRootAsPostingList(l.buffer, 0)
ll := linked.New()
var maxTs int64
for i := 0; i < plist.PostingsLength(); i++ {
p := new(types.Posting)
plist.Postings(p, i)
if maxTs < p.Ts() {
maxTs = p.Ts()
}
ll.PushBack(p)
}
mlist := types.GetRootAsPostingList(l.mbuffer, 0)
// Now go through mutations
for i := 0; i < mlist.PostingsLength(); i++ {
p := new(types.Posting)
mlist.Postings(p, i)
for _, p := range l.mutations {
if maxTs < p.Ts() {
maxTs = p.Ts()
}
if p.Op() == 0x01 {
// Set/Add
......@@ -536,31 +594,32 @@ func (l *List) generateLinkedList() *linked.List {
remove(ll, p)
} else {
log.Fatalf("Strange mutation: %+v", p)
glog.Fatalf("Strange mutation: %+v", p)
}
}
return ll
return ll, maxTs
}
func (l *List) isDirty() bool {
l.mutex.RLock()
defer l.mutex.RUnlock()
l.RLock()
defer l.RUnlock()
return l.mindex != nil
}
func (l *List) CommitIfDirty() error {
if !l.isDirty() {
log.WithField("dirty", false).Debug("Not Committing")
glog.WithField("dirty", false).Debug("Not Committing")
return nil
} else {
log.WithField("dirty", true).Debug("Committing")
glog.WithField("dirty", true).Debug("Committing")
}
l.mutex.Lock()
defer l.mutex.Unlock()
l.Lock()
defer l.Unlock()
ll, commitTs := l.generateLinkedList()
ll := l.generateLinkedList()
b := flatbuffers.NewBuilder(0)
var offsets []flatbuffers.UOffsetT
......@@ -578,28 +637,24 @@ func (l *List) CommitIfDirty() error {
types.PostingListStart(b)
types.PostingListAddPostings(b, vend)
types.PostingListAddCommitTs(b, commitTs)
end := types.PostingListEnd(b)
b.Finish(end)
l.buffer = b.Bytes[b.Head():]
if err := l.pstore.SetOne(l.key, l.buffer); err != nil {
log.WithField("error", err).Errorf("While storing posting list")
glog.WithField("error", err).Errorf("While storing posting list")
return err
}
l.mutations = nil
if err := l.mstore.Delete(l.key); err != nil {
log.WithField("error", err).Errorf("While deleting mutation list")
return err
}
l.mbuffer = make([]byte, len(empty))
copy(l.mbuffer, empty)
l.generateIndex()
l.regenerateIndex()
return nil
}
func (l *List) GetUids() []uint64 {
l.mutex.RLock()
defer l.mutex.RUnlock()
l.RLock()
defer l.RUnlock()
result := make([]uint64, l.length())
result = result[:0]
......@@ -614,8 +669,8 @@ func (l *List) GetUids() []uint64 {
}
func (l *List) Value() (result []byte, rerr error) {
l.mutex.RLock()
defer l.mutex.RUnlock()
l.RLock()
defer l.RUnlock()
if l.length() == 0 {
return result, fmt.Errorf("No value found")
......
......@@ -24,6 +24,7 @@ import (
"testing"
"time"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/posting/types"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/x"
......@@ -45,30 +46,25 @@ func checkUids(t *testing.T, l List, uids ...uint64) error {
return nil
}
func NewStore(t *testing.T) string {
path, err := ioutil.TempDir("", "storetest_")
func TestAddMutation(t *testing.T) {
// logrus.SetLevel(logrus.DebugLevel)
var l List
key := Key(1, "name")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
t.Fail()
return ""
return
}
return path
}
func TestAddMutation(t *testing.T) {
var l List
key := Key(1, "name")
pdir := NewStore(t)
defer os.RemoveAll(pdir)
defer os.RemoveAll(dir)
ps := new(store.Store)
ps.Init(pdir)
ps.Init(dir)
mdir := NewStore(t)
defer os.RemoveAll(mdir)
ms := new(store.Store)
ms.Init(mdir)
clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init()
defer clog.Close()
l.init(key, ps, ms)
l.init(key, ps, clog)
edge := x.DirectedEdge{
ValueId: 9,
......@@ -179,7 +175,7 @@ func TestAddMutation(t *testing.T) {
*/
// Try reading the same data in another PostingList.
var dl List
dl.init(key, ps, ms)
dl.init(key, ps, clog)
if err := checkUids(t, dl, uids...); err != nil {
t.Error(err)
}
......@@ -193,19 +189,26 @@ func TestAddMutation(t *testing.T) {
}
func TestAddMutation_Value(t *testing.T) {
// logrus.SetLevel(logrus.DebugLevel)
glog.Debug("Running init...")
var ol List
key := Key(10, "value")
pdir := NewStore(t)
defer os.RemoveAll(pdir)
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dir)
ps := new(store.Store)
ps.Init(pdir)
ps.Init(dir)
mdir := NewStore(t)
defer os.RemoveAll(mdir)
ms := new(store.Store)
ms.Init(mdir)
clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init()
defer clog.Close()
ol.init(key, ps, ms)
ol.init(key, ps, clog)
glog.Debug("Init successful.")
edge := x.DirectedEdge{
Value: "oh hey there",
......@@ -266,3 +269,48 @@ func TestAddMutation_Value(t *testing.T) {
t.Errorf("Expected 119. Got: %v", intout)
}
}
func benchmarkAddMutations(n int, b *testing.B) {
var l List
key := Key(1, "name")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
b.Error(err)
return
}
defer os.RemoveAll(dir)
ps := new(store.Store)
ps.Init(dir)
clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.SyncEvery = n
clog.Init()
defer clog.Close()
l.init(key, ps, clog)
b.ResetTimer()
ts := time.Now()
for i := 0; i < b.N; i++ {
edge := x.DirectedEdge{
ValueId: uint64(i),
Source: "testing",
Timestamp: ts.Add(time.Microsecond),
}
if err := l.AddMutation(edge, Set); err != nil {
b.Error(err)
}
}
}
func BenchmarkAddMutations_SyncEveryLogEntry(b *testing.B) {
benchmarkAddMutations(0, b)
}
func BenchmarkAddMutations_SyncEvery10LogEntry(b *testing.B) {
benchmarkAddMutations(10, b)
}
func BenchmarkAddMutations_SyncEvery100LogEntry(b *testing.B) {
benchmarkAddMutations(100, b)
}
......@@ -19,22 +19,23 @@ package posting
import (
"sync"
"github.com/dgryski/go-farm"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/store"
"github.com/dgryski/go-farm"
)
var lmutex sync.RWMutex
var lcache map[uint64]*List
var pstore *store.Store
var mstore *store.Store
var clog *commit.Logger
func Init(posting *store.Store, mutation *store.Store) {
func Init(posting *store.Store, log *commit.Logger) {
lmutex.Lock()
defer lmutex.Unlock()
lcache = make(map[uint64]*List)
pstore = posting
mstore = mutation
clog = log
}
func Get(key []byte) *List {
......@@ -56,7 +57,7 @@ func Get(key []byte) *List {
}
list := new(List)
list.init(key, pstore, mstore)
list.init(key, pstore, clog)
lcache[uid] = list
return list
}
......@@ -9,6 +9,7 @@ table Posting {
}
table PostingList {
commitTs:long;
postings:[Posting];
}
......
......@@ -21,8 +21,16 @@ func (rcv *PostingList) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Pos = i
}
func (rcv *PostingList) Postings(obj *Posting, j int) bool {
func (rcv *PostingList) CommitTs() int64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.GetInt64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *PostingList) Postings(obj *Posting, j int) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
x := rcv._tab.Vector(o)
x += flatbuffers.UOffsetT(j) * 4
......@@ -37,15 +45,16 @@ func (rcv *PostingList) Postings(obj *Posting, j int) bool {
}
func (rcv *PostingList) PostingsLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func PostingListStart(builder *flatbuffers.Builder) { builder.StartObject(1) }
func PostingListAddPostings(builder *flatbuffers.Builder, postings flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(postings), 0) }
func PostingListStart(builder *flatbuffers.Builder) { builder.StartObject(2) }
func PostingListAddCommitTs(builder *flatbuffers.Builder, commitTs int64) { builder.PrependInt64Slot(0, commitTs, 0) }
func PostingListAddPostings(builder *flatbuffers.Builder, postings flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(postings), 0) }
func PostingListStartPostingsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4)
}
func PostingListEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() }
......@@ -2,11 +2,12 @@ package posting
import (
"fmt"
"io/ioutil"
"os"
"testing"
"time"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
......@@ -39,18 +40,23 @@ func check(r *task.Result, idx int, expected []uint64) error {
}
func TestProcessTask(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
// logrus.SetLevel(logrus.DebugLevel)
pdir := NewStore(t)
defer os.RemoveAll(pdir)
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dir)
ps := new(store.Store)
ps.Init(pdir)
ps.Init(dir)
clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init()
defer clog.Close()
mdir := NewStore(t)
defer os.RemoveAll(mdir)
ms := new(store.Store)
ms.Init(mdir)
Init(ps, ms)
Init(ps, clog)
edge := x.DirectedEdge{
ValueId: 23,
......
......@@ -24,6 +24,7 @@ import (
"time"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task"
......@@ -117,16 +118,6 @@ func TestRun(t *testing.T) {
}
*/
func NewStore(t *testing.T) string {
path, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
t.Fail()
return ""
}
return path
}
func addEdge(t *testing.T, edge x.DirectedEdge, l *posting.List) {
if err := l.AddMutation(edge, posting.Set); err != nil {
t.Error(err)
......@@ -201,17 +192,20 @@ func TestNewGraph(t *testing.T) {
func populateGraph(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
return
}
pdir := NewStore(t)
defer os.RemoveAll(pdir)
defer os.RemoveAll(dir)
ps := new(store.Store)
ps.Init(pdir)
ps.Init(dir)
mdir := NewStore(t)
defer os.RemoveAll(mdir)
ms := new(store.Store)
ms.Init(mdir)
posting.Init(ps, ms)
clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init()
defer clog.Close()
posting.Init(ps, clog)
// So, user we're interested in has uid: 1.
// She has 4 friends: 23, 24, 25, 31, and 101
......
......@@ -25,6 +25,7 @@ import (
"os"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/query"
......@@ -131,9 +132,10 @@ func main() {
ps := new(store.Store)
ps.Init(*postingDir)
ms := new(store.Store)
ms.Init(*mutationDir)
posting.Init(ps, ms)
clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
clog.Init()
defer clog.Close()
posting.Init(ps, clog)
if len(*rdfData) > 0 {
f, err := os.Open(*rdfData)
......
......@@ -22,6 +22,7 @@ import (
"os"
"testing"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/query"
......@@ -46,22 +47,20 @@ var q0 = `
`
func prepare() error {
pdir, err := NewStore()
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
return err
}
defer os.RemoveAll(pdir)
defer os.RemoveAll(dir)
ps := new(store.Store)
ps.Init(pdir)
ps.Init(dir)
mdir, err := NewStore()
if err != nil {
return err
}
defer os.RemoveAll(mdir)
ms := new(store.Store)
ms.Init(mdir)
posting.Init(ps, ms)
clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init()
defer clog.Close()
posting.Init(ps, clog)
f, err := os.Open("testdata.nq")
if err != nil {
......@@ -73,6 +72,8 @@ func prepare() error {
return err
}
return nil
// Even though all files would be closed and the directory deleted,
// postings would still be present in memory.
}
func TestQuery(t *testing.T) {
......
......@@ -22,33 +22,27 @@ import (
"testing"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
)
func NewStore(t *testing.T) string {
path, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
t.Fail()
return ""
}
return path
}
func TestGetOrAssign(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
pdir := NewStore(t)
defer os.RemoveAll(pdir)
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dir)
ps := new(store.Store)
ps.Init(pdir)
ps.Init(dir)
clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init()
defer clog.Close()
mdir := NewStore(t)
defer os.RemoveAll(mdir)
ms := new(store.Store)
ms.Init(mdir)
posting.Init(ps, ms)
posting.Init(ps, clog)
var u1, u2 uint64
{
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment