From 4f4059e0dd9302bba55f599fbd113f3fb0c1c428 Mon Sep 17 00:00:00 2001 From: Manish R Jain <manishrjain@gmail.com> Date: Fri, 13 Nov 2015 15:12:04 +1100 Subject: [PATCH] Optimized Posting List, yes! AddMutation now largely runs in O(log M + log N), plus potential element shift for mindex. Also reduced complexity. --- commit/log_test.go | 34 ++- posting/list.go | 510 ++++++++++++++++++++++--------------------- posting/list_test.go | 8 +- 3 files changed, 293 insertions(+), 259 deletions(-) diff --git a/commit/log_test.go b/commit/log_test.go index 49145a00..777f654e 100644 --- a/commit/log_test.go +++ b/commit/log_test.go @@ -19,17 +19,14 @@ package commit import ( "bytes" "io/ioutil" + "math/rand" "os" "path/filepath" "testing" "time" - - "github.com/Sirupsen/logrus" ) func TestHandleFile(t *testing.T) { - logrus.SetLevel(logrus.DebugLevel) - dir, err := ioutil.TempDir("", "dgraph-log") if err != nil { t.Error(err) @@ -253,3 +250,32 @@ func TestReadEntries(t *testing.T) { } } } + +func benchmarkAddLog(n int, b *testing.B) { + dir, err := ioutil.TempDir("", "dgraph-log") + if err != nil { + b.Error(err) + return + } + defer os.RemoveAll(dir) + + l := NewLogger(dir, "dgraph", 50<<20) + l.SyncEvery = n + l.Init() + + data := make([]byte, 100) + b.ResetTimer() + ts := time.Now().UnixNano() + for i := 0; i < b.N; i++ { + end := rand.Intn(50) + if err := l.AddLog(ts+int64(i), 0, data[:50+end]); err != nil { + b.Error(err) + } + } + l.Close() +} + +func BenchmarkAddLog_SyncEveryRecord(b *testing.B) { benchmarkAddLog(0, b) } +func BenchmarkAddLog_SyncEvery10Records(b *testing.B) { benchmarkAddLog(10, b) } +func BenchmarkAddLog_SyncEvery100Records(b *testing.B) { benchmarkAddLog(100, b) } +func BenchmarkAddLog_SyncEvery1000Records(b *testing.B) { benchmarkAddLog(1000, b) } diff --git a/posting/list.go b/posting/list.go index c8972f3b..767c93af 100644 --- a/posting/list.go +++ b/posting/list.go @@ -23,9 +23,9 @@ import ( "errors" "fmt" "math" - "sort" "sync" + "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/posting/types" "github.com/dgraph-io/dgraph/store" @@ -44,24 +44,22 @@ const Del = 0x02 type MutationLink struct { idx int moveidx int - posting types.Posting + posting *types.Posting } type List struct { sync.RWMutex - key []byte - hash uint32 - buffer []byte - mutations []*types.Posting - pstore *store.Store // postinglist store - clog *commit.Logger - maxMutationTs int64 // Track maximum mutation ts - - // mlayer keeps only replace instructions for the posting list. - // This works at the - mlayer map[int]types.Posting - mdelta int // Delta based on number of elements in posting list. - mindex *linked.List + key []byte + hash uint32 + buffer []byte + pstore *store.Store // postinglist store + clog *commit.Logger + + // Mutations + mlayer map[int]types.Posting // stores only replace instructions. + mdelta int // len(plist) + mdelta = final length. + maxMutationTs int64 // Track maximum mutation ts. 
+ mindex []*MutationLink } type ByUid []*types.Posting @@ -216,6 +214,7 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { posting := types.GetRootAsPostingList(l.buffer, 0) l.maxMutationTs = posting.CommitTs() l.hash = farm.Fingerprint32(key) + l.mlayer = make(map[int]types.Posting) ch := make(chan []byte, 100) done := make(chan error) @@ -226,116 +225,100 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { uo := flatbuffers.GetUOffsetT(buffer) m := new(types.Posting) m.Init(buffer, uo) - l.mergeWithList(m) + if m.Ts() > l.maxMutationTs { + l.maxMutationTs = m.Ts() + } + glog.WithFields(logrus.Fields{ + "uid": m.Uid(), + "source": string(m.Source()), + "ts": m.Ts(), + }).Debug("Got entry from log") + l.mergeMutation(m) } if err := <-done; err != nil { glog.WithError(err).Error("While streaming entries.") } glog.Debug("Done streaming entries.") - if len(l.mutations) > 0 { - // Commit Logs are always streamed in increasing ts order. - l.maxMutationTs = l.mutations[len(l.mutations)-1].Ts() - } - - l.regenerateIndex() + // l.regenerateIndex() } -func findIndex(pl *types.PostingList, uid uint64, begin, end int) int { - if begin > end { - return -1 - } - mid := (begin + end) / 2 - var pmid types.Posting - if ok := pl.Postings(&pmid, mid); !ok { - return -1 +// Caller must hold at least a read lock. +func (l *List) lePostingIndex(maxUid uint64) (int, uint64) { + posting := types.GetRootAsPostingList(l.buffer, 0) + left, right := 0, posting.PostingsLength()-1 + sofar := -1 + p := new(types.Posting) + + for left <= right { + pos := (left + right) / 2 + if ok := posting.Postings(p, pos); !ok { + glog.WithField("idx", pos).Fatal("Unable to parse posting from list.") + } + val := p.Uid() + if val > maxUid { + right = pos - 1 + continue + } + if val == maxUid { + return pos, val + } + sofar = pos + left = pos + 1 } - if uid < pmid.Uid() { - return findIndex(pl, uid, begin, mid-1) + if sofar == -1 { + return -1, 0 } - if uid > pmid.Uid() { - return findIndex(pl, uid, mid+1, end) + if ok := posting.Postings(p, sofar); !ok { + glog.WithField("idx", sofar).Fatal("Unable to parse posting from list.") } - return mid -} - -// Caller must hold at least a read lock. -func (l *List) find(uid uint64) int { - posting := types.GetRootAsPostingList(l.buffer, 0) - return findIndex(posting, uid, 0, posting.PostingsLength()) -} - -// Caller must hold at least a read lock. -func (l *List) length() int { - plist := types.GetRootAsPostingList(l.buffer, 0) - return plist.PostingsLength() + l.mdelta -} - -func (l *List) Length() int { - l.RLock() - defer l.RUnlock() - return l.length() -} - -func (l *List) Get(p *types.Posting, i int) bool { - l.RLock() - defer l.RUnlock() - return l.get(p, i) + return sofar, p.Uid() } -// Caller must hold at least a read lock. -func (l *List) get(p *types.Posting, i int) bool { - plist := types.GetRootAsPostingList(l.buffer, 0) - if l.mindex == nil { - return plist.Postings(p, i) - } - - // Iterate over mindex, and see if we have instructions - // for the given index. Otherwise, sum up the move indexes - // uptil the given index, so we know where to look in - // mlayer and/or the main posting list. - move := 0 - for e := l.mindex.Front(); e != nil; e = e.Next() { - mlink := e.Value.(*MutationLink) - if mlink.idx > i { - break - - } else if mlink.idx == i { - // Found an instruction. Check what is says. 
- if mlink.posting.Op() == 0x01 { - // ADD - *p = mlink.posting - return true - - } else if mlink.posting.Op() == 0x02 { - // DELETE - // The loop will break in the next iteration, after updating the move - // variable. - - } else { - glog.Fatal("Someone, I mean you, forgot to tackle" + - " this operation. Stop drinking.") - } +func (l *List) leMutationIndex(maxUid uint64) (int, uint64) { + left, right := 0, len(l.mindex)-1 + sofar := -1 + for left <= right { + pos := (left + right) / 2 + m := l.mindex[pos] + val := m.posting.Uid() + if val > maxUid { + right = pos - 1 + continue } - move += mlink.moveidx + if val == maxUid { + return pos, val + } + sofar = pos + left = pos + 1 } - newidx := i + move + if sofar == -1 { + return -1, 0 + } + return sofar, l.mindex[sofar].posting.Uid() +} - // Check if we have any replace instruction in mlayer. - if val, ok := l.mlayer[newidx]; ok { - *p = val - return true +func (l *List) mindexInsertAt(mlink *MutationLink, mi int) { + l.mindex = append(l.mindex, nil) + copy(l.mindex[mi+1:], l.mindex[mi:]) + l.mindex[mi] = mlink + for i := mi + 1; i < len(l.mindex); i++ { + l.mindex[i].idx += 1 } - // Hit the main posting list. - if newidx >= plist.PostingsLength() { - return false +} + +func (l *List) mindexDeleteAt(mi int) { + glog.WithField("mi", mi).WithField("size", len(l.mindex)). + Debug("mindexDeleteAt") + l.mindex = append(l.mindex[:mi], l.mindex[mi+1:]...) + for i := mi; i < len(l.mindex); i++ { + l.mindex[i].idx -= 1 } - return plist.Postings(p, newidx) } -// mutationIndex is useful to avoid having to parse the entire postinglist -// upto idx, for every Get(*types.Posting, idx), which has a complexity -// of O(idx). Iteration over N size posting list would this push us into -// O(N^2) territory, without this technique. +// mutationIndex (mindex) is useful to avoid having to parse the entire +// postinglist upto idx, for every Get(*types.Posting, idx), which has a +// complexity of O(idx). Iteration over N size posting list would this push +// us into O(N^2) territory, without this technique. // // Using this technique, // we can overlay mutation layers over immutable posting list, to allow for @@ -383,125 +366,171 @@ func (l *List) get(p *types.Posting, i int) bool { // still ensuring fast lookup access. // // NOTE: This function expects the caller to hold a RW Lock. -func (l *List) regenerateIndex() { - l.mindex = nil - l.mdelta = 0 - l.mlayer = make(map[int]types.Posting) - plist := types.GetRootAsPostingList(l.buffer, 0) - if len(l.mutations) == 0 { - return - } - sort.Sort(ByUid(l.mutations)) - - mchain := linked.New() - pi := 0 - pp := new(types.Posting) - if ok := plist.Postings(pp, pi); !ok { - // There's some weird padding before Posting starts. Get that padding. - padding := flatbuffers.GetUOffsetT(emptyPosting) - pp.Init(emptyPosting, padding) - if pp.Uid() != 0 { - glog.Fatal("Playing with bytes is like playing with fire." + - " Someone got burnt today!") - } - } - - // The following algorithm is O(m + n), where m = number of mutations, and - // n = number of immutable postings. This could be optimized - // to O(1) with potentially more complexity. TODO: Look into that later. - l.mdelta = 0 - for mi, mp := range l.mutations { - // TODO: Consider converting to binary search later. 
- for ; pi < plist.PostingsLength() && pp.Uid() < mp.Uid(); pi++ { - plist.Postings(pp, pi) - } - - mlink := new(MutationLink) - mlink.posting = *mp +// Update: With mergeMutation function, we're adding mutations with a cost +// of O(log M + log N), where M = number of previous mutations, and N = +// number of postings in the immutable posting list. +func (l *List) mergeMutation(mp *types.Posting) { + curUid := mp.Uid() + pi, puid := l.lePostingIndex(curUid) // O(log N) + mi, muid := l.leMutationIndex(curUid) // O(log M) + inPlist := puid == curUid + + // O(1) follows, but any additions or deletions from mindex would + // be O(M) due to element shifting. In terms of benchmarks, this performs + // a LOT better than when I was running O(N + M), re-generating mutation + // flatbuffers, linked lists etc. + mlink := new(MutationLink) + mlink.posting = mp + + if mp.Op() == Del { + if muid == curUid { // curUid found in mindex. + if inPlist { // In plist, so replace previous instruction in mindex. + mlink.moveidx = 1 + mlink.idx = pi + mi + l.mindex[mi] = mlink - if pp.Uid() == mp.Uid() { - if mp.Op() == Set { - // This is a replace, so store it in mlayer, instead of mindex. - // Note that mlayer index is based right off the plist. - l.mlayer[pi] = *mp + } else { // Not in plist, so delete previous instruction in mindex. + l.mdelta -= 1 + l.mindexDeleteAt(mi) + } - } else if mp.Op() == Del { - // This is a delete, so move the plist index forward. + } else { // curUid not found in mindex. + if inPlist { // In plist, so insert in mindex. mlink.moveidx = 1 l.mdelta -= 1 + mlink.idx = pi + mi + 1 + l.mindexInsertAt(mlink, mi+1) } else { - glog.Fatal("This operation isn't being handled.") + // Not found in plist, and not found in mindex. So, ignore. } - } else if mp.Uid() < pp.Uid() { - // This is an add, so move the plist index backwards. - mlink.moveidx = -1 - l.mdelta += 1 - - } else { - // mp.Uid() > pp.Uid() - // We've crossed over from posting list. Posting List shouldn't be - // consulted in this case, so moveidx wouldn't be used. Just set it - // to zero anyways, to represent that. - mlink.moveidx = 0 - l.mdelta += 1 } - mlink.idx = pi + mi - mchain.PushBack(mlink) - } - l.mindex = mchain -} + } else if mp.Op() == Set { + if muid == curUid { // curUid found in mindex. + if inPlist { // In plist, so delete previous instruction, set in mlayer. + l.mindexDeleteAt(mi) + l.mlayer[pi] = *mp + + } else { // Not in plist, so replace previous set instruction in mindex. + // NOTE: This prev instruction couldn't have been a Del instruction. + mlink.idx = pi + 1 + mi + mlink.moveidx = -1 + l.mindex[mi] = mlink + } -func (l *List) addIfValid(b *flatbuffers.Builder, - offsets *[]flatbuffers.UOffsetT, t x.DirectedEdge, op byte) { + } else { // curUid not found in mindex. + if inPlist { // In plist, so just set it in mlayer. + l.mlayer[pi] = *mp - if op == Del { - if fi := l.find(t.ValueId); fi >= 0 { - // Delete. Only add it to the list if it exists in the posting list. - *offsets = append(*offsets, addEdgeToPosting(b, t, op)) + } else { // not in plist, not in mindex, so insert in mindex. + mlink.moveidx = -1 + l.mdelta += 1 + mlink.idx = pi + 1 + mi + 1 // right of pi, and right of mi. + l.mindexInsertAt(mlink, mi+1) + } } + } else { - *offsets = append(*offsets, addEdgeToPosting(b, t, op)) + glog.WithField("op", mp.Op()).Fatal("Invalid operation.") } } -// Assumes a lock has already been acquired. 
-func (l *List) mergeWithList(mpost *types.Posting) { - // If this mutation is a deletion, then check if there's a valid uid entry - // in the immutable postinglist. If not, we can ignore this mutation. - ignore := false - if mpost.Op() == Del { - if fi := l.find(mpost.Uid()); fi < 0 { - ignore = true +// Caller must hold at least a read lock. +func (l *List) length() int { + plist := types.GetRootAsPostingList(l.buffer, 0) + return plist.PostingsLength() + l.mdelta +} + +func (l *List) Length() int { + l.RLock() + defer l.RUnlock() + return l.length() +} + +func (l *List) Get(p *types.Posting, i int) bool { + l.RLock() + defer l.RUnlock() + return l.get(p, i) +} + +// Caller must hold at least a read lock. +func (l *List) get(p *types.Posting, i int) bool { + plist := types.GetRootAsPostingList(l.buffer, 0) + if len(l.mindex) == 0 { + if val, ok := l.mlayer[i]; ok { + *p = val + return true } + return plist.Postings(p, i) } - // Check if we already have a mutation entry with the same uid. - // If so, ignore (in case of del)/replace it. Otherwise, append it. - handled := false - final := l.mutations[:0] - for _, mut := range l.mutations { - // mut := &l.mutations[i] - if mpost.Uid() != mut.Uid() { - final = append(final, mut) - continue - } + // Iterate over mindex, and see if we have instructions + // for the given index. Otherwise, sum up the move indexes + // uptil the given index, so we know where to look in + // mlayer and/or the main posting list. + move := 0 + for _, mlink := range l.mindex { + if mlink.idx > i { + break + + } else if mlink.idx == i { + // Found an instruction. Check what is says. + if mlink.posting.Op() == Set { + // ADD + glog.WithField("idx", i). + WithField("uid", mlink.posting.Uid()). + WithField("source", string(mlink.posting.Source())). + Debug("Returning from mlink") + *p = *mlink.posting + return true + + } else if mlink.posting.Op() == Del { + // DELETE + // The loop will break in the next iteration, after updating the move + // variable. - handled = true - if ignore { - // Don't add to final. - } else { - final = append(final, mpost) // replaced original. + } else { + glog.Fatal("Someone, I mean you, forgot to tackle" + + " this operation. Stop drinking.") + } } + move += mlink.moveidx } - if handled { - l.mutations = final - } else { - l.mutations = append(l.mutations, mpost) + newidx := i + move + glog.WithFields(logrus.Fields{ + "newidx": newidx, + "idx": i, + "move": move, + }).Debug("Final Indices") + + // Check if we have any replace instruction in mlayer. + if val, ok := l.mlayer[newidx]; ok { + *p = val + return true } + // Hit the main posting list. + if newidx >= plist.PostingsLength() { + return false + } + return plist.Postings(p, newidx) } +// In benchmarks, the time taken per AddMutation before was +// plateauing at 2.5 ms with sync per 10 log entries, and increasing +// for sync per 100 log entries (to 3 ms per AddMutation), largely because +// of how index generation was being done. +// +// With this change, the benchmarks perform as good as benchmarks for +// commit.Logger, where the less frequently file sync happens, the faster +// AddMutations run. 
+// +// PASS +// BenchmarkAddMutations_SyncEveryLogEntry-6 100 24712455 ns/op +// BenchmarkAddMutations_SyncEvery10LogEntry-6 500 2485961 ns/op +// BenchmarkAddMutations_SyncEvery100LogEntry-6 10000 298352 ns/op +// BenchmarkAddMutations_SyncEvery1000LogEntry-6 30000 63544 ns/op +// ok github.com/dgraph-io/dgraph/posting 10.291s func (l *List) AddMutation(t x.DirectedEdge, op byte) error { l.Lock() defer l.Unlock() @@ -522,13 +551,22 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error { if t.Value != nil { t.ValueId = math.MaxUint64 } + if t.ValueId == 0 { + return fmt.Errorf("ValueId cannot be zero.") + } + mbuf := newPosting(t, op) uo := flatbuffers.GetUOffsetT(mbuf) mpost := new(types.Posting) mpost.Init(mbuf, uo) - l.mergeWithList(mpost) - l.regenerateIndex() + glog.WithFields(logrus.Fields{ + "uid": mpost.Uid(), + "source": string(mpost.Source()), + "ts": mpost.Ts(), + }).Debug("Add mutation") + + l.mergeMutation(mpost) l.maxMutationTs = t.Timestamp.UnixNano() return l.clog.AddLog(t.Timestamp.UnixNano(), l.hash, mbuf) } @@ -564,47 +602,10 @@ func remove(ll *linked.List, p *types.Posting) { } } -func (l *List) generateLinkedList() (*linked.List, int64) { - plist := types.GetRootAsPostingList(l.buffer, 0) - ll := linked.New() - - var maxTs int64 - for i := 0; i < plist.PostingsLength(); i++ { - p := new(types.Posting) - plist.Postings(p, i) - if maxTs < p.Ts() { - maxTs = p.Ts() - } - - ll.PushBack(p) - } - - // Now go through mutations - for _, p := range l.mutations { - if maxTs < p.Ts() { - maxTs = p.Ts() - } - - if p.Op() == 0x01 { - // Set/Add - addOrSet(ll, p) - - } else if p.Op() == 0x02 { - // Delete - remove(ll, p) - - } else { - glog.Fatalf("Strange mutation: %+v", p) - } - } - - return ll, maxTs -} - func (l *List) isDirty() bool { l.RLock() defer l.RUnlock() - return l.mindex != nil + return len(l.mindex)+len(l.mlayer) > 0 } func (l *List) CommitIfDirty() error { @@ -618,26 +619,25 @@ func (l *List) CommitIfDirty() error { l.Lock() defer l.Unlock() - ll, commitTs := l.generateLinkedList() - + var p types.Posting + sz := l.length() b := flatbuffers.NewBuilder(0) - - var offsets []flatbuffers.UOffsetT - for e := ll.Front(); e != nil; e = e.Next() { - p := e.Value.(*types.Posting) - off := addPosting(b, *p) - offsets = append(offsets, off) + offsets := make([]flatbuffers.UOffsetT, sz) + for i := 0; i < sz; i++ { + if ok := l.get(&p, i); !ok { + glog.WithField("idx", i).Fatal("Unable to parse posting.") + } + offsets[i] = addPosting(b, p) } - - types.PostingListStartPostingsVector(b, ll.Len()) + types.PostingListStartPostingsVector(b, sz) for i := len(offsets) - 1; i >= 0; i-- { b.PrependUOffsetT(offsets[i]) } - vend := b.EndVector(ll.Len()) + vend := b.EndVector(sz) types.PostingListStart(b) types.PostingListAddPostings(b, vend) - types.PostingListAddCommitTs(b, commitTs) + types.PostingListAddCommitTs(b, l.maxMutationTs) end := types.PostingListEnd(b) b.Finish(end) @@ -646,9 +646,11 @@ func (l *List) CommitIfDirty() error { glog.WithField("error", err).Errorf("While storing posting list") return err } - l.mutations = nil - l.regenerateIndex() + // Now reset the mutation variables. 
+ l.mlayer = make(map[int]types.Posting) + l.mdelta = 0 + l.mindex = nil return nil } diff --git a/posting/list_test.go b/posting/list_test.go index bf54c628..0ed2ee6b 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -20,6 +20,7 @@ import ( "fmt" "io/ioutil" "math" + "math/rand" "os" "testing" "time" @@ -271,6 +272,7 @@ func TestAddMutation_Value(t *testing.T) { } func benchmarkAddMutations(n int, b *testing.B) { + // logrus.SetLevel(logrus.DebugLevel) var l List key := Key(1, "name") dir, err := ioutil.TempDir("", "storetest_") @@ -293,7 +295,7 @@ func benchmarkAddMutations(n int, b *testing.B) { ts := time.Now() for i := 0; i < b.N; i++ { edge := x.DirectedEdge{ - ValueId: uint64(i), + ValueId: uint64(rand.Intn(b.N) + 1), Source: "testing", Timestamp: ts.Add(time.Microsecond), } @@ -314,3 +316,7 @@ func BenchmarkAddMutations_SyncEvery10LogEntry(b *testing.B) { func BenchmarkAddMutations_SyncEvery100LogEntry(b *testing.B) { benchmarkAddMutations(100, b) } + +func BenchmarkAddMutations_SyncEvery1000LogEntry(b *testing.B) { + benchmarkAddMutations(1000, b) +} -- GitLab
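
For reference, a standalone sketch (not part of the patch) of the floor-style binary search that lePostingIndex and leMutationIndex implement: find the right-most element less than or equal to a target uid, returning (-1, 0) when nothing qualifies. It is written against a plain sorted []uint64 instead of the flatbuffer-backed postings, so the name leIndex and the types here are illustrative only.

package main

import "fmt"

// leIndex returns the index and value of the right-most element of the
// sorted slice uids that is <= maxUid, mirroring lePostingIndex /
// leMutationIndex in the patch. It returns (-1, 0) when every element
// is greater than maxUid.
func leIndex(uids []uint64, maxUid uint64) (int, uint64) {
	left, right := 0, len(uids)-1
	sofar := -1
	for left <= right {
		pos := (left + right) / 2
		val := uids[pos]
		if val > maxUid {
			right = pos - 1
			continue
		}
		if val == maxUid {
			return pos, val
		}
		sofar = pos
		left = pos + 1
	}
	if sofar == -1 {
		return -1, 0
	}
	return sofar, uids[sofar]
}

func main() {
	uids := []uint64{3, 7, 9, 15, 21}
	for _, q := range []uint64{1, 9, 10, 100} {
		idx, val := leIndex(uids, q)
		fmt.Printf("query=%d -> idx=%d val=%d\n", q, idx, val)
	}
	// query=1 -> idx=-1 val=0; query=9 -> idx=2 val=9;
	// query=10 -> idx=2 val=9; query=100 -> idx=4 val=21
}

This is the O(log N) / O(log M) half of the new AddMutation cost; the other half is the slice shifting sketched next.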
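
The "potential element shift" mentioned in the commit message comes from mindexInsertAt and mindexDeleteAt: inserting into or deleting from the sorted mindex slice is O(M) because later elements are shifted and their effective idx values adjusted. A minimal sketch of that mechanic, using a hypothetical link struct in place of *MutationLink:

package main

import "fmt"

// link stands in for the patch's *MutationLink; idx is the mutation's
// effective index in the merged view.
type link struct {
	idx int
	uid uint64
}

// insertAt mirrors mindexInsertAt: grow the slice, shift the tail right,
// place the new link, and bump the effective idx of every later link.
func insertAt(mindex []*link, ml *link, pos int) []*link {
	mindex = append(mindex, nil)
	copy(mindex[pos+1:], mindex[pos:])
	mindex[pos] = ml
	for i := pos + 1; i < len(mindex); i++ {
		mindex[i].idx += 1
	}
	return mindex
}

// deleteAt mirrors mindexDeleteAt: remove the link and pull the effective
// idx of everything to its right back by one.
func deleteAt(mindex []*link, pos int) []*link {
	mindex = append(mindex[:pos], mindex[pos+1:]...)
	for i := pos; i < len(mindex); i++ {
		mindex[i].idx -= 1
	}
	return mindex
}

func main() {
	mindex := []*link{{idx: 2, uid: 20}, {idx: 5, uid: 50}}
	mindex = insertAt(mindex, &link{idx: 4, uid: 40}, 1)
	for _, ml := range mindex {
		fmt.Println(ml.idx, ml.uid) // 2 20, 4 40, 6 50
	}
	mindex = deleteAt(mindex, 0)
	for _, ml := range mindex {
		fmt.Println(ml.idx, ml.uid) // 3 40, 5 50
	}
}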
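
Finally, a toy model of the read path that the patched List.get follows: scan mindex up to the requested index while accumulating a move offset, serve adds directly from mindex, then use the re-mapped index to consult mlayer (replace instructions) and the immutable list. The overlay type, uid-only postings, and hand-built mindex below are assumptions for illustration; the real code stores flatbuffer Postings and builds mindex via mergeMutation.

package main

import "fmt"

// link mirrors MutationLink in spirit: idx is the effective index in the
// merged view, moveidx re-maps later indices onto the immutable list,
// del marks a deletion, and uid is what an add makes visible.
type link struct {
	idx     int
	moveidx int
	uid     uint64
	del     bool
}

type overlay struct {
	plist  []uint64       // immutable, sorted posting list (uids only)
	mindex []link         // add/delete instructions, sorted by idx
	mlayer map[int]uint64 // replace instructions, keyed by plist index
}

// get returns the uid visible at index i of the merged view.
func (o *overlay) get(i int) (uint64, bool) {
	move := 0
	for _, ml := range o.mindex {
		if ml.idx > i {
			break
		}
		if ml.idx == i && !ml.del {
			return ml.uid, true // an added uid is served straight from mindex
		}
		move += ml.moveidx
	}
	newidx := i + move
	if v, ok := o.mlayer[newidx]; ok {
		return v, true
	}
	if newidx >= len(o.plist) {
		return 0, false
	}
	return o.plist[newidx], true
}

func main() {
	o := &overlay{
		plist: []uint64{10, 20, 30, 40},
		mindex: []link{
			{idx: 1, moveidx: -1, uid: 15},   // add 15 at merged index 1
			{idx: 3, moveidx: 1, del: true},  // delete 30 (merged index 3)
		},
		// In the real code mlayer holds a replacement Posting for the same
		// uid (new value/source); here the number is swapped so the lookup
		// is visible: plist index 3 (uid 40) is overwritten.
		mlayer: map[int]uint64{3: 41},
	}
	for i := 0; ; i++ {
		v, ok := o.get(i)
		if !ok {
			break
		}
		fmt.Println(i, v) // prints 10, 15, 20, 41 at indices 0..3
	}
}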