Newer
Older
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package posting
import (
"bytes"
"encoding/gob"
Manish R Jain
committed
"sort"
"github.com/google/flatbuffers/go"
"github.com/manishrjain/dgraph/posting/types"
Manish R Jain
committed
"github.com/manishrjain/dgraph/store"
"github.com/manishrjain/dgraph/x"
linked "container/list"
// log is the package-scoped logger for posting list operations.
var log = x.Log("posting")

// Operation codes stored in a Posting's Op field.
const (
	// Set adds a posting, or replaces the committed posting with the same uid.
	Set = 0x01
	// Del removes the posting with the matching uid.
	Del = 0x02
)
Manish R Jain
committed
type MutationLink struct {
idx int
moveidx int
Manish R Jain
committed
posting types.Posting
}
Manish R Jain
committed
key []byte
mutex sync.RWMutex
buffer []byte
mbuffer []byte
pstore *store.Store // postinglist store
mstore *store.Store // mutation store
// mlayer keeps only replace instructions for the posting list.
// This works at the
mlayer map[int]types.Posting
mdelta int // Delta based on number of elements in posting list.
Manish R Jain
committed
mindex *linked.List
Manish R Jain
committed
// ByUid orders postings by ascending Uid; it implements sort.Interface.
type ByUid []*types.Posting

// Len returns the number of postings.
func (s ByUid) Len() int {
	return len(s)
}

// Swap exchanges the postings at positions i and j.
func (s ByUid) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

// Less reports whether the posting at i has a smaller Uid than the one at j.
func (s ByUid) Less(i, j int) bool {
	return s[i].Uid() < s[j].Uid()
}
// addTripleToPosting serializes triple t, tagged with mutation op, as a
// Posting table in builder b and returns the table's offset.
// If t carries a Value, it is gob-encoded into the posting's Value vector.
// In that case the caller must already have set t.ValueId to math.MaxUint64.
func addTripleToPosting(b *flatbuffers.Builder,
	t x.Triple, op byte) flatbuffers.UOffsetT {
	// Vectors and strings must be created before PostingStart: flatbuffers
	// does not allow creating them while a table is under construction.
	var bo flatbuffers.UOffsetT
	if t.Value != nil {
		if t.ValueId != math.MaxUint64 {
			log.Fatal("This should have already been set by the caller.")
		}
		var buf bytes.Buffer
		enc := gob.NewEncoder(&buf)
		if err := enc.Encode(t.Value); err != nil {
			x.Err(log, err).Fatal("Unable to encode interface")
			return 0
		}
		bo = b.CreateByteVector(buf.Bytes())
	}
	so := b.CreateString(t.Source) // Do this before posting start.

	// The table must be started before any PostingAdd* call.
	types.PostingStart(b)
	if bo > 0 {
		types.PostingAddValue(b, bo)
	}
	types.PostingAddUid(b, t.ValueId)
	types.PostingAddSource(b, so)
	types.PostingAddTs(b, t.Timestamp.UnixNano())
	types.PostingAddOp(b, op)
	return types.PostingEnd(b)
}
// addPosting copies posting p into builder b as a new Posting table and
// returns its offset. It carries over Uid, Source, Ts, Op and, when present,
// the gob-encoded Value bytes. Without the Value bytes, values would be lost
// whenever mutation or posting lists are re-serialized.
func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT {
	// Vectors and strings must be created before PostingStart.
	var bo flatbuffers.UOffsetT
	if p.ValueLength() > 0 {
		bo = b.CreateByteVector(p.ValueBytes())
	}
	so := b.CreateByteString(p.Source())

	types.PostingStart(b)
	if bo > 0 {
		types.PostingAddValue(b, bo)
	}
	types.PostingAddUid(b, p.Uid())
	types.PostingAddSource(b, so)
	types.PostingAddTs(b, p.Ts())
	types.PostingAddOp(b, p.Op())
	return types.PostingEnd(b)
}
var empty []byte
Manish R Jain
committed
// package level init
func init() {
{
b := flatbuffers.NewBuilder(0)
types.PostingListStart(b)
of := types.PostingListEnd(b)
b.Finish(of)
empty = b.Bytes[b.Head():]
}
{
b := flatbuffers.NewBuilder(0)
types.PostingStart(b)
types.PostingAddUid(b, 0)
of := types.PostingEnd(b)
b.Finish(of)
emptyPosting = b.Bytes[b.Head():]
}
log.Infof("Empty size: [%d] EmptyPosting size: [%d]",
len(empty), len(emptyPosting))
Manish R Jain
committed
}
// ParseValue gob-decodes the Value bytes carried by posting p into i. As with
// gob.Decoder.Decode, i should be a pointer to a value of the encoded type.
// It returns an error if p has no value or if decoding fails.
func ParseValue(i interface{}, p types.Posting) error {
	if p.ValueLength() == 0 {
		return errors.New("No value found in posting")
	}
	return gob.NewDecoder(bytes.NewReader(p.ValueBytes())).Decode(i)
}
Manish R Jain
committed
func (l *List) Init(key []byte, pstore, mstore *store.Store) {
l.mutex.Lock()
defer l.mutex.Unlock()
if len(empty) == 0 {
Manish R Jain
committed
log.Fatal("empty should have some bytes.")
}
l.key = key
l.pstore = pstore
l.mstore = mstore
var err error
if l.buffer, err = pstore.Get(key); err != nil {
log.Errorf("While retrieving posting list from db: %v\n", err)
// Error. Just set to empty.
Manish R Jain
committed
l.buffer = make([]byte, len(empty))
copy(l.buffer, empty)
}
Manish R Jain
committed
if l.mbuffer, err = mstore.Get(key); err != nil {
Manish R Jain
committed
log.Debugf("While retrieving mutation list from db: %v\n", err)
// Error. Just set to empty.
Manish R Jain
committed
l.mbuffer = make([]byte, len(empty))
copy(l.mbuffer, empty)
l.generateIndex()
}
// findIndex binary-searches the postings of pl in the inclusive index range
// [begin, end] for one whose Uid equals uid. It returns that index, or -1 if
// the range is empty, nothing matches, or a posting cannot be read. The
// postings are assumed to be sorted by ascending Uid.
func findIndex(pl *types.PostingList, uid uint64, begin, end int) int {
	var pmid types.Posting
	lo, hi := begin, end
	for lo <= hi {
		mid := (lo + hi) / 2
		if !pl.Postings(&pmid, mid) {
			return -1
		}
		switch got := pmid.Uid(); {
		case uid < got:
			hi = mid - 1
		case uid > got:
			lo = mid + 1
		default:
			return mid
		}
	}
	return -1
}
// Caller must hold at least a read lock.
func (l *List) find(uid uint64) int {
posting := types.GetRootAsPostingList(l.buffer, 0)
return findIndex(posting, uid, 0, posting.PostingsLength())
func (l *List) Length() int {
l.mutex.RLock()
defer l.mutex.RUnlock()
Manish R Jain
committed
plist := types.GetRootAsPostingList(l.buffer, 0)
return plist.PostingsLength() + l.mdelta
}
func (l *List) Get(p *types.Posting, i int) bool {
Manish R Jain
committed
l.mutex.RLock()
defer l.mutex.RUnlock()
plist := types.GetRootAsPostingList(l.buffer, 0)
if l.mindex == nil {
return plist.Postings(p, i)
}
// Iterate over mindex, and see if we have instructions
// for the given index. Otherwise, sum up the move indexes
// uptil the given index, so we know where to look in
// mlayer and/or the main posting list.
move := 0
Manish R Jain
committed
for e := l.mindex.Front(); e != nil; e = e.Next() {
mlink := e.Value.(*MutationLink)
if mlink.idx > i {
break
} else if mlink.idx == i {
// Found an instruction. Check what is says.
if mlink.posting.Op() == 0x01 {
// ADD
*p = mlink.posting
return true
} else if mlink.posting.Op() == 0x02 {
// DELETE
// The loop will break in the next iteration, after updating the move
// variable.
} else {
log.Fatal("Someone, I mean you, forgot to tackle" +
" this operation. Stop drinking.")
}
Manish R Jain
committed
}
move += mlink.moveidx
}
newidx := i + move
// Check if we have any replace instruction in mlayer.
if val, ok := l.mlayer[newidx]; ok {
*p = val
return true
}
// Hit the main posting list.
if newidx >= plist.PostingsLength() {
return false
Manish R Jain
committed
}
Manish R Jain
committed
}
// generateIndex builds a mutation index. Without it, every
// Get(*types.Posting, idx) would have to parse the posting list up to idx,
// an O(idx) operation. Iterating over an N-sized posting list would thus push
// us into O(N^2) territory.
//
// Using this technique,
// we can overlay mutation layers over the immutable posting list, to allow for
// O(m) lookups, where m = size of the mutation list. The mutation list is
// expected to be much smaller than the posting list, except in tiny posting
// lists, where performance wouldn't be much of a concern anyway.
//
// Say we have this data:
// Posting List (plist, immutable):
// idx: 0 1 2 3 4 5
// value: 2 5 9 10 13 15
//
// Mutation List (mlist):
// idx: 0 1 2
// value: 7 10 13' // posting uid is 13 but other values vary.
// Op: SET DEL SET
// Effective: ADD DEL REP (REP = replace)
//
// ----------------------------------------------------------------------------
// generateIndex would generate these:
// mlayer (layer just above posting list contains only replace instructions)
// idx: 4
// value: 13'
// Op: SET
// Effective: REP (REP = replace)
//
// mindex:
// idx: 2 4
// value: 7 10
// moveidx: -1 +1
// Effective: ADD DEL
//
// Now, let's see how the access would work:
// idx: get --> calculation [idx, served from, value]
// idx: 0 --> 0 [0, plist, 2]
// idx: 1 --> 1 [1, plist, 5]
// idx: 2 --> ADD from mindex
// --> [2, mindex, 7] // also has moveidx = -1
// idx: 3 --> 3 + moveidx=-1 = 2 [2, plist, 9]
// idx: 4 --> DEL from mindex
// --> 4 + moveidx=-1 + moveidx=+1 = 4 [4, mlayer, 13']
// idx: 5 --> 5 + moveidx=-1 + moveidx=+1 = 5 [5, plist, 15]
//
// Thus we can provide mutation layers over immutable posting list, while
// still ensuring fast lookup access.
//
// generateIndex discards and rebuilds l.mindex, l.mdelta and l.mlayer from
// l.mbuffer (mutations) and l.buffer (committed postings).
//
// NOTE: This function assigns l.mindex, l.mdelta and l.mlayer, so the caller
// must hold the write lock on l.mutex.
func (l *List) generateIndex() {
l.mindex = nil
l.mdelta = 0
l.mlayer = make(map[int]types.Posting)
Manish R Jain
committed
mlist := types.GetRootAsPostingList(l.mbuffer, 0)
plist := types.GetRootAsPostingList(l.buffer, 0)
// NOTE(review): this branch has no visible body (an early `return` looks
// missing). As shown, an empty mutation list falls through and l.mindex ends
// up as an empty chain rather than nil.
if mlist.PostingsLength() == 0 {
Manish R Jain
committed
}
// Collect the mutations and sort them by ascending uid, so they can be
// merged in a single pass against the (uid-sorted) committed list.
var muts []*types.Posting
for i := 0; i < mlist.PostingsLength(); i++ {
var mp types.Posting
mlist.Postings(&mp, i)
muts = append(muts, &mp)
}
sort.Sort(ByUid(muts))
mchain := linked.New()
pi := 0
pp := new(types.Posting)
// If the committed list has no postings, fall back to the Uid-0 sentinel
// posting so pp is always valid to read.
if ok := plist.Postings(pp, pi); !ok {
// There's some weird padding before Posting starts. Get that padding.
padding := flatbuffers.GetUOffsetT(emptyPosting)
pp.Init(emptyPosting, padding)
if pp.Uid() != 0 {
log.Fatal("Playing with bytes is like playing with fire." +
" Someone got burnt today!")
}
}
Manish R Jain
committed
Manish R Jain
committed
for mi, mp := range muts {
// TODO: Consider converting to binary search later.
Manish R Jain
committed
// Advance pi past committed postings whose uid is below the mutation's uid.
// NOTE(review): the loop body is not visible; as written pp is never
// re-read, so pp stays at plist[0] while pi advances — confirm.
for ; pi < plist.PostingsLength() && pp.Uid() < mp.Uid(); pi++ {
Manish R Jain
committed
}
Manish R Jain
committed
mlink := new(MutationLink)
mlink.posting = *mp
if pp.Uid() == mp.Uid() {
if mp.Op() == Set {
// This is a replace, so store it in mlayer, instead of mindex.
// Note that mlayer index is based right off the plist.
// NOTE(review): nothing skips the mchain.PushBack below, so this
// replace is also added to mindex (moveidx 0), unlike the example.
l.mlayer[pi] = *mp
} else if mp.Op() == Del {
// This is a delete, so move the plist index forward.
// NOTE(review): mdelta is not decremented here, so Length() does not
// shrink for deletes — confirm intended.
mlink.moveidx = 1
} else {
log.Fatal("This operation isn't being handled.")
}
} else if mp.Uid() < pp.Uid() {
// This is an add, so move the plist index backwards.
mlink.moveidx = -1
l.mdelta += 1
} else {
// mp.Uid() > pp.Uid()
// We've crossed over from posting list. Posting List shouldn't be
// consulted in this case, so moveidx wouldn't be used. Just set it
// to zero anyways, to represent that.
mlink.moveidx = 0
l.mdelta += 1
}
// pi (committed postings before this uid) + mi (mutations processed so far).
// NOTE(review): mi also counts DEL and replace mutations, which add no entry
// to the merged view; verify indices when such mutations precede an ADD.
mlink.idx = pi + mi
Manish R Jain
committed
mchain.PushBack(mlink)
}
l.mindex = mchain
}
func (l *List) addIfValid(b *flatbuffers.Builder,
offsets *[]flatbuffers.UOffsetT, t x.Triple, op byte) {
if op == Del {
if fi := l.find(t.ValueId); fi >= 0 {
// Delete. Only add it to the list if it exists in the posting list.
*offsets = append(*offsets, addTripleToPosting(b, t, op))
}
} else {
*offsets = append(*offsets, addTripleToPosting(b, t, op))
}
Manish R Jain
committed
// AddMutation records the mutation (t, op) in the pending mutation list
// (l.mbuffer) and persists the rebuilt list to l.mstore under l.key.
// It takes the write lock on l.mutex, so the caller must not already hold it.
//
// NOTE(review): this block looks truncated in the visible source. `added` is
// never declared (presumably `added := false` before the loop), and closing
// braces for the else-branch, the mutation loop and the PrependUOffsetT loop
// are missing. Confirm the exact control flow against the repository.
func (l *List) AddMutation(t x.Triple, op byte) error {
l.mutex.Lock()
defer l.mutex.Unlock()
// Mutation arrives:
// - Check if we had any(SET/DEL) before this, stored in the mutation list.
// - If yes, then replace that mutation. Jump to a)
// a) check if the entity exists in main posting list.
// - If yes, store the mutation.
// - If no, disregard this mutation.
// All triples with a value set, have the same uid. In other words,
// an (entity, attribute) can only have one interface{} value.
if t.Value != nil {
t.ValueId = math.MaxUint64
}
b := flatbuffers.NewBuilder(0)
Manish R Jain
committed
// Rebuild the mutation list: copy every existing mutation for other uids,
// and resolve the one (if any) that targets the same uid as t.
muts := types.GetRootAsPostingList(l.mbuffer, 0)
var offsets []flatbuffers.UOffsetT
for i := 0; i < muts.PostingsLength(); i++ {
var p types.Posting
if ok := muts.Postings(&p, i); !ok {
log.Fatal("While reading posting")
return errors.New("Error reading posting")
}
if p.Uid() != t.ValueId {
offsets = append(offsets, addPosting(b, p))
} else {
// An operation on something we already have a mutation for.
// Overwrite the previous one if it came earlier.
if p.Ts() <= t.Timestamp.UnixNano() {
l.addIfValid(b, &offsets, t, op)
} // else keep the previous one.
// NOTE(review): when p is newer than t, nothing here visibly re-appends p
// to offsets, so p would be dropped from the rebuilt list rather than
// kept — verify.
added = true
// No existing mutation targeted this uid: record t as a new mutation.
if !added {
l.addIfValid(b, &offsets, t, op)
}
// Serialize offsets as the PostingList's postings vector; flatbuffers
// vectors are built by prepending, hence the reverse iteration.
types.PostingListStartPostingsVector(b, len(offsets))
for i := len(offsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(offsets[i])
vend := b.EndVector(len(offsets))
types.PostingListStart(b)
types.PostingListAddPostings(b, vend)
end := types.PostingListEnd(b)
b.Finish(end)
Manish R Jain
committed
l.mbuffer = b.Bytes[b.Head():]
Manish R Jain
committed
// NOTE(review): generateIndex is not visibly called after l.mbuffer changes,
// so l.mindex, l.mlayer and l.mdelta keep describing the previous mutations
// until it runs again (e.g. from Init).
return l.mstore.SetOne(l.key, l.mbuffer)
func addOrSet(ll *linked.List, p *types.Posting) {
added := false
for e := ll.Front(); e != nil; e = e.Next() {
pe := e.Value.(*types.Posting)
if pe == nil {
log.Fatal("Posting shouldn't be nil!")
}
if !added && pe.Uid() > p.Uid() {
ll.InsertBefore(p, e)
added = true
} else if pe.Uid() == p.Uid() {
added = true
e.Value = p
}
}
if !added {
ll.PushBack(p)
}
// remove deletes from ll the first posting whose Uid equals p's Uid.
// Only the first match can be removed: list.Remove detaches the element,
// after which its Next() is nil, so the scan ends there.
func remove(ll *linked.List, p *types.Posting) {
	target := p.Uid()
	for e := ll.Front(); e != nil; e = e.Next() {
		if e.Value.(*types.Posting).Uid() == target {
			ll.Remove(e)
			return
		}
	}
}
func (l *List) generateLinkedList() *linked.List {
plist := types.GetRootAsPostingList(l.buffer, 0)
ll := linked.New()
for i := 0; i < plist.PostingsLength(); i++ {
p := new(types.Posting)
plist.Postings(p, i)
ll.PushBack(p)
}
Manish R Jain
committed
mlist := types.GetRootAsPostingList(l.mbuffer, 0)
// Now go through mutations
for i := 0; i < mlist.PostingsLength(); i++ {
p := new(types.Posting)
mlist.Postings(p, i)
if p.Op() == 0x01 {
// Set/Add
addOrSet(ll, p)
} else if p.Op() == 0x02 {
// Delete
remove(ll, p)
} else {
log.Fatalf("Strange mutation: %+v", p)
// isDirty reports whether l has pending mutations in l.mbuffer that have not
// yet been merged into the committed posting list. CommitIfDirty resets
// l.mbuffer to the empty list after merging. It takes a read lock on l.mutex.
func (l *List) isDirty() bool {
	l.mutex.RLock()
	defer l.mutex.RUnlock()
	mlist := types.GetRootAsPostingList(l.mbuffer, 0)
	return mlist.PostingsLength() > 0
}
// CommitIfDirty merges the pending mutations into the committed posting list
// when isDirty reports any. It:
//   - rebuilds l.buffer from generateLinkedList;
//   - stores it in l.pstore;
//   - deletes l.key from l.mstore;
//   - resets l.mbuffer to the empty list.
//
// It returns the first store error encountered, or nil.
//
// NOTE(review): isDirty releases its read lock before the write lock below is
// taken, so the list may change in between; confirm that is acceptable.
func (l *List) CommitIfDirty() error {
if !l.isDirty() {
log.Debug("Not dirty. Ignoring commit.")
return nil
}
l.mutex.Lock()
defer l.mutex.Unlock()
ll := l.generateLinkedList()
b := flatbuffers.NewBuilder(0)
var offsets []flatbuffers.UOffsetT
for e := ll.Front(); e != nil; e = e.Next() {
p := e.Value.(*types.Posting)
off := addPosting(b, *p)
offsets = append(offsets, off)
// NOTE(review): the closing brace of the loop above appears to be missing from
// the visible source; the vector below is presumably built once, after all
// postings have been added.
types.PostingListStartPostingsVector(b, ll.Len())
for i := len(offsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(offsets[i])
}
vend := b.EndVector(ll.Len())
types.PostingListStart(b)
types.PostingListAddPostings(b, vend)
end := types.PostingListEnd(b)
b.Finish(end)
l.buffer = b.Bytes[b.Head():]
Manish R Jain
committed
if err := l.pstore.SetOne(l.key, l.buffer); err != nil {
log.WithField("error", err).Errorf("While storing posting list")
return err
}
if err := l.mstore.Delete(l.key); err != nil {
log.WithField("error", err).Errorf("While deleting mutation list")
return err
}
Manish R Jain
committed
// NOTE(review): generateIndex is not called after this reset, so l.mindex,
// l.mlayer and l.mdelta still describe the mutations just merged into
// l.buffer; Get and Length may then apply them a second time.
l.mbuffer = make([]byte, len(empty))
copy(l.mbuffer, empty)
Manish R Jain
committed
return nil