Skip to content
Snippets Groups Projects
Commit 4f4059e0 authored by Manish R Jain's avatar Manish R Jain
Browse files

Optimized Posting List, yes! AddMutation now largely runs in O(log M + log N),...

Optimized Posting List, yes! AddMutation now largely runs in O(log M + log N), plus potential element shift for mindex. Also reduced complexity.
parent 09c9d98a
Branches
No related tags found
No related merge requests found
......@@ -19,17 +19,14 @@ package commit
import (
"bytes"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"testing"
"time"
"github.com/Sirupsen/logrus"
)
func TestHandleFile(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
dir, err := ioutil.TempDir("", "dgraph-log")
if err != nil {
t.Error(err)
......@@ -253,3 +250,32 @@ func TestReadEntries(t *testing.T) {
}
}
}
// benchmarkAddLog drives Logger.AddLog with b.N records of 50-99 random
// bytes each, with the logger configured to sync to disk every n records.
func benchmarkAddLog(n int, b *testing.B) {
	tmp, err := ioutil.TempDir("", "dgraph-log")
	if err != nil {
		b.Error(err)
		return
	}
	defer os.RemoveAll(tmp)

	lg := NewLogger(tmp, "dgraph", 50<<20)
	lg.SyncEvery = n
	lg.Init()

	payload := make([]byte, 100)
	b.ResetTimer()
	base := time.Now().UnixNano()
	for i := 0; i < b.N; i++ {
		sz := 50 + rand.Intn(50)
		if err := lg.AddLog(base+int64(i), 0, payload[:sz]); err != nil {
			b.Error(err)
		}
	}
	lg.Close()
}
// Benchmarks for Logger.AddLog at different SyncEvery settings (0 syncs
// on every record). Per the commit notes above, less frequent file syncs
// make AddLog run faster.
func BenchmarkAddLog_SyncEveryRecord(b *testing.B) { benchmarkAddLog(0, b) }
func BenchmarkAddLog_SyncEvery10Records(b *testing.B) { benchmarkAddLog(10, b) }
func BenchmarkAddLog_SyncEvery100Records(b *testing.B) { benchmarkAddLog(100, b) }
func BenchmarkAddLog_SyncEvery1000Records(b *testing.B) { benchmarkAddLog(1000, b) }
......@@ -23,9 +23,9 @@ import (
"errors"
"fmt"
"math"
"sort"
"sync"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/posting/types"
"github.com/dgraph-io/dgraph/store"
......@@ -44,24 +44,22 @@ const Del = 0x02
type MutationLink struct {
idx int
moveidx int
posting types.Posting
posting *types.Posting
}
type List struct {
sync.RWMutex
key []byte
hash uint32
buffer []byte
mutations []*types.Posting
pstore *store.Store // postinglist store
clog *commit.Logger
maxMutationTs int64 // Track maximum mutation ts
// mlayer keeps only replace instructions for the posting list.
// This works at the
mlayer map[int]types.Posting
mdelta int // Delta based on number of elements in posting list.
mindex *linked.List
key []byte
hash uint32
buffer []byte
pstore *store.Store // postinglist store
clog *commit.Logger
// Mutations
mlayer map[int]types.Posting // stores only replace instructions.
mdelta int // len(plist) + mdelta = final length.
maxMutationTs int64 // Track maximum mutation ts.
mindex []*MutationLink
}
type ByUid []*types.Posting
......@@ -216,6 +214,7 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
posting := types.GetRootAsPostingList(l.buffer, 0)
l.maxMutationTs = posting.CommitTs()
l.hash = farm.Fingerprint32(key)
l.mlayer = make(map[int]types.Posting)
ch := make(chan []byte, 100)
done := make(chan error)
......@@ -226,116 +225,100 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
uo := flatbuffers.GetUOffsetT(buffer)
m := new(types.Posting)
m.Init(buffer, uo)
l.mergeWithList(m)
if m.Ts() > l.maxMutationTs {
l.maxMutationTs = m.Ts()
}
glog.WithFields(logrus.Fields{
"uid": m.Uid(),
"source": string(m.Source()),
"ts": m.Ts(),
}).Debug("Got entry from log")
l.mergeMutation(m)
}
if err := <-done; err != nil {
glog.WithError(err).Error("While streaming entries.")
}
glog.Debug("Done streaming entries.")
if len(l.mutations) > 0 {
// Commit Logs are always streamed in increasing ts order.
l.maxMutationTs = l.mutations[len(l.mutations)-1].Ts()
}
l.regenerateIndex()
// l.regenerateIndex()
}
func findIndex(pl *types.PostingList, uid uint64, begin, end int) int {
if begin > end {
return -1
}
mid := (begin + end) / 2
var pmid types.Posting
if ok := pl.Postings(&pmid, mid); !ok {
return -1
// Caller must hold at least a read lock.
func (l *List) lePostingIndex(maxUid uint64) (int, uint64) {
posting := types.GetRootAsPostingList(l.buffer, 0)
left, right := 0, posting.PostingsLength()-1
sofar := -1
p := new(types.Posting)
for left <= right {
pos := (left + right) / 2
if ok := posting.Postings(p, pos); !ok {
glog.WithField("idx", pos).Fatal("Unable to parse posting from list.")
}
val := p.Uid()
if val > maxUid {
right = pos - 1
continue
}
if val == maxUid {
return pos, val
}
sofar = pos
left = pos + 1
}
if uid < pmid.Uid() {
return findIndex(pl, uid, begin, mid-1)
if sofar == -1 {
return -1, 0
}
if uid > pmid.Uid() {
return findIndex(pl, uid, mid+1, end)
if ok := posting.Postings(p, sofar); !ok {
glog.WithField("idx", sofar).Fatal("Unable to parse posting from list.")
}
return mid
}
// Caller must hold at least a read lock.
func (l *List) find(uid uint64) int {
posting := types.GetRootAsPostingList(l.buffer, 0)
return findIndex(posting, uid, 0, posting.PostingsLength())
}
// Caller must hold at least a read lock.
func (l *List) length() int {
plist := types.GetRootAsPostingList(l.buffer, 0)
return plist.PostingsLength() + l.mdelta
}
func (l *List) Length() int {
l.RLock()
defer l.RUnlock()
return l.length()
}
func (l *List) Get(p *types.Posting, i int) bool {
l.RLock()
defer l.RUnlock()
return l.get(p, i)
return sofar, p.Uid()
}
// Caller must hold at least a read lock.
func (l *List) get(p *types.Posting, i int) bool {
plist := types.GetRootAsPostingList(l.buffer, 0)
if l.mindex == nil {
return plist.Postings(p, i)
}
// Iterate over mindex, and see if we have instructions
// for the given index. Otherwise, sum up the move indexes
// uptil the given index, so we know where to look in
// mlayer and/or the main posting list.
move := 0
for e := l.mindex.Front(); e != nil; e = e.Next() {
mlink := e.Value.(*MutationLink)
if mlink.idx > i {
break
} else if mlink.idx == i {
// Found an instruction. Check what is says.
if mlink.posting.Op() == 0x01 {
// ADD
*p = mlink.posting
return true
} else if mlink.posting.Op() == 0x02 {
// DELETE
// The loop will break in the next iteration, after updating the move
// variable.
} else {
glog.Fatal("Someone, I mean you, forgot to tackle" +
" this operation. Stop drinking.")
}
func (l *List) leMutationIndex(maxUid uint64) (int, uint64) {
left, right := 0, len(l.mindex)-1
sofar := -1
for left <= right {
pos := (left + right) / 2
m := l.mindex[pos]
val := m.posting.Uid()
if val > maxUid {
right = pos - 1
continue
}
move += mlink.moveidx
if val == maxUid {
return pos, val
}
sofar = pos
left = pos + 1
}
newidx := i + move
if sofar == -1 {
return -1, 0
}
return sofar, l.mindex[sofar].posting.Uid()
}
// Check if we have any replace instruction in mlayer.
if val, ok := l.mlayer[newidx]; ok {
*p = val
return true
func (l *List) mindexInsertAt(mlink *MutationLink, mi int) {
l.mindex = append(l.mindex, nil)
copy(l.mindex[mi+1:], l.mindex[mi:])
l.mindex[mi] = mlink
for i := mi + 1; i < len(l.mindex); i++ {
l.mindex[i].idx += 1
}
// Hit the main posting list.
if newidx >= plist.PostingsLength() {
return false
}
func (l *List) mindexDeleteAt(mi int) {
glog.WithField("mi", mi).WithField("size", len(l.mindex)).
Debug("mindexDeleteAt")
l.mindex = append(l.mindex[:mi], l.mindex[mi+1:]...)
for i := mi; i < len(l.mindex); i++ {
l.mindex[i].idx -= 1
}
return plist.Postings(p, newidx)
}
// mutationIndex is useful to avoid having to parse the entire postinglist
// up to idx, for every Get(*types.Posting, idx), which has a complexity
// of O(idx). Iteration over an N-size posting list would thus push us into
// O(N^2) territory, without this technique.
// mutationIndex (mindex) is useful to avoid having to parse the entire
// postinglist up to idx, for every Get(*types.Posting, idx), which has a
// complexity of O(idx). Iteration over an N-size posting list would thus push
// us into O(N^2) territory, without this technique.
//
// Using this technique,
// we can overlay mutation layers over immutable posting list, to allow for
......@@ -383,125 +366,171 @@ func (l *List) get(p *types.Posting, i int) bool {
// still ensuring fast lookup access.
//
// NOTE: This function expects the caller to hold a RW Lock.
func (l *List) regenerateIndex() {
l.mindex = nil
l.mdelta = 0
l.mlayer = make(map[int]types.Posting)
plist := types.GetRootAsPostingList(l.buffer, 0)
if len(l.mutations) == 0 {
return
}
sort.Sort(ByUid(l.mutations))
mchain := linked.New()
pi := 0
pp := new(types.Posting)
if ok := plist.Postings(pp, pi); !ok {
// There's some weird padding before Posting starts. Get that padding.
padding := flatbuffers.GetUOffsetT(emptyPosting)
pp.Init(emptyPosting, padding)
if pp.Uid() != 0 {
glog.Fatal("Playing with bytes is like playing with fire." +
" Someone got burnt today!")
}
}
// The following algorithm is O(m + n), where m = number of mutations, and
// n = number of immutable postings. This could be optimized
// to O(1) with potentially more complexity. TODO: Look into that later.
l.mdelta = 0
for mi, mp := range l.mutations {
// TODO: Consider converting to binary search later.
for ; pi < plist.PostingsLength() && pp.Uid() < mp.Uid(); pi++ {
plist.Postings(pp, pi)
}
mlink := new(MutationLink)
mlink.posting = *mp
// Update: With mergeMutation function, we're adding mutations with a cost
// of O(log M + log N), where M = number of previous mutations, and N =
// number of postings in the immutable posting list.
func (l *List) mergeMutation(mp *types.Posting) {
curUid := mp.Uid()
pi, puid := l.lePostingIndex(curUid) // O(log N)
mi, muid := l.leMutationIndex(curUid) // O(log M)
inPlist := puid == curUid
// O(1) follows, but any additions or deletions from mindex would
// be O(M) due to element shifting. In terms of benchmarks, this performs
// a LOT better than when I was running O(N + M), re-generating mutation
// flatbuffers, linked lists etc.
mlink := new(MutationLink)
mlink.posting = mp
if mp.Op() == Del {
if muid == curUid { // curUid found in mindex.
if inPlist { // In plist, so replace previous instruction in mindex.
mlink.moveidx = 1
mlink.idx = pi + mi
l.mindex[mi] = mlink
if pp.Uid() == mp.Uid() {
if mp.Op() == Set {
// This is a replace, so store it in mlayer, instead of mindex.
// Note that mlayer index is based right off the plist.
l.mlayer[pi] = *mp
} else { // Not in plist, so delete previous instruction in mindex.
l.mdelta -= 1
l.mindexDeleteAt(mi)
}
} else if mp.Op() == Del {
// This is a delete, so move the plist index forward.
} else { // curUid not found in mindex.
if inPlist { // In plist, so insert in mindex.
mlink.moveidx = 1
l.mdelta -= 1
mlink.idx = pi + mi + 1
l.mindexInsertAt(mlink, mi+1)
} else {
glog.Fatal("This operation isn't being handled.")
// Not found in plist, and not found in mindex. So, ignore.
}
} else if mp.Uid() < pp.Uid() {
// This is an add, so move the plist index backwards.
mlink.moveidx = -1
l.mdelta += 1
} else {
// mp.Uid() > pp.Uid()
// We've crossed over from posting list. Posting List shouldn't be
// consulted in this case, so moveidx wouldn't be used. Just set it
// to zero anyways, to represent that.
mlink.moveidx = 0
l.mdelta += 1
}
mlink.idx = pi + mi
mchain.PushBack(mlink)
}
l.mindex = mchain
}
} else if mp.Op() == Set {
if muid == curUid { // curUid found in mindex.
if inPlist { // In plist, so delete previous instruction, set in mlayer.
l.mindexDeleteAt(mi)
l.mlayer[pi] = *mp
} else { // Not in plist, so replace previous set instruction in mindex.
// NOTE: This prev instruction couldn't have been a Del instruction.
mlink.idx = pi + 1 + mi
mlink.moveidx = -1
l.mindex[mi] = mlink
}
func (l *List) addIfValid(b *flatbuffers.Builder,
offsets *[]flatbuffers.UOffsetT, t x.DirectedEdge, op byte) {
} else { // curUid not found in mindex.
if inPlist { // In plist, so just set it in mlayer.
l.mlayer[pi] = *mp
if op == Del {
if fi := l.find(t.ValueId); fi >= 0 {
// Delete. Only add it to the list if it exists in the posting list.
*offsets = append(*offsets, addEdgeToPosting(b, t, op))
} else { // not in plist, not in mindex, so insert in mindex.
mlink.moveidx = -1
l.mdelta += 1
mlink.idx = pi + 1 + mi + 1 // right of pi, and right of mi.
l.mindexInsertAt(mlink, mi+1)
}
}
} else {
*offsets = append(*offsets, addEdgeToPosting(b, t, op))
glog.WithField("op", mp.Op()).Fatal("Invalid operation.")
}
}
// Assumes a lock has already been acquired.
func (l *List) mergeWithList(mpost *types.Posting) {
// If this mutation is a deletion, then check if there's a valid uid entry
// in the immutable postinglist. If not, we can ignore this mutation.
ignore := false
if mpost.Op() == Del {
if fi := l.find(mpost.Uid()); fi < 0 {
ignore = true
// Caller must hold at least a read lock.
func (l *List) length() int {
plist := types.GetRootAsPostingList(l.buffer, 0)
return plist.PostingsLength() + l.mdelta
}
func (l *List) Length() int {
l.RLock()
defer l.RUnlock()
return l.length()
}
func (l *List) Get(p *types.Posting, i int) bool {
l.RLock()
defer l.RUnlock()
return l.get(p, i)
}
// Caller must hold at least a read lock.
func (l *List) get(p *types.Posting, i int) bool {
plist := types.GetRootAsPostingList(l.buffer, 0)
if len(l.mindex) == 0 {
if val, ok := l.mlayer[i]; ok {
*p = val
return true
}
return plist.Postings(p, i)
}
// Check if we already have a mutation entry with the same uid.
// If so, ignore (in case of del)/replace it. Otherwise, append it.
handled := false
final := l.mutations[:0]
for _, mut := range l.mutations {
// mut := &l.mutations[i]
if mpost.Uid() != mut.Uid() {
final = append(final, mut)
continue
}
// Iterate over mindex, and see if we have instructions
// for the given index. Otherwise, sum up the move indexes
// uptil the given index, so we know where to look in
// mlayer and/or the main posting list.
move := 0
for _, mlink := range l.mindex {
if mlink.idx > i {
break
} else if mlink.idx == i {
// Found an instruction. Check what is says.
if mlink.posting.Op() == Set {
// ADD
glog.WithField("idx", i).
WithField("uid", mlink.posting.Uid()).
WithField("source", string(mlink.posting.Source())).
Debug("Returning from mlink")
*p = *mlink.posting
return true
} else if mlink.posting.Op() == Del {
// DELETE
// The loop will break in the next iteration, after updating the move
// variable.
handled = true
if ignore {
// Don't add to final.
} else {
final = append(final, mpost) // replaced original.
} else {
glog.Fatal("Someone, I mean you, forgot to tackle" +
" this operation. Stop drinking.")
}
}
move += mlink.moveidx
}
if handled {
l.mutations = final
} else {
l.mutations = append(l.mutations, mpost)
newidx := i + move
glog.WithFields(logrus.Fields{
"newidx": newidx,
"idx": i,
"move": move,
}).Debug("Final Indices")
// Check if we have any replace instruction in mlayer.
if val, ok := l.mlayer[newidx]; ok {
*p = val
return true
}
// Hit the main posting list.
if newidx >= plist.PostingsLength() {
return false
}
return plist.Postings(p, newidx)
}
// In benchmarks, the time taken per AddMutation before was
// plateauing at 2.5 ms with sync per 10 log entries, and increasing
// for sync per 100 log entries (to 3 ms per AddMutation), largely because
// of how index generation was being done.
//
// With this change, the benchmarks perform as good as benchmarks for
// commit.Logger, where the less frequently file sync happens, the faster
// AddMutations run.
//
// PASS
// BenchmarkAddMutations_SyncEveryLogEntry-6 100 24712455 ns/op
// BenchmarkAddMutations_SyncEvery10LogEntry-6 500 2485961 ns/op
// BenchmarkAddMutations_SyncEvery100LogEntry-6 10000 298352 ns/op
// BenchmarkAddMutations_SyncEvery1000LogEntry-6 30000 63544 ns/op
// ok github.com/dgraph-io/dgraph/posting 10.291s
func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
l.Lock()
defer l.Unlock()
......@@ -522,13 +551,22 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
if t.Value != nil {
t.ValueId = math.MaxUint64
}
if t.ValueId == 0 {
return fmt.Errorf("ValueId cannot be zero.")
}
mbuf := newPosting(t, op)
uo := flatbuffers.GetUOffsetT(mbuf)
mpost := new(types.Posting)
mpost.Init(mbuf, uo)
l.mergeWithList(mpost)
l.regenerateIndex()
glog.WithFields(logrus.Fields{
"uid": mpost.Uid(),
"source": string(mpost.Source()),
"ts": mpost.Ts(),
}).Debug("Add mutation")
l.mergeMutation(mpost)
l.maxMutationTs = t.Timestamp.UnixNano()
return l.clog.AddLog(t.Timestamp.UnixNano(), l.hash, mbuf)
}
......@@ -564,47 +602,10 @@ func remove(ll *linked.List, p *types.Posting) {
}
}
func (l *List) generateLinkedList() (*linked.List, int64) {
plist := types.GetRootAsPostingList(l.buffer, 0)
ll := linked.New()
var maxTs int64
for i := 0; i < plist.PostingsLength(); i++ {
p := new(types.Posting)
plist.Postings(p, i)
if maxTs < p.Ts() {
maxTs = p.Ts()
}
ll.PushBack(p)
}
// Now go through mutations
for _, p := range l.mutations {
if maxTs < p.Ts() {
maxTs = p.Ts()
}
if p.Op() == 0x01 {
// Set/Add
addOrSet(ll, p)
} else if p.Op() == 0x02 {
// Delete
remove(ll, p)
} else {
glog.Fatalf("Strange mutation: %+v", p)
}
}
return ll, maxTs
}
func (l *List) isDirty() bool {
l.RLock()
defer l.RUnlock()
return l.mindex != nil
return len(l.mindex)+len(l.mlayer) > 0
}
func (l *List) CommitIfDirty() error {
......@@ -618,26 +619,25 @@ func (l *List) CommitIfDirty() error {
l.Lock()
defer l.Unlock()
ll, commitTs := l.generateLinkedList()
var p types.Posting
sz := l.length()
b := flatbuffers.NewBuilder(0)
var offsets []flatbuffers.UOffsetT
for e := ll.Front(); e != nil; e = e.Next() {
p := e.Value.(*types.Posting)
off := addPosting(b, *p)
offsets = append(offsets, off)
offsets := make([]flatbuffers.UOffsetT, sz)
for i := 0; i < sz; i++ {
if ok := l.get(&p, i); !ok {
glog.WithField("idx", i).Fatal("Unable to parse posting.")
}
offsets[i] = addPosting(b, p)
}
types.PostingListStartPostingsVector(b, ll.Len())
types.PostingListStartPostingsVector(b, sz)
for i := len(offsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(offsets[i])
}
vend := b.EndVector(ll.Len())
vend := b.EndVector(sz)
types.PostingListStart(b)
types.PostingListAddPostings(b, vend)
types.PostingListAddCommitTs(b, commitTs)
types.PostingListAddCommitTs(b, l.maxMutationTs)
end := types.PostingListEnd(b)
b.Finish(end)
......@@ -646,9 +646,11 @@ func (l *List) CommitIfDirty() error {
glog.WithField("error", err).Errorf("While storing posting list")
return err
}
l.mutations = nil
l.regenerateIndex()
// Now reset the mutation variables.
l.mlayer = make(map[int]types.Posting)
l.mdelta = 0
l.mindex = nil
return nil
}
......
......@@ -20,6 +20,7 @@ import (
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"testing"
"time"
......@@ -271,6 +272,7 @@ func TestAddMutation_Value(t *testing.T) {
}
func benchmarkAddMutations(n int, b *testing.B) {
// logrus.SetLevel(logrus.DebugLevel)
var l List
key := Key(1, "name")
dir, err := ioutil.TempDir("", "storetest_")
......@@ -293,7 +295,7 @@ func benchmarkAddMutations(n int, b *testing.B) {
ts := time.Now()
for i := 0; i < b.N; i++ {
edge := x.DirectedEdge{
ValueId: uint64(i),
ValueId: uint64(rand.Intn(b.N) + 1),
Source: "testing",
Timestamp: ts.Add(time.Microsecond),
}
......@@ -314,3 +316,7 @@ func BenchmarkAddMutations_SyncEvery10LogEntry(b *testing.B) {
// Benchmarks for List.AddMutation with the commit log syncing every 100
// and every 1000 log entries respectively; see benchmarkAddMutations.
func BenchmarkAddMutations_SyncEvery100LogEntry(b *testing.B) {
benchmarkAddMutations(100, b)
}
func BenchmarkAddMutations_SyncEvery1000LogEntry(b *testing.B) {
benchmarkAddMutations(1000, b)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment