diff --git a/commit/log.go b/commit/log.go index ecc329bdebd4ced78c30f76e35795a03c9456668..307255cf306cfc595ac262827c93cdb3e65b392a 100644 --- a/commit/log.go +++ b/commit/log.go @@ -92,7 +92,7 @@ func (l *Logger) resetCounters() { func (l *Logger) periodicSync() { glog.WithField("dur", l.SyncDur).Debug("Periodic sync.") if l.SyncDur == 0 { - glog.Info("No Periodic Sync for commit log.") + glog.Debug("No Periodic Sync for commit log.") return } @@ -169,6 +169,7 @@ func (l *Logger) Init() { l.Lock() defer l.Unlock() + glog.Debug("Logger init started.") { // First check if we have a current file. path := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix)) @@ -199,6 +200,7 @@ func (l *Logger) Init() { l.createNew() } go l.periodicSync() + glog.Debug("Logger init finished.") } func (l *Logger) filepath(ts int64) string { @@ -419,6 +421,8 @@ func streamEntriesInFile(path string, return nil } +// Always run this method in it's own goroutine. Otherwise, your program +// will just hang waiting on channels. func (l *Logger) StreamEntries(afterTs int64, hash uint32, ch chan []byte, done chan error) { @@ -433,7 +437,9 @@ func (l *Logger) StreamEntries(afterTs int64, hash uint32, { cur := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix)) - paths = append(paths, cur) + if _, err := os.Stat(cur); err == nil { + paths = append(paths, cur) + } } for _, path := range paths { if err := streamEntriesInFile(path, afterTs, hash, ch); err != nil { diff --git a/posting/list.go b/posting/list.go index a1a658af0471ea30649efd55bb59f68eec2b8643..c8972f3bbc2ca61acc0cde394222a30cc8384e5f 100644 --- a/posting/list.go +++ b/posting/list.go @@ -26,15 +26,17 @@ import ( "sort" "sync" + "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/posting/types" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/x" + "github.com/dgryski/go-farm" "github.com/google/flatbuffers/go" linked "container/list" ) -var log = x.Log("posting") +var glog = x.Log("posting") const Set = 0x01 const Del = 0x02 @@ -46,12 +48,14 @@ type MutationLink struct { } type List struct { - key []byte - mutex sync.RWMutex - buffer []byte - mbuffer []byte - pstore *store.Store // postinglist store - mstore *store.Store // mutation store + sync.RWMutex + key []byte + hash uint32 + buffer []byte + mutations []*types.Posting + pstore *store.Store // postinglist store + clog *commit.Logger + maxMutationTs int64 // Track maximum mutation ts // mlayer keeps only replace instructions for the posting list. // This works at the @@ -71,22 +75,51 @@ func Key(uid uint64, attr string) []byte { buf := new(bytes.Buffer) buf.WriteString(attr) if err := binary.Write(buf, binary.LittleEndian, uid); err != nil { - log.Fatalf("Error while creating key with attr: %v uid: %v\n", attr, uid) + glog.Fatalf("Error while creating key with attr: %v uid: %v\n", attr, uid) } return buf.Bytes() } +func newPosting(t x.DirectedEdge, op byte) []byte { + b := flatbuffers.NewBuilder(0) + var bo flatbuffers.UOffsetT + if t.Value != nil { + if t.ValueId != math.MaxUint64 { + glog.Fatal("This should have already been set by the caller.") + } + bytes, err := json.Marshal(t.Value) + if err != nil { + glog.WithError(err).Fatal("Unable to marshal value") + return []byte{} + } + bo = b.CreateByteVector(bytes) + } + so := b.CreateString(t.Source) + types.PostingStart(b) + if bo > 0 { + types.PostingAddValue(b, bo) + } + types.PostingAddUid(b, t.ValueId) + types.PostingAddSource(b, so) + types.PostingAddTs(b, t.Timestamp.UnixNano()) + types.PostingAddOp(b, op) + vend := types.PostingEnd(b) + b.Finish(vend) + + return b.Bytes[b.Head():] +} + func addEdgeToPosting(b *flatbuffers.Builder, t x.DirectedEdge, op byte) flatbuffers.UOffsetT { var bo flatbuffers.UOffsetT if t.Value != nil { if t.ValueId != math.MaxUint64 { - log.Fatal("This should have already been set by the caller.") + glog.Fatal("This should have already been set by the caller.") } bytes, err := json.Marshal(t.Value) if err != nil { - x.Err(log, err).Fatal("Unable to marshal value") + glog.WithError(err).Fatal("Unable to marshal value") return 0 } bo = b.CreateByteVector(bytes) @@ -144,7 +177,7 @@ func init() { emptyPosting = b.Bytes[b.Head():] } - log.Infof("Empty size: [%d] EmptyPosting size: [%d]", + glog.Infof("Empty size: [%d] EmptyPosting size: [%d]", len(empty), len(emptyPosting)) } @@ -161,32 +194,50 @@ func ParseValue(i *interface{}, value []byte) error { return json.Unmarshal(value, i) } -func (l *List) init(key []byte, pstore, mstore *store.Store) { - l.mutex.Lock() - defer l.mutex.Unlock() +func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { + l.Lock() + defer l.Unlock() if len(empty) == 0 { - log.Fatal("empty should have some bytes.") + glog.Fatal("empty should have some bytes.") } l.key = key l.pstore = pstore - l.mstore = mstore + l.clog = clog var err error if l.buffer, err = pstore.Get(key); err != nil { - // log.Debugf("While retrieving posting list from db: %v\n", err) + // glog.Debugf("While retrieving posting list from db: %v\n", err) // Error. Just set to empty. l.buffer = make([]byte, len(empty)) copy(l.buffer, empty) } - if l.mbuffer, err = mstore.Get(key); err != nil { - // log.Debugf("While retrieving mutation list from db: %v\n", err) - // Error. Just set to empty. - l.mbuffer = make([]byte, len(empty)) - copy(l.mbuffer, empty) + posting := types.GetRootAsPostingList(l.buffer, 0) + l.maxMutationTs = posting.CommitTs() + l.hash = farm.Fingerprint32(key) + + ch := make(chan []byte, 100) + done := make(chan error) + glog.Debug("Starting stream entries...") + go clog.StreamEntries(posting.CommitTs()+1, l.hash, ch, done) + + for buffer := range ch { + uo := flatbuffers.GetUOffsetT(buffer) + m := new(types.Posting) + m.Init(buffer, uo) + l.mergeWithList(m) + } + if err := <-done; err != nil { + glog.WithError(err).Error("While streaming entries.") + } + glog.Debug("Done streaming entries.") + if len(l.mutations) > 0 { + // Commit Logs are always streamed in increasing ts order. + l.maxMutationTs = l.mutations[len(l.mutations)-1].Ts() } - l.generateIndex() + + l.regenerateIndex() } func findIndex(pl *types.PostingList, uid uint64, begin, end int) int { @@ -220,14 +271,14 @@ func (l *List) length() int { } func (l *List) Length() int { - l.mutex.RLock() - defer l.mutex.RUnlock() + l.RLock() + defer l.RUnlock() return l.length() } func (l *List) Get(p *types.Posting, i int) bool { - l.mutex.RLock() - defer l.mutex.RUnlock() + l.RLock() + defer l.RUnlock() return l.get(p, i) } @@ -261,7 +312,7 @@ func (l *List) get(p *types.Posting, i int) bool { // variable. } else { - log.Fatal("Someone, I mean you, forgot to tackle" + + glog.Fatal("Someone, I mean you, forgot to tackle" + " this operation. Stop drinking.") } } @@ -304,7 +355,7 @@ func (l *List) get(p *types.Posting, i int) bool { // Effective: ADD DEL REP (REP = replace) // // ---------------------------------------------------------------------------- -// generateIndex would generate these: +// regenerateIndex would generate these: // mlayer (layer just above posting list contains only replace instructions) // idx: 4 // value: 13' @@ -332,22 +383,15 @@ func (l *List) get(p *types.Posting, i int) bool { // still ensuring fast lookup access. // // NOTE: This function expects the caller to hold a RW Lock. -func (l *List) generateIndex() { +func (l *List) regenerateIndex() { l.mindex = nil l.mdelta = 0 l.mlayer = make(map[int]types.Posting) - mlist := types.GetRootAsPostingList(l.mbuffer, 0) plist := types.GetRootAsPostingList(l.buffer, 0) - if mlist.PostingsLength() == 0 { + if len(l.mutations) == 0 { return } - var muts []*types.Posting - for i := 0; i < mlist.PostingsLength(); i++ { - var mp types.Posting - mlist.Postings(&mp, i) - muts = append(muts, &mp) - } - sort.Sort(ByUid(muts)) + sort.Sort(ByUid(l.mutations)) mchain := linked.New() pi := 0 @@ -357,13 +401,16 @@ func (l *List) generateIndex() { padding := flatbuffers.GetUOffsetT(emptyPosting) pp.Init(emptyPosting, padding) if pp.Uid() != 0 { - log.Fatal("Playing with bytes is like playing with fire." + + glog.Fatal("Playing with bytes is like playing with fire." + " Someone got burnt today!") } } + // The following algorithm is O(m + n), where m = number of mutations, and + // n = number of immutable postings. This could be optimized + // to O(1) with potentially more complexity. TODO: Look into that later. l.mdelta = 0 - for mi, mp := range muts { + for mi, mp := range l.mutations { // TODO: Consider converting to binary search later. for ; pi < plist.PostingsLength() && pp.Uid() < mp.Uid(); pi++ { plist.Postings(pp, pi) @@ -384,7 +431,7 @@ func (l *List) generateIndex() { l.mdelta -= 1 } else { - log.Fatal("This operation isn't being handled.") + glog.Fatal("This operation isn't being handled.") } } else if mp.Uid() < pp.Uid() { // This is an add, so move the plist index backwards. @@ -419,9 +466,49 @@ func (l *List) addIfValid(b *flatbuffers.Builder, } } +// Assumes a lock has already been acquired. +func (l *List) mergeWithList(mpost *types.Posting) { + // If this mutation is a deletion, then check if there's a valid uid entry + // in the immutable postinglist. If not, we can ignore this mutation. + ignore := false + if mpost.Op() == Del { + if fi := l.find(mpost.Uid()); fi < 0 { + ignore = true + } + } + + // Check if we already have a mutation entry with the same uid. + // If so, ignore (in case of del)/replace it. Otherwise, append it. + handled := false + final := l.mutations[:0] + for _, mut := range l.mutations { + // mut := &l.mutations[i] + if mpost.Uid() != mut.Uid() { + final = append(final, mut) + continue + } + + handled = true + if ignore { + // Don't add to final. + } else { + final = append(final, mpost) // replaced original. + } + } + if handled { + l.mutations = final + } else { + l.mutations = append(l.mutations, mpost) + } +} + func (l *List) AddMutation(t x.DirectedEdge, op byte) error { - l.mutex.Lock() - defer l.mutex.Unlock() + l.Lock() + defer l.Unlock() + + if t.Timestamp.UnixNano() < l.maxMutationTs { + return fmt.Errorf("Mutation ts lower than committed ts.") + } // Mutation arrives: // - Check if we had any(SET/DEL) before this, stored in the mutation list. @@ -435,48 +522,15 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error { if t.Value != nil { t.ValueId = math.MaxUint64 } + mbuf := newPosting(t, op) + uo := flatbuffers.GetUOffsetT(mbuf) + mpost := new(types.Posting) + mpost.Init(mbuf, uo) - b := flatbuffers.NewBuilder(0) - muts := types.GetRootAsPostingList(l.mbuffer, 0) - var offsets []flatbuffers.UOffsetT - added := false - for i := 0; i < muts.PostingsLength(); i++ { - var p types.Posting - if ok := muts.Postings(&p, i); !ok { - log.Fatal("While reading posting") - return errors.New("Error reading posting") - } - - if p.Uid() != t.ValueId { - offsets = append(offsets, addPosting(b, p)) - - } else { - // An operation on something we already have a mutation for. - // Overwrite the previous one if it came earlier. - if p.Ts() <= t.Timestamp.UnixNano() { - l.addIfValid(b, &offsets, t, op) - } // else keep the previous one. - added = true - } - } - if !added { - l.addIfValid(b, &offsets, t, op) - } - - types.PostingListStartPostingsVector(b, len(offsets)) - for i := len(offsets) - 1; i >= 0; i-- { - b.PrependUOffsetT(offsets[i]) - } - vend := b.EndVector(len(offsets)) - - types.PostingListStart(b) - types.PostingListAddPostings(b, vend) - end := types.PostingListEnd(b) - b.Finish(end) - - l.mbuffer = b.Bytes[b.Head():] - l.generateIndex() - return l.mstore.SetOne(l.key, l.mbuffer) + l.mergeWithList(mpost) + l.regenerateIndex() + l.maxMutationTs = t.Timestamp.UnixNano() + return l.clog.AddLog(t.Timestamp.UnixNano(), l.hash, mbuf) } func addOrSet(ll *linked.List, p *types.Posting) { @@ -484,7 +538,7 @@ func addOrSet(ll *linked.List, p *types.Posting) { for e := ll.Front(); e != nil; e = e.Next() { pe := e.Value.(*types.Posting) if pe == nil { - log.Fatal("Posting shouldn't be nil!") + glog.Fatal("Posting shouldn't be nil!") } if !added && pe.Uid() > p.Uid() { @@ -510,22 +564,26 @@ func remove(ll *linked.List, p *types.Posting) { } } -func (l *List) generateLinkedList() *linked.List { +func (l *List) generateLinkedList() (*linked.List, int64) { plist := types.GetRootAsPostingList(l.buffer, 0) ll := linked.New() + var maxTs int64 for i := 0; i < plist.PostingsLength(); i++ { p := new(types.Posting) plist.Postings(p, i) + if maxTs < p.Ts() { + maxTs = p.Ts() + } ll.PushBack(p) } - mlist := types.GetRootAsPostingList(l.mbuffer, 0) // Now go through mutations - for i := 0; i < mlist.PostingsLength(); i++ { - p := new(types.Posting) - mlist.Postings(p, i) + for _, p := range l.mutations { + if maxTs < p.Ts() { + maxTs = p.Ts() + } if p.Op() == 0x01 { // Set/Add @@ -536,31 +594,32 @@ func (l *List) generateLinkedList() *linked.List { remove(ll, p) } else { - log.Fatalf("Strange mutation: %+v", p) + glog.Fatalf("Strange mutation: %+v", p) } } - return ll + return ll, maxTs } func (l *List) isDirty() bool { - l.mutex.RLock() - defer l.mutex.RUnlock() + l.RLock() + defer l.RUnlock() return l.mindex != nil } func (l *List) CommitIfDirty() error { if !l.isDirty() { - log.WithField("dirty", false).Debug("Not Committing") + glog.WithField("dirty", false).Debug("Not Committing") return nil } else { - log.WithField("dirty", true).Debug("Committing") + glog.WithField("dirty", true).Debug("Committing") } - l.mutex.Lock() - defer l.mutex.Unlock() + l.Lock() + defer l.Unlock() + + ll, commitTs := l.generateLinkedList() - ll := l.generateLinkedList() b := flatbuffers.NewBuilder(0) var offsets []flatbuffers.UOffsetT @@ -578,28 +637,24 @@ func (l *List) CommitIfDirty() error { types.PostingListStart(b) types.PostingListAddPostings(b, vend) + types.PostingListAddCommitTs(b, commitTs) end := types.PostingListEnd(b) b.Finish(end) l.buffer = b.Bytes[b.Head():] if err := l.pstore.SetOne(l.key, l.buffer); err != nil { - log.WithField("error", err).Errorf("While storing posting list") + glog.WithField("error", err).Errorf("While storing posting list") return err } + l.mutations = nil - if err := l.mstore.Delete(l.key); err != nil { - log.WithField("error", err).Errorf("While deleting mutation list") - return err - } - l.mbuffer = make([]byte, len(empty)) - copy(l.mbuffer, empty) - l.generateIndex() + l.regenerateIndex() return nil } func (l *List) GetUids() []uint64 { - l.mutex.RLock() - defer l.mutex.RUnlock() + l.RLock() + defer l.RUnlock() result := make([]uint64, l.length()) result = result[:0] @@ -614,8 +669,8 @@ func (l *List) GetUids() []uint64 { } func (l *List) Value() (result []byte, rerr error) { - l.mutex.RLock() - defer l.mutex.RUnlock() + l.RLock() + defer l.RUnlock() if l.length() == 0 { return result, fmt.Errorf("No value found") diff --git a/posting/list_test.go b/posting/list_test.go index 40198872f01bb74db15c20af2a23b3cf69fdc515..bf54c6280b39b43efcdcb50fa0c0e12a7b309aad 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/posting/types" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/x" @@ -45,30 +46,25 @@ func checkUids(t *testing.T, l List, uids ...uint64) error { return nil } -func NewStore(t *testing.T) string { - path, err := ioutil.TempDir("", "storetest_") +func TestAddMutation(t *testing.T) { + // logrus.SetLevel(logrus.DebugLevel) + var l List + key := Key(1, "name") + dir, err := ioutil.TempDir("", "storetest_") if err != nil { t.Error(err) - t.Fail() - return "" + return } - return path -} -func TestAddMutation(t *testing.T) { - var l List - key := Key(1, "name") - pdir := NewStore(t) - defer os.RemoveAll(pdir) + defer os.RemoveAll(dir) ps := new(store.Store) - ps.Init(pdir) + ps.Init(dir) - mdir := NewStore(t) - defer os.RemoveAll(mdir) - ms := new(store.Store) - ms.Init(mdir) + clog := commit.NewLogger(dir, "mutations", 50<<20) + clog.Init() + defer clog.Close() - l.init(key, ps, ms) + l.init(key, ps, clog) edge := x.DirectedEdge{ ValueId: 9, @@ -179,7 +175,7 @@ func TestAddMutation(t *testing.T) { */ // Try reading the same data in another PostingList. var dl List - dl.init(key, ps, ms) + dl.init(key, ps, clog) if err := checkUids(t, dl, uids...); err != nil { t.Error(err) } @@ -193,19 +189,26 @@ func TestAddMutation(t *testing.T) { } func TestAddMutation_Value(t *testing.T) { + // logrus.SetLevel(logrus.DebugLevel) + glog.Debug("Running init...") var ol List key := Key(10, "value") - pdir := NewStore(t) - defer os.RemoveAll(pdir) + dir, err := ioutil.TempDir("", "storetest_") + if err != nil { + t.Error(err) + return + } + + defer os.RemoveAll(dir) ps := new(store.Store) - ps.Init(pdir) + ps.Init(dir) - mdir := NewStore(t) - defer os.RemoveAll(mdir) - ms := new(store.Store) - ms.Init(mdir) + clog := commit.NewLogger(dir, "mutations", 50<<20) + clog.Init() + defer clog.Close() - ol.init(key, ps, ms) + ol.init(key, ps, clog) + glog.Debug("Init successful.") edge := x.DirectedEdge{ Value: "oh hey there", @@ -266,3 +269,48 @@ func TestAddMutation_Value(t *testing.T) { t.Errorf("Expected 119. Got: %v", intout) } } + +func benchmarkAddMutations(n int, b *testing.B) { + var l List + key := Key(1, "name") + dir, err := ioutil.TempDir("", "storetest_") + if err != nil { + b.Error(err) + return + } + + defer os.RemoveAll(dir) + ps := new(store.Store) + ps.Init(dir) + + clog := commit.NewLogger(dir, "mutations", 50<<20) + clog.SyncEvery = n + clog.Init() + defer clog.Close() + l.init(key, ps, clog) + b.ResetTimer() + + ts := time.Now() + for i := 0; i < b.N; i++ { + edge := x.DirectedEdge{ + ValueId: uint64(i), + Source: "testing", + Timestamp: ts.Add(time.Microsecond), + } + if err := l.AddMutation(edge, Set); err != nil { + b.Error(err) + } + } +} + +func BenchmarkAddMutations_SyncEveryLogEntry(b *testing.B) { + benchmarkAddMutations(0, b) +} + +func BenchmarkAddMutations_SyncEvery10LogEntry(b *testing.B) { + benchmarkAddMutations(10, b) +} + +func BenchmarkAddMutations_SyncEvery100LogEntry(b *testing.B) { + benchmarkAddMutations(100, b) +} diff --git a/posting/lists.go b/posting/lists.go index 65bfb4000a83a31894712e6d3e4cc133d4a25abb..9bbcd589d5e21c2d1a21e74ba0be6f1da00decb6 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -19,22 +19,23 @@ package posting import ( "sync" - "github.com/dgryski/go-farm" + "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/store" + "github.com/dgryski/go-farm" ) var lmutex sync.RWMutex var lcache map[uint64]*List var pstore *store.Store -var mstore *store.Store +var clog *commit.Logger -func Init(posting *store.Store, mutation *store.Store) { +func Init(posting *store.Store, log *commit.Logger) { lmutex.Lock() defer lmutex.Unlock() lcache = make(map[uint64]*List) pstore = posting - mstore = mutation + clog = log } func Get(key []byte) *List { @@ -56,7 +57,7 @@ func Get(key []byte) *List { } list := new(List) - list.init(key, pstore, mstore) + list.init(key, pstore, clog) lcache[uid] = list return list } diff --git a/posting/types.fbs b/posting/types.fbs index 55e7edff3aaba575eb7dd2b9a0c1548e9cfdedbd..13853a15100c4d2087f7ef66c60637aeda073ac1 100644 --- a/posting/types.fbs +++ b/posting/types.fbs @@ -9,6 +9,7 @@ table Posting { } table PostingList { + commitTs:long; postings:[Posting]; } diff --git a/posting/types/PostingList.go b/posting/types/PostingList.go index c1b54045bd73ecaa82d4295b66ac96f558dbee79..28f5ce61cec27b290899f3a63dd8d24effdfb3c7 100644 --- a/posting/types/PostingList.go +++ b/posting/types/PostingList.go @@ -21,8 +21,16 @@ func (rcv *PostingList) Init(buf []byte, i flatbuffers.UOffsetT) { rcv._tab.Pos = i } -func (rcv *PostingList) Postings(obj *Posting, j int) bool { +func (rcv *PostingList) CommitTs() int64 { o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.GetInt64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *PostingList) Postings(obj *Posting, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) if o != 0 { x := rcv._tab.Vector(o) x += flatbuffers.UOffsetT(j) * 4 @@ -37,15 +45,16 @@ func (rcv *PostingList) Postings(obj *Posting, j int) bool { } func (rcv *PostingList) PostingsLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) if o != 0 { return rcv._tab.VectorLen(o) } return 0 } -func PostingListStart(builder *flatbuffers.Builder) { builder.StartObject(1) } -func PostingListAddPostings(builder *flatbuffers.Builder, postings flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(postings), 0) } +func PostingListStart(builder *flatbuffers.Builder) { builder.StartObject(2) } +func PostingListAddCommitTs(builder *flatbuffers.Builder, commitTs int64) { builder.PrependInt64Slot(0, commitTs, 0) } +func PostingListAddPostings(builder *flatbuffers.Builder, postings flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(postings), 0) } func PostingListStartPostingsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4) } func PostingListEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() } diff --git a/posting/worker_test.go b/posting/worker_test.go index 1fe3c6c229728995a126f4a9768d35f7db19f933..25b1c2345de73b28995704e98ce796e3a3fa91ba 100644 --- a/posting/worker_test.go +++ b/posting/worker_test.go @@ -2,11 +2,12 @@ package posting import ( "fmt" + "io/ioutil" "os" "testing" "time" - "github.com/Sirupsen/logrus" + "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/x" @@ -39,18 +40,23 @@ func check(r *task.Result, idx int, expected []uint64) error { } func TestProcessTask(t *testing.T) { - logrus.SetLevel(logrus.DebugLevel) + // logrus.SetLevel(logrus.DebugLevel) - pdir := NewStore(t) - defer os.RemoveAll(pdir) + dir, err := ioutil.TempDir("", "storetest_") + if err != nil { + t.Error(err) + return + } + + defer os.RemoveAll(dir) ps := new(store.Store) - ps.Init(pdir) + ps.Init(dir) + + clog := commit.NewLogger(dir, "mutations", 50<<20) + clog.Init() + defer clog.Close() - mdir := NewStore(t) - defer os.RemoveAll(mdir) - ms := new(store.Store) - ms.Init(mdir) - Init(ps, ms) + Init(ps, clog) edge := x.DirectedEdge{ ValueId: 23, diff --git a/query/query_test.go b/query/query_test.go index 0ca291395be0bb80bf0fcc5d94e1e25668b1a69e..85671af9ef768681fd51aafc482f67e5e9fa7519 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/Sirupsen/logrus" + "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" @@ -117,16 +118,6 @@ func TestRun(t *testing.T) { } */ -func NewStore(t *testing.T) string { - path, err := ioutil.TempDir("", "storetest_") - if err != nil { - t.Error(err) - t.Fail() - return "" - } - return path -} - func addEdge(t *testing.T, edge x.DirectedEdge, l *posting.List) { if err := l.AddMutation(edge, posting.Set); err != nil { t.Error(err) @@ -201,17 +192,20 @@ func TestNewGraph(t *testing.T) { func populateGraph(t *testing.T) { logrus.SetLevel(logrus.DebugLevel) + dir, err := ioutil.TempDir("", "storetest_") + if err != nil { + t.Error(err) + return + } - pdir := NewStore(t) - defer os.RemoveAll(pdir) + defer os.RemoveAll(dir) ps := new(store.Store) - ps.Init(pdir) + ps.Init(dir) - mdir := NewStore(t) - defer os.RemoveAll(mdir) - ms := new(store.Store) - ms.Init(mdir) - posting.Init(ps, ms) + clog := commit.NewLogger(dir, "mutations", 50<<20) + clog.Init() + defer clog.Close() + posting.Init(ps, clog) // So, user we're interested in has uid: 1. // She has 4 friends: 23, 24, 25, 31, and 101 diff --git a/server/main.go b/server/main.go index 98284a9a6112df409598d07135f05b0ccc08b585..33f4f84af0d6610b13112a34c43031aa9494f9b6 100644 --- a/server/main.go +++ b/server/main.go @@ -25,6 +25,7 @@ import ( "os" "github.com/Sirupsen/logrus" + "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/query" @@ -131,9 +132,10 @@ func main() { ps := new(store.Store) ps.Init(*postingDir) - ms := new(store.Store) - ms.Init(*mutationDir) - posting.Init(ps, ms) + clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20) + clog.Init() + defer clog.Close() + posting.Init(ps, clog) if len(*rdfData) > 0 { f, err := os.Open(*rdfData) diff --git a/server/main_test.go b/server/main_test.go index 45d30bc9b21f3ac6beaeb15401d4a9bd66f487d8..5cff1c5bf06310ba3e6d10a1fb6c4e920078fe2d 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -22,6 +22,7 @@ import ( "os" "testing" + "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/query" @@ -46,22 +47,20 @@ var q0 = ` ` func prepare() error { - pdir, err := NewStore() + dir, err := ioutil.TempDir("", "storetest_") if err != nil { return err } - defer os.RemoveAll(pdir) + + defer os.RemoveAll(dir) ps := new(store.Store) - ps.Init(pdir) + ps.Init(dir) - mdir, err := NewStore() - if err != nil { - return err - } - defer os.RemoveAll(mdir) - ms := new(store.Store) - ms.Init(mdir) - posting.Init(ps, ms) + clog := commit.NewLogger(dir, "mutations", 50<<20) + clog.Init() + defer clog.Close() + + posting.Init(ps, clog) f, err := os.Open("testdata.nq") if err != nil { @@ -73,6 +72,8 @@ func prepare() error { return err } return nil + // Even though all files would be closed and the directory deleted, + // postings would still be present in memory. } func TestQuery(t *testing.T) { diff --git a/uid/assigner_test.go b/uid/assigner_test.go index 8d9d85e4f040eeb6e6fbf13e774e4bbb68428f2c..bf6d733168366bb928bef821140c7dae30c8a3d9 100644 --- a/uid/assigner_test.go +++ b/uid/assigner_test.go @@ -22,33 +22,27 @@ import ( "testing" "github.com/Sirupsen/logrus" + "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/store" ) -func NewStore(t *testing.T) string { - path, err := ioutil.TempDir("", "storetest_") - if err != nil { - t.Error(err) - t.Fail() - return "" - } - return path -} - func TestGetOrAssign(t *testing.T) { logrus.SetLevel(logrus.DebugLevel) - pdir := NewStore(t) - defer os.RemoveAll(pdir) + dir, err := ioutil.TempDir("", "storetest_") + if err != nil { + t.Error(err) + return + } + defer os.RemoveAll(dir) ps := new(store.Store) - ps.Init(pdir) + ps.Init(dir) + clog := commit.NewLogger(dir, "mutations", 50<<20) + clog.Init() + defer clog.Close() - mdir := NewStore(t) - defer os.RemoveAll(mdir) - ms := new(store.Store) - ms.Init(mdir) - posting.Init(ps, ms) + posting.Init(ps, clog) var u1, u2 uint64 {