Skip to content
Snippets Groups Projects
list.go 15.5 KiB
Newer Older
/*
 * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * 		http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package posting

import (
	"github.com/google/flatbuffers/go"
	"github.com/dgraph-io/dgraph/posting/types"
	"github.com/dgraph-io/dgraph/store"
	"github.com/dgraph-io/dgraph/x"
const Set = 0x01
const Del = 0x02
	key     []byte
	mutex   sync.RWMutex
	buffer  []byte
	mbuffer []byte
	pstore  *store.Store // postinglist store
	mstore  *store.Store // mutation store

	// mlayer keeps only replace instructions for the posting list.
	// This works at the
	mlayer map[int]types.Posting
	mdelta int // Delta based on number of elements in posting list.
type ByUid []*types.Posting

func (pa ByUid) Len() int           { return len(pa) }
func (pa ByUid) Swap(i, j int)      { pa[i], pa[j] = pa[j], pa[i] }
func (pa ByUid) Less(i, j int) bool { return pa[i].Uid() < pa[j].Uid() }

// key = (entity uid, attribute)
func Key(uid uint64, attr string) []byte {
	buf := new(bytes.Buffer)
	buf.WriteString(attr)
	if err := binary.Write(buf, binary.LittleEndian, uid); err != nil {
		log.Fatalf("Error while creating key with attr: %v uid: %v\n", attr, uid)
func addTripleToPosting(b *flatbuffers.Builder,
	t x.Triple, op byte) flatbuffers.UOffsetT {
	var bo flatbuffers.UOffsetT
	if t.Value != nil {
		if t.ValueId != math.MaxUint64 {
			log.Fatal("This should have already been set by the caller.")
		}
		var buf bytes.Buffer
		enc := gob.NewEncoder(&buf)
		if err := enc.Encode(t.Value); err != nil {
			x.Err(log, err).Fatal("Unable to encode interface")
			return 0
		}
		bo = b.CreateByteVector(buf.Bytes())
	}
	so := b.CreateString(t.Source) // Do this before posting start.
	types.PostingStart(b)
	if bo > 0 {
		types.PostingAddValue(b, bo)
	}
	types.PostingAddSource(b, so)
	types.PostingAddTs(b, t.Timestamp.UnixNano())
	types.PostingAddOp(b, op)
	return types.PostingEnd(b)
}

func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT {
	so := b.CreateByteString(p.Source()) // Do this before posting start.
	var bo flatbuffers.UOffsetT
	if p.ValueLength() > 0 {
		bo = b.CreateByteVector(p.ValueBytes())
	}

	types.PostingStart(b)
	types.PostingAddUid(b, p.Uid())
	if bo > 0 {
		types.PostingAddValue(b, bo)
	}
	types.PostingAddSource(b, so)
	types.PostingAddTs(b, p.Ts())
	types.PostingAddOp(b, p.Op())
	return types.PostingEnd(b)
}

var emptyPosting []byte
	{
		b := flatbuffers.NewBuilder(0)
		types.PostingListStart(b)
		of := types.PostingListEnd(b)
		b.Finish(of)
		empty = b.Bytes[b.Head():]
	}

	{
		b := flatbuffers.NewBuilder(0)
		types.PostingStart(b)
		types.PostingAddUid(b, 0)
		of := types.PostingEnd(b)
		b.Finish(of)
		emptyPosting = b.Bytes[b.Head():]
	}

	log.Infof("Empty size: [%d] EmptyPosting size: [%d]",
		len(empty), len(emptyPosting))
func ParseValue(i interface{}, value []byte) error {
	if len(value) == 0 {
		return errors.New("No value found in posting")
	}
	var buf bytes.Buffer
	dec := gob.NewDecoder(&buf)
	return dec.Decode(i)
}

func (l *List) init(key []byte, pstore, mstore *store.Store) {
	l.mutex.Lock()
	defer l.mutex.Unlock()

		log.Fatal("empty should have some bytes.")
	}
	l.key = key
	l.pstore = pstore
	l.mstore = mstore

	var err error
	if l.buffer, err = pstore.Get(key); err != nil {
		// log.Debugf("While retrieving posting list from db: %v\n", err)
		l.buffer = make([]byte, len(empty))
		copy(l.buffer, empty)
	}

	if l.mbuffer, err = mstore.Get(key); err != nil {
		// log.Debugf("While retrieving mutation list from db: %v\n", err)
		l.mbuffer = make([]byte, len(empty))
		copy(l.mbuffer, empty)
	l.generateIndex()
}

func findIndex(pl *types.PostingList, uid uint64, begin, end int) int {
	if begin > end {
		return -1
	}
	mid := (begin + end) / 2
	var pmid types.Posting
	if ok := pl.Postings(&pmid, mid); !ok {
		return -1
	}
	if uid < pmid.Uid() {
		return findIndex(pl, uid, begin, mid-1)
	}
	if uid > pmid.Uid() {
		return findIndex(pl, uid, mid+1, end)
	}
	return mid
}

// Caller must hold at least a read lock.
func (l *List) find(uid uint64) int {
	posting := types.GetRootAsPostingList(l.buffer, 0)
	return findIndex(posting, uid, 0, posting.PostingsLength())
// Caller must hold at least a read lock.
func (l *List) length() int {
	plist := types.GetRootAsPostingList(l.buffer, 0)
	return plist.PostingsLength() + l.mdelta
}

	l.mutex.RLock()
	defer l.mutex.RUnlock()
}

func (l *List) Get(p *types.Posting, i int) bool {
// Caller must hold at least a read lock.
func (l *List) get(p *types.Posting, i int) bool {
	plist := types.GetRootAsPostingList(l.buffer, 0)
	if l.mindex == nil {
		return plist.Postings(p, i)
	}

	// Iterate over mindex, and see if we have instructions
	// for the given index. Otherwise, sum up the move indexes
	// uptil the given index, so we know where to look in
	// mlayer and/or the main posting list.
	move := 0
	for e := l.mindex.Front(); e != nil; e = e.Next() {
		mlink := e.Value.(*MutationLink)
		if mlink.idx > i {
			break

		} else if mlink.idx == i {
			// Found an instruction. Check what is says.
			if mlink.posting.Op() == 0x01 {
				// ADD
				*p = mlink.posting
				return true

			} else if mlink.posting.Op() == 0x02 {
				// DELETE
				// The loop will break in the next iteration, after updating the move
				// variable.

			} else {
				log.Fatal("Someone, I mean you, forgot to tackle" +
					" this operation. Stop drinking.")
			}
		move += mlink.moveidx
	}
	newidx := i + move

	// Check if we have any replace instruction in mlayer.
	if val, ok := l.mlayer[newidx]; ok {
		*p = val
		return true
	}
	// Hit the main posting list.
	if newidx >= plist.PostingsLength() {
		return false
	return plist.Postings(p, newidx)
// mutationIndex is useful to avoid having to parse the entire postinglist
// upto idx, for every Get(*types.Posting, idx), which has a complexity
// of O(idx). Iteration over N size posting list would this push us into
// O(N^2) territory, without this technique.
//
// Using this technique,
// we can overlay mutation layers over immutable posting list, to allow for
// O(m) lookups, where m = size of mutation list. Obviously, the size of
// mutation list should be much smaller than the size of posting list, except
// in tiny posting lists, where performance wouldn't be such a concern anyways.
//
// Say we have this data:
// Posting List (plist, immutable):
// idx:   0  1  2  3  4  5
// value: 2  5  9 10 13 15
//
// Mutation List (mlist):
// idx:          0   1   2
// value:        7  10  13' // posting uid is 13 but other values vary.
// Op:         SET DEL SET
// Effective:  ADD DEL REP  (REP = replace)
//
// ----------------------------------------------------------------------------
// generateIndex would generate these:
// mlayer (layer just above posting list contains only replace instructions)
// idx:          4
// value:       13'
// Op:       	 SET
// Effective:  REP  (REP = replace)
//
// mindex:
// idx:          2   4
// value:        7  10
// moveidx:     -1  +1
// Effective:  ADD DEL
//
// Now, let's see how the access would work:
// idx: get --> calculation [idx, served from, value]
// idx: 0 --> 0   [0, plist, 2]
// idx: 1 --> 1   [1, plist, 5]
// idx: 2 --> ADD from mindex
//        -->     [2, mindex, 7] // also has moveidx = -1
// idx: 3 --> 3 + moveidx=-1 = 2 [2, plist, 9]
// idx: 4 --> DEL from mindex
//        --> 4 + moveidx=-1 + moveidx=+1 = 4 [4, mlayer, 13']
// idx: 5 --> 5 + moveidx=-1 + moveidx=+1 = 5 [5, plist, 15]
//
// Thus we can provide mutation layers over immutable posting list, while
// still ensuring fast lookup access.
//
// NOTE: This function expects the caller to hold a RW Lock.
func (l *List) generateIndex() {
	l.mindex = nil
	l.mdelta = 0
	l.mlayer = make(map[int]types.Posting)
	mlist := types.GetRootAsPostingList(l.mbuffer, 0)
	plist := types.GetRootAsPostingList(l.buffer, 0)
	if mlist.PostingsLength() == 0 {
	}
	var muts []*types.Posting
	for i := 0; i < mlist.PostingsLength(); i++ {
		var mp types.Posting
		mlist.Postings(&mp, i)
		muts = append(muts, &mp)
	}
	sort.Sort(ByUid(muts))

	mchain := linked.New()
	pi := 0
	pp := new(types.Posting)
	if ok := plist.Postings(pp, pi); !ok {
		// There's some weird padding before Posting starts. Get that padding.
		padding := flatbuffers.GetUOffsetT(emptyPosting)
		pp.Init(emptyPosting, padding)
		if pp.Uid() != 0 {
			log.Fatal("Playing with bytes is like playing with fire." +
				" Someone got burnt today!")
		}
	}
	l.mdelta = 0
		// TODO: Consider converting to binary search later.
		for ; pi < plist.PostingsLength() && pp.Uid() < mp.Uid(); pi++ {
			plist.Postings(pp, pi)
		mlink := new(MutationLink)
		mlink.posting = *mp
			if mp.Op() == Set {
				// This is a replace, so store it in mlayer, instead of mindex.
				// Note that mlayer index is based right off the plist.
				l.mlayer[pi] = *mp

			} else if mp.Op() == Del {
				// This is a delete, so move the plist index forward.
			} else {
				log.Fatal("This operation isn't being handled.")
			}
		} else if mp.Uid() < pp.Uid() {
			// This is an add, so move the plist index backwards.
			l.mdelta += 1

		} else {
			// mp.Uid() > pp.Uid()
			// We've crossed over from posting list. Posting List shouldn't be
			// consulted in this case, so moveidx wouldn't be used. Just set it
			// to zero anyways, to represent that.
			mlink.moveidx = 0
			l.mdelta += 1
	l.mindex = mchain
}

func (l *List) addIfValid(b *flatbuffers.Builder,
	offsets *[]flatbuffers.UOffsetT, t x.Triple, op byte) {

	if op == Del {
		if fi := l.find(t.ValueId); fi >= 0 {
			// Delete. Only add it to the list if it exists in the posting list.
			*offsets = append(*offsets, addTripleToPosting(b, t, op))
		}
	} else {
		*offsets = append(*offsets, addTripleToPosting(b, t, op))
	}
func (l *List) AddMutation(t x.Triple, op byte) error {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	// Mutation arrives:
	// - Check if we had any(SET/DEL) before this, stored in the mutation list.
	//		- If yes, then replace that mutation. Jump to a)
	// a)		check if the entity exists in main posting list.
	// 				- If yes, store the mutation.
	// 				- If no, disregard this mutation.

	// All triples with a value set, have the same uid. In other words,
	// an (entity, attribute) can only have one interface{} value.
	if t.Value != nil {
		t.ValueId = math.MaxUint64
	}

	b := flatbuffers.NewBuilder(0)
	muts := types.GetRootAsPostingList(l.mbuffer, 0)
	var offsets []flatbuffers.UOffsetT
	added := false
	for i := 0; i < muts.PostingsLength(); i++ {
		var p types.Posting
		if ok := muts.Postings(&p, i); !ok {
			log.Fatal("While reading posting")
			return errors.New("Error reading posting")
		}

		if p.Uid() != t.ValueId {
			offsets = append(offsets, addPosting(b, p))

		} else {
			// An operation on something we already have a mutation for.
			// Overwrite the previous one if it came earlier.
			if p.Ts() <= t.Timestamp.UnixNano() {
				l.addIfValid(b, &offsets, t, op)
			} // else keep the previous one.
			added = true
	if !added {
		l.addIfValid(b, &offsets, t, op)
	}

	types.PostingListStartPostingsVector(b, len(offsets))
	for i := len(offsets) - 1; i >= 0; i-- {
		b.PrependUOffsetT(offsets[i])
	vend := b.EndVector(len(offsets))
	types.PostingListStart(b)
	types.PostingListAddPostings(b, vend)
	end := types.PostingListEnd(b)
	b.Finish(end)
	l.generateIndex()
	return l.mstore.SetOne(l.key, l.mbuffer)
func addOrSet(ll *linked.List, p *types.Posting) {
	added := false
	for e := ll.Front(); e != nil; e = e.Next() {
		pe := e.Value.(*types.Posting)
		if pe == nil {
			log.Fatal("Posting shouldn't be nil!")
		}

		if !added && pe.Uid() > p.Uid() {
			ll.InsertBefore(p, e)
			added = true

		} else if pe.Uid() == p.Uid() {
			added = true
			e.Value = p
		}
	}
	if !added {
		ll.PushBack(p)
	}
func remove(ll *linked.List, p *types.Posting) {
	for e := ll.Front(); e != nil; e = e.Next() {
		pe := e.Value.(*types.Posting)
		if pe.Uid() == p.Uid() {
			ll.Remove(e)
		}
func (l *List) generateLinkedList() *linked.List {
	plist := types.GetRootAsPostingList(l.buffer, 0)
	ll := linked.New()

	for i := 0; i < plist.PostingsLength(); i++ {
		p := new(types.Posting)
		plist.Postings(p, i)

		ll.PushBack(p)
	}

	mlist := types.GetRootAsPostingList(l.mbuffer, 0)
	for i := 0; i < mlist.PostingsLength(); i++ {
		p := new(types.Posting)
		mlist.Postings(p, i)

		if p.Op() == 0x01 {
			// Set/Add
			addOrSet(ll, p)

		} else if p.Op() == 0x02 {
			// Delete
			remove(ll, p)
			log.Fatalf("Strange mutation: %+v", p)
func (l *List) isDirty() bool {
	l.mutex.RLock()
	defer l.mutex.RUnlock()
	return l.mindex != nil
}

func (l *List) CommitIfDirty() error {
	if !l.isDirty() {
		log.WithField("dirty", false).Debug("Not Committing")
	} else {
		log.WithField("dirty", true).Debug("Committing")
	l.mutex.Lock()
	defer l.mutex.Unlock()

	ll := l.generateLinkedList()
	b := flatbuffers.NewBuilder(0)

	var offsets []flatbuffers.UOffsetT
	for e := ll.Front(); e != nil; e = e.Next() {
		p := e.Value.(*types.Posting)
		off := addPosting(b, *p)
		offsets = append(offsets, off)
	types.PostingListStartPostingsVector(b, ll.Len())
	for i := len(offsets) - 1; i >= 0; i-- {
		b.PrependUOffsetT(offsets[i])
	}
	vend := b.EndVector(ll.Len())

	types.PostingListStart(b)
	types.PostingListAddPostings(b, vend)
	end := types.PostingListEnd(b)
	b.Finish(end)

	l.buffer = b.Bytes[b.Head():]
	if err := l.pstore.SetOne(l.key, l.buffer); err != nil {
		log.WithField("error", err).Errorf("While storing posting list")
		return err
	}

	if err := l.mstore.Delete(l.key); err != nil {
		log.WithField("error", err).Errorf("While deleting mutation list")
		return err
	}
	l.mbuffer = make([]byte, len(empty))
	copy(l.mbuffer, empty)
	l.generateIndex()

// This is a blocking function. It would block when the channel buffer capacity
// has been reached.
func (l *List) StreamUids(ch chan uint64) {
	l.mutex.RLock()
	defer l.mutex.RUnlock()

	var p types.Posting
	for i := 0; i < l.length(); i++ {
		if ok := l.get(&p, i); !ok || p.Uid() == math.MaxUint64 {
			break
		}
		ch <- p.Uid()
	}
	close(ch)
}

func (l *List) Value() (result []byte, rerr error) {
	l.mutex.RLock()
	defer l.mutex.RUnlock()

	if l.length() == 0 {
		return result, fmt.Errorf("No value found")
	}

	var p types.Posting
	if ok := l.get(&p, l.length()-1); !ok {
		return result, fmt.Errorf("Unable to get last posting")
	}
	if p.Uid() != math.MaxUint64 {
		return result, fmt.Errorf("No value found")
	}
	return p.ValueBytes(), nil
}