Newer
Older
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package posting
import (
Manish R Jain
committed
"fmt"
Manish R Jain
committed
"time"
Manish R Jain
committed
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/posting/types"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/x"
// Operation codes carried in a posting's Op field. mergeMutation and get
// dispatch on these values.
const (
	// Set adds a posting, or replaces the posting that has the same uid.
	Set = 0x01
	// Del removes the posting that has the same uid.
	Del = 0x02
)
Manish R Jain
committed
type MutationLink struct {
idx int
moveidx int
Manish R Jain
committed
posting *types.Posting
Manish R Jain
committed
}
Manish R Jain
committed
key []byte
hash uint32
buffer []byte
pstore *store.Store // postinglist store
clog *commit.Logger
lastCompact time.Time
Manish R Jain
committed
wg sync.WaitGroup
Manish R Jain
committed
// Mutations
mlayer map[int]types.Posting // stores only replace instructions.
mdelta int // len(plist) + mdelta = final length.
maxMutationTs int64 // Track maximum mutation ts.
mindex []*MutationLink
Manish R Jain
committed
// NewList allocates a List with an empty replace layer. The wait group is
// armed with one pending unit, which init releases through its deferred
// wg.Done; the accessors that call wg.Wait block until then.
func NewList() *List {
	list := &List{
		mlayer: make(map[int]types.Posting),
	}
	list.wg.Add(1)
	return list
}
Manish R Jain
committed
// ByUid is a slice of postings that provides Len, Swap and Less, ordering
// its elements by ascending uid.
type ByUid []*types.Posting

// Len reports the number of postings in the slice.
func (ps ByUid) Len() int {
	return len(ps)
}

// Swap exchanges the postings at positions a and b.
func (ps ByUid) Swap(a, b int) {
	ps[a], ps[b] = ps[b], ps[a]
}

// Less reports whether the posting at a has a smaller uid than the one at b.
func (ps ByUid) Less(a, b int) bool {
	return ps[a].Uid() < ps[b].Uid()
}
Manish R Jain
committed
// Key builds the storage key for an (entity uid, attribute) pair.
//
// The result is the raw bytes of attr followed by uid encoded as eight
// little-endian bytes, so its length is always len(attr)+8.
func Key(uid uint64, attr string) []byte {
	var buf bytes.Buffer
	buf.Grow(len(attr) + 8)
	buf.WriteString(attr)

	// Encode the uid directly instead of going through binary.Write: the
	// output is the same, but this avoids reflection, and writes to a
	// bytes.Buffer cannot fail, so no error path is needed.
	var tail [8]byte
	binary.LittleEndian.PutUint64(tail[:], uid)
	buf.Write(tail[:])
	return buf.Bytes()
}
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// newPosting serializes the edge t together with the operation op into a
// standalone, finished types.Posting flatbuffer and returns its bytes.
//
// When t.Value is set it is JSON-encoded and stored as the posting value; in
// that case t.ValueId must already be math.MaxUint64, otherwise the process
// is terminated through glog.Fatal.
func newPosting(t x.DirectedEdge, op byte) []byte {
	builder := flatbuffers.NewBuilder(0)

	// Vectors and strings have to be created before PostingStart is called.
	var valueOff flatbuffers.UOffsetT
	if t.Value != nil {
		if t.ValueId != math.MaxUint64 {
			glog.Fatal("This should have already been set by the caller.")
		}
		encoded, err := json.Marshal(t.Value)
		if err != nil {
			glog.WithError(err).Fatal("Unable to marshal value")
			return []byte{}
		}
		valueOff = builder.CreateByteVector(encoded)
	}
	sourceOff := builder.CreateString(t.Source)

	types.PostingStart(builder)
	if valueOff != 0 {
		types.PostingAddValue(builder, valueOff)
	}
	types.PostingAddUid(builder, t.ValueId)
	types.PostingAddSource(builder, sourceOff)
	types.PostingAddTs(builder, t.Timestamp.UnixNano())
	types.PostingAddOp(builder, op)
	root := types.PostingEnd(builder)

	builder.Finish(root)
	return builder.Bytes[builder.Head():]
}
// addEdgeToPosting writes the edge t and the operation op as a
// types.Posting table into the caller-supplied builder b and returns the
// offset of the finished table. Unlike newPosting it does not call
// b.Finish, so the posting can be embedded in a larger buffer.
//
// When t.Value is set it is JSON-encoded and stored as the posting value; in
// that case t.ValueId must already be math.MaxUint64, otherwise the process
// is terminated through glog.Fatal.
//
// NOTE(review): the marshal step, the value vector and the PostingStart call
// were absent from the reviewed text (it referenced an undefined err and
// never assigned bo). They are restored here to mirror newPosting.
func addEdgeToPosting(b *flatbuffers.Builder,
	t x.DirectedEdge, op byte) flatbuffers.UOffsetT {

	var bo flatbuffers.UOffsetT
	if t.Value != nil {
		if t.ValueId != math.MaxUint64 {
			glog.Fatal("This should have already been set by the caller.")
		}
		encoded, err := json.Marshal(t.Value)
		if err != nil {
			glog.WithError(err).Fatal("Unable to marshal value")
			return 0
		}
		bo = b.CreateByteVector(encoded)
	}
	so := b.CreateString(t.Source) // Do this before posting start.

	types.PostingStart(b)
	if bo > 0 {
		types.PostingAddValue(b, bo)
	}
	types.PostingAddUid(b, t.ValueId)
	types.PostingAddSource(b, so)
	types.PostingAddTs(b, t.Timestamp.UnixNano())
	types.PostingAddOp(b, op)
	return types.PostingEnd(b)
}
// addPosting copies the already-parsed posting p into builder b as a new
// types.Posting table and returns the offset of that table. The source
// string and the optional value vector are created first, because they must
// exist before PostingStart is called.
func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT {
	sourceOff := b.CreateByteString(p.Source())

	var valueOff flatbuffers.UOffsetT
	hasValue := p.ValueLength() > 0
	if hasValue {
		valueOff = b.CreateByteVector(p.ValueBytes())
	}

	types.PostingStart(b)
	types.PostingAddUid(b, p.Uid())
	if valueOff != 0 {
		types.PostingAddValue(b, valueOff)
	}
	types.PostingAddSource(b, sourceOff)
	types.PostingAddTs(b, p.Ts())
	types.PostingAddOp(b, p.Op())
	offset := types.PostingEnd(b)
	return offset
}
// empty holds the serialized bytes of a PostingList with no postings. It is
// built once by the package init function, and List.init copies it into
// l.buffer when the store returns an error for the key.
var empty []byte
Manish R Jain
committed
// package level init
func init() {
{
b := flatbuffers.NewBuilder(0)
types.PostingListStart(b)
of := types.PostingListEnd(b)
b.Finish(of)
empty = b.Bytes[b.Head():]
}
{
b := flatbuffers.NewBuilder(0)
types.PostingStart(b)
types.PostingAddUid(b, 0)
of := types.PostingEnd(b)
b.Finish(of)
emptyPosting = b.Bytes[b.Head():]
}
glog.Infof("Empty size: [%d] EmptyPosting size: [%d]",
Manish R Jain
committed
}
if len(value) == 0 {
return errors.New("No value found in posting")
}
if len(value) == 1 && value[0] == 0x00 {
i = nil
return nil
}
return json.Unmarshal(value, i)
}
func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
l.Lock()
defer l.Unlock()
Manish R Jain
committed
defer l.wg.Done()
if len(empty) == 0 {
glog.Fatal("empty should have some bytes.")
Manish R Jain
committed
}
l.key = key
l.pstore = pstore
Manish R Jain
committed
var err error
if l.buffer, err = pstore.Get(key); err != nil {
// glog.Debugf("While retrieving posting list from db: %v\n", err)
// Error. Just set to empty.
Manish R Jain
committed
l.buffer = make([]byte, len(empty))
copy(l.buffer, empty)
}
posting := types.GetRootAsPostingList(l.buffer, 0)
l.maxMutationTs = posting.CommitTs()
l.hash = farm.Fingerprint32(key)
Manish R Jain
committed
l.mlayer = make(map[int]types.Posting)
ch := make(chan []byte, 100)
done := make(chan error)
glog.Debug("Starting stream entries...")
go clog.StreamEntries(posting.CommitTs()+1, l.hash, ch, done)
for buffer := range ch {
uo := flatbuffers.GetUOffsetT(buffer)
m := new(types.Posting)
m.Init(buffer, uo)
Manish R Jain
committed
if m.Ts() > l.maxMutationTs {
l.maxMutationTs = m.Ts()
}
glog.WithFields(logrus.Fields{
"uid": m.Uid(),
"source": string(m.Source()),
"ts": m.Ts(),
}).Debug("Got entry from log")
l.mergeMutation(m)
}
if err := <-done; err != nil {
glog.WithError(err).Error("While streaming entries.")
}
glog.Debug("Done streaming entries.")
Manish R Jain
committed
// l.regenerateIndex()
Manish R Jain
committed
// Caller must hold at least a read lock.
func (l *List) lePostingIndex(maxUid uint64) (int, uint64) {
posting := types.GetRootAsPostingList(l.buffer, 0)
left, right := 0, posting.PostingsLength()-1
sofar := -1
p := new(types.Posting)
for left <= right {
pos := (left + right) / 2
if ok := posting.Postings(p, pos); !ok {
glog.WithField("idx", pos).Fatal("Unable to parse posting from list.")
}
val := p.Uid()
if val > maxUid {
right = pos - 1
continue
}
if val == maxUid {
return pos, val
}
sofar = pos
left = pos + 1
Manish R Jain
committed
if sofar == -1 {
return -1, 0
Manish R Jain
committed
if ok := posting.Postings(p, sofar); !ok {
glog.WithField("idx", sofar).Fatal("Unable to parse posting from list.")
Manish R Jain
committed
return sofar, p.Uid()
Manish R Jain
committed
}
Manish R Jain
committed
Manish R Jain
committed
func (l *List) leMutationIndex(maxUid uint64) (int, uint64) {
left, right := 0, len(l.mindex)-1
sofar := -1
for left <= right {
pos := (left + right) / 2
m := l.mindex[pos]
val := m.posting.Uid()
if val > maxUid {
right = pos - 1
continue
Manish R Jain
committed
}
Manish R Jain
committed
if val == maxUid {
return pos, val
}
sofar = pos
left = pos + 1
Manish R Jain
committed
if sofar == -1 {
return -1, 0
}
return sofar, l.mindex[sofar].posting.Uid()
}
Manish R Jain
committed
func (l *List) mindexInsertAt(mlink *MutationLink, mi int) {
l.mindex = append(l.mindex, nil)
copy(l.mindex[mi+1:], l.mindex[mi:])
l.mindex[mi] = mlink
for i := mi + 1; i < len(l.mindex); i++ {
l.mindex[i].idx += 1
Manish R Jain
committed
}
func (l *List) mindexDeleteAt(mi int) {
glog.WithField("mi", mi).WithField("size", len(l.mindex)).
Debug("mindexDeleteAt")
l.mindex = append(l.mindex[:mi], l.mindex[mi+1:]...)
for i := mi; i < len(l.mindex); i++ {
l.mindex[i].idx -= 1
Manish R Jain
committed
}
}
Manish R Jain
committed
// mutationIndex (mindex) is useful to avoid having to parse the entire
// postinglist up to idx, for every Get(*types.Posting, idx), which has a
// complexity of O(idx). Iteration over N size posting list would thus push
// us into O(N^2) territory, without this technique.
//
// Using this technique,
// we can overlay mutation layers over immutable posting list, to allow for
// O(m) lookups, where m = size of mutation list. Obviously, the size of
// mutation list should be much smaller than the size of posting list, except
// in tiny posting lists, where performance wouldn't be such a concern anyways.
//
// Say we have this data:
// Posting List (plist, immutable):
// idx: 0 1 2 3 4 5
// value: 2 5 9 10 13 15
//
// Mutation List (mlist):
// idx: 0 1 2
// value: 7 10 13' // posting uid is 13 but other values vary.
// Op: SET DEL SET
// Effective: ADD DEL REP (REP = replace)
//
// ----------------------------------------------------------------------------
// regenerateIndex would generate these:
// mlayer (layer just above posting list contains only replace instructions)
// idx: 4
// value: 13'
// Op: SET
// Effective: REP (REP = replace)
//
// mindex:
// idx: 2 4
// value: 7 10
// moveidx: -1 +1
// Effective: ADD DEL
//
// Now, let's see how the access would work:
// idx: get --> calculation [idx, served from, value]
// idx: 0 --> 0 [0, plist, 2]
// idx: 1 --> 1 [1, plist, 5]
// idx: 2 --> ADD from mindex
// --> [2, mindex, 7] // also has moveidx = -1
// idx: 3 --> 3 + moveidx=-1 = 2 [2, plist, 9]
// idx: 4 --> DEL from mindex
// --> 4 + moveidx=-1 + moveidx=+1 = 4 [4, mlayer, 13']
// idx: 5 --> 5 + moveidx=-1 + moveidx=+1 = 5 [5, plist, 15]
//
// Thus we can provide mutation layers over immutable posting list, while
// still ensuring fast lookup access.
//
// NOTE: This function expects the caller to hold a RW Lock.
Manish R Jain
committed
// Update: With mergeMutation function, we're adding mutations with a cost
// of O(log M + log N), where M = number of previous mutations, and N =
// number of postings in the immutable posting list.
func (l *List) mergeMutation(mp *types.Posting) {
curUid := mp.Uid()
pi, puid := l.lePostingIndex(curUid) // O(log N)
mi, muid := l.leMutationIndex(curUid) // O(log M)
inPlist := puid == curUid
// O(1) follows, but any additions or deletions from mindex would
// be O(M) due to element shifting. In terms of benchmarks, this performs
// a LOT better than when I was running O(N + M), re-generating mutation
// flatbuffers, linked lists etc.
mlink := new(MutationLink)
mlink.posting = mp
if mp.Op() == Del {
if muid == curUid { // curUid found in mindex.
if inPlist { // In plist, so replace previous instruction in mindex.
mlink.moveidx = 1
mlink.idx = pi + mi
l.mindex[mi] = mlink
Manish R Jain
committed
} else { // Not in plist, so delete previous instruction in mindex.
l.mdelta -= 1
l.mindexDeleteAt(mi)
}
Manish R Jain
committed
} else { // curUid not found in mindex.
if inPlist { // In plist, so insert in mindex.
mlink.moveidx = 1
Manish R Jain
committed
mlink.idx = pi + mi + 1
l.mindexInsertAt(mlink, mi+1)
} else {
Manish R Jain
committed
// Not found in plist, and not found in mindex. So, ignore.
}
}
Manish R Jain
committed
} else if mp.Op() == Set {
if muid == curUid { // curUid found in mindex.
if inPlist { // In plist, so delete previous instruction, set in mlayer.
l.mindexDeleteAt(mi)
l.mlayer[pi] = *mp
} else { // Not in plist, so replace previous set instruction in mindex.
// NOTE: This prev instruction couldn't have been a Del instruction.
mlink.idx = pi + 1 + mi
mlink.moveidx = -1
l.mindex[mi] = mlink
}
Manish R Jain
committed
} else { // curUid not found in mindex.
if inPlist { // In plist, so just set it in mlayer.
l.mlayer[pi] = *mp
Manish R Jain
committed
} else { // not in plist, not in mindex, so insert in mindex.
mlink.moveidx = -1
l.mdelta += 1
mlink.idx = pi + 1 + mi + 1 // right of pi, and right of mi.
l.mindexInsertAt(mlink, mi+1)
}
Manish R Jain
committed
Manish R Jain
committed
glog.WithField("op", mp.Op()).Fatal("Invalid operation.")
Manish R Jain
committed
// length reports the number of postings in the merged view: the count
// stored in the buffer adjusted by mdelta.
//
// Caller must hold at least a read lock.
func (l *List) length() int {
	stored := types.GetRootAsPostingList(l.buffer, 0).PostingsLength()
	return stored + l.mdelta
}
func (l *List) Length() int {
Manish R Jain
committed
l.wg.Wait()
Manish R Jain
committed
l.RLock()
defer l.RUnlock()
return l.length()
}
func (l *List) Get(p *types.Posting, i int) bool {
Manish R Jain
committed
l.wg.Wait()
Manish R Jain
committed
l.RLock()
defer l.RUnlock()
return l.get(p, i)
}
// Caller must hold at least a read lock.
func (l *List) get(p *types.Posting, i int) bool {
plist := types.GetRootAsPostingList(l.buffer, 0)
if len(l.mindex) == 0 {
if val, ok := l.mlayer[i]; ok {
*p = val
return true
Manish R Jain
committed
return plist.Postings(p, i)
Manish R Jain
committed
// Iterate over mindex, and see if we have instructions
// for the given index. Otherwise, sum up the move indexes
// uptil the given index, so we know where to look in
// mlayer and/or the main posting list.
move := 0
for _, mlink := range l.mindex {
if mlink.idx > i {
break
} else if mlink.idx == i {
// Found an instruction. Check what is says.
if mlink.posting.Op() == Set {
// ADD
glog.WithField("idx", i).
WithField("uid", mlink.posting.Uid()).
WithField("source", string(mlink.posting.Source())).
Debug("Returning from mlink")
*p = *mlink.posting
return true
} else if mlink.posting.Op() == Del {
// DELETE
// The loop will break in the next iteration, after updating the move
// variable.
Manish R Jain
committed
} else {
glog.Fatal("Someone, I mean you, forgot to tackle" +
" this operation. Stop drinking.")
}
Manish R Jain
committed
move += mlink.moveidx
Manish R Jain
committed
newidx := i + move
glog.WithFields(logrus.Fields{
"newidx": newidx,
"idx": i,
"move": move,
}).Debug("Final Indices")
// Check if we have any replace instruction in mlayer.
if val, ok := l.mlayer[newidx]; ok {
*p = val
return true
Manish R Jain
committed
// Hit the main posting list.
if newidx >= plist.PostingsLength() {
return false
}
return plist.Postings(p, newidx)
Manish R Jain
committed
// In benchmarks, the time taken per AddMutation before was
// plateauing at 2.5 ms with sync per 10 log entries, and increasing
// for sync per 100 log entries (to 3 ms per AddMutation), largely because
// of how index generation was being done.
//
// With this change, the benchmarks perform as good as benchmarks for
// commit.Logger, where the less frequently file sync happens, the faster
// AddMutations run.
//
// PASS
// BenchmarkAddMutations_SyncEveryLogEntry-6 100 24712455 ns/op
// BenchmarkAddMutations_SyncEvery10LogEntry-6 500 2485961 ns/op
// BenchmarkAddMutations_SyncEvery100LogEntry-6 10000 298352 ns/op
// BenchmarkAddMutations_SyncEvery1000LogEntry-6 30000 63544 ns/op
// ok github.com/dgraph-io/dgraph/posting 10.291s
func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
Manish R Jain
committed
l.wg.Wait()
l.Lock()
defer l.Unlock()
if t.Timestamp.UnixNano() < l.maxMutationTs {
return fmt.Errorf("Mutation ts lower than committed ts.")
}
// Mutation arrives:
// - Check if we had any(SET/DEL) before this, stored in the mutation list.
// - If yes, then replace that mutation. Jump to a)
// a) check if the entity exists in main posting list.
// - If yes, store the mutation.
// - If no, disregard this mutation.
// All edges with a value set, have the same uid. In other words,
// an (entity, attribute) can only have one interface{} value.
if t.Value != nil {
t.ValueId = math.MaxUint64
}
Manish R Jain
committed
if t.ValueId == 0 {
return fmt.Errorf("ValueId cannot be zero.")
}
mbuf := newPosting(t, op)
uo := flatbuffers.GetUOffsetT(mbuf)
mpost := new(types.Posting)
mpost.Init(mbuf, uo)
Manish R Jain
committed
glog.WithFields(logrus.Fields{
"uid": mpost.Uid(),
"source": string(mpost.Source()),
"ts": mpost.Ts(),
}).Debug("Add mutation")
l.mergeMutation(mpost)
l.maxMutationTs = t.Timestamp.UnixNano()
return l.clog.AddLog(t.Timestamp.UnixNano(), l.hash, mbuf)
Manish R Jain
committed
func (l *List) IsDirty() bool {
Manish R Jain
committed
// We can avoid checking for init here.
l.RLock()
defer l.RUnlock()
Manish R Jain
committed
return len(l.mindex)+len(l.mlayer) > 0
}
Manish R Jain
committed
func (l *List) MergeIfDirty() error {
Manish R Jain
committed
if !l.IsDirty() {
glog.WithField("dirty", false).Debug("Not Committing")
return nil
glog.WithField("dirty", true).Debug("Committing")
Manish R Jain
committed
return l.merge()
Manish R Jain
committed
}
Manish R Jain
committed
func (l *List) merge() error {
Manish R Jain
committed
l.wg.Wait()
l.Lock()
defer l.Unlock()
Manish R Jain
committed
var p types.Posting
sz := l.length()
b := flatbuffers.NewBuilder(0)
Manish R Jain
committed
offsets := make([]flatbuffers.UOffsetT, sz)
for i := 0; i < sz; i++ {
if ok := l.get(&p, i); !ok {
glog.WithField("idx", i).Fatal("Unable to parse posting.")
}
offsets[i] = addPosting(b, p)
Manish R Jain
committed
types.PostingListStartPostingsVector(b, sz)
for i := len(offsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(offsets[i])
}
Manish R Jain
committed
vend := b.EndVector(sz)
types.PostingListStart(b)
types.PostingListAddPostings(b, vend)
Manish R Jain
committed
types.PostingListAddCommitTs(b, l.maxMutationTs)
end := types.PostingListEnd(b)
b.Finish(end)
l.buffer = b.Bytes[b.Head():]
Manish R Jain
committed
if err := l.pstore.SetOne(l.key, l.buffer); err != nil {
glog.WithField("error", err).Errorf("While storing posting list")
Manish R Jain
committed
return err
}
Manish R Jain
committed
// Now reset the mutation variables.
Manish R Jain
committed
l.lastCompact = time.Now()
Manish R Jain
committed
l.mlayer = make(map[int]types.Posting)
l.mdelta = 0
l.mindex = nil
Manish R Jain
committed
return nil
Manish R Jain
committed
Manish R Jain
committed
// LastCompactionTs returns the value of lastCompact, which merge sets to the
// current time after it has stored the list. The read is done under the
// read lock.
func (l *List) LastCompactionTs() time.Time {
	l.RLock()
	ts := l.lastCompact
	l.RUnlock()
	return ts
}
Manish R Jain
committed
l.wg.Wait()
l.RLock()
defer l.RUnlock()
Manish R Jain
committed
result := make([]uint64, l.length())
result = result[:0]
Manish R Jain
committed
var p types.Posting
for i := 0; i < l.length(); i++ {
if ok := l.get(&p, i); !ok || p.Uid() == math.MaxUint64 {
break
}
Manish R Jain
committed
}
Manish R Jain
committed
}
func (l *List) Value() (result []byte, rerr error) {
Manish R Jain
committed
l.wg.Wait()
l.RLock()
defer l.RUnlock()
Manish R Jain
committed
if l.length() == 0 {
return result, fmt.Errorf("No value found")
}
var p types.Posting
if ok := l.get(&p, l.length()-1); !ok {
return result, fmt.Errorf("Unable to get last posting")
}
if p.Uid() != math.MaxUint64 {
return result, fmt.Errorf("No value found")
}
return p.ValueBytes(), nil
}