From 39fadeaec283706ce4ef4d9a9abe1d0a927c4db2 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Tue, 20 Oct 2015 16:26:01 +1100
Subject: [PATCH] Works, mutation worksvim list_test.go

---
 docs/mvp.md          |  14 ++-
 posting/list.go      | 198 +++++++++++++++++++++++++++++++++++--------
 posting/list_test.go |  48 ++++++++---
 3 files changed, 212 insertions(+), 48 deletions(-)

diff --git a/docs/mvp.md b/docs/mvp.md
index 9a6c2062..595a27e3 100644
--- a/docs/mvp.md
+++ b/docs/mvp.md
@@ -7,7 +7,10 @@ joins by minimizing network calls required, and hence to keep end-to-end latency
 low.
 
 # MVP Design
-This is an MVP design doc. This would only contain part of the functionality,
+
+> In a rapidly changing environment, there is no sense in doing the detailed analysis of a problem until they day before you are able to start working on it. Requirements and the technical ecosystem change too rapidly to generate a positive return on these activities when done far in advance. -Jason Buberel (PM, Golang @ Google)
+
+Following from Jason, this is an MVP design doc. This would only contain part of the functionality,
 which can be pushed out of the door within a month. This version
 would not enforce strong consistency, and might not be as distributed. Also,
 shard movement from dead machines might not make a cut in this version.
@@ -28,7 +31,8 @@ type DirectedEdge struct {
 
 ## Technologies Used
 - Use [RocksDB](http://rocksdb.org/) for storing original data and posting lists.
-- Use [Cap'n Proto](https://capnproto.org/) for in-memory and on-disk representation,
+- Use [Flatbuffers](https://google.github.io/flatbuffers/) for in-memory and on-disk representation.
+Had considered Cap'n Proto before, but Flatbuffers team provides better Go support than the latter.
 - For this version, stick to doing everything on a single server. Possibly still
 using TCP layer, to avoid complexities later.
 - Possibly use a simple go mutex library for txn locking.
@@ -51,6 +55,12 @@ Ref: [experiment](https://github.com/dgraph-io/experiments/tree/master/vrpc)
 - Graph languages, like Facebook's GraphQL. For this version, just use some internal lingo
 as the mode of communication.
 
+## No Go zone
+- Versioning of data wouldn't be provided in this, or later versions. The best way I can
+currently think of to do versioning would involve writing the deltas, and reading them back
+to generate the final state. This would be too slow and memory consuming for generating the
+long posting lists that we'll encounter in DGraph.
+
 ## Terminology
 
 Term  | Definition | Link
diff --git a/posting/list.go b/posting/list.go
index 3dda509d..31691f88 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -17,6 +17,7 @@
 package posting
 
 import (
+	"errors"
 	"sort"
 	"sync"
 
@@ -46,11 +47,11 @@ type List struct {
 	mbuffer []byte
 	pstore  *store.Store // postinglist store
 	mstore  *store.Store // mutation store
-	dirty   bool
 
 	// mlayer keeps only replace instructions for the posting list.
 	// This works at the
 	mlayer map[int]types.Posting
+	mdelta int // Delta based on number of elements in posting list.
 	mindex *linked.List
 }
 
@@ -83,14 +84,29 @@ func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT {
 }
 
 var empty []byte
+var emptyPosting []byte
 
 // package level init
 func init() {
-	b := flatbuffers.NewBuilder(0)
-	types.PostingListStart(b)
-	of := types.PostingListEnd(b)
-	b.Finish(of)
-	empty = b.Bytes[b.Head():]
+	{
+		b := flatbuffers.NewBuilder(0)
+		types.PostingListStart(b)
+		of := types.PostingListEnd(b)
+		b.Finish(of)
+		empty = b.Bytes[b.Head():]
+	}
+
+	{
+		b := flatbuffers.NewBuilder(0)
+		types.PostingStart(b)
+		types.PostingAddUid(b, 0)
+		of := types.PostingEnd(b)
+		b.Finish(of)
+		emptyPosting = b.Bytes[b.Head():]
+	}
+
+	log.Infof("Empty size: [%d] EmptyPosting size: [%d]",
+		len(empty), len(emptyPosting))
 }
 
 func (l *List) Init(key []byte, pstore, mstore *store.Store) {
@@ -118,6 +134,31 @@ func (l *List) Init(key []byte, pstore, mstore *store.Store) {
 		l.mbuffer = make([]byte, len(empty))
 		copy(l.mbuffer, empty)
 	}
+	l.generateIndex()
+}
+
+func findIndex(pl *types.PostingList, uid uint64, begin, end int) int {
+	if begin > end {
+		return -1
+	}
+	mid := (begin + end) / 2
+	var pmid types.Posting
+	if ok := pl.Postings(&pmid, mid); !ok {
+		return -1
+	}
+	if uid < pmid.Uid() {
+		return findIndex(pl, uid, begin, mid-1)
+	}
+	if uid > pmid.Uid() {
+		return findIndex(pl, uid, mid+1, end)
+	}
+	return mid
+}
+
+// Caller must hold at least a read lock.
+func (l *List) find(uid uint64) int {
+	posting := types.GetRootAsPostingList(l.buffer, 0)
+	return findIndex(posting, uid, 0, posting.PostingsLength())
 }
 
 func (l *List) Length() int {
@@ -125,8 +166,7 @@ func (l *List) Length() int {
 	defer l.mutex.RUnlock()
 
 	plist := types.GetRootAsPostingList(l.buffer, 0)
-	mlist := types.GetRootAsPostingList(l.mbuffer, 0)
-	return plist.PostingsLength() + mlist.PostingsLength()
+	return plist.PostingsLength() + l.mdelta
 }
 
 func (l *List) Get(p *types.Posting, i int) bool {
@@ -138,22 +178,47 @@ func (l *List) Get(p *types.Posting, i int) bool {
 		return plist.Postings(p, i)
 	}
 
-	if i >= plist.PostingsLength()+l.mindex.Len() {
-		return false
-	}
-	count := 0
+	// Iterate over mindex, and see if we have instructions
+	// for the given index. Otherwise, sum up the move indexes
+	// uptil the given index, so we know where to look in
+	// mlayer and/or the main posting list.
+	move := 0
 	for e := l.mindex.Front(); e != nil; e = e.Next() {
 		mlink := e.Value.(*MutationLink)
 		if mlink.idx > i {
 			break
 
 		} else if mlink.idx == i {
-			*p = mlink.posting
-			return true
+			// Found an instruction. Check what is says.
+			if mlink.posting.Op() == 0x01 {
+				// ADD
+				*p = mlink.posting
+				return true
+
+			} else if mlink.posting.Op() == 0x02 {
+				// DELETE
+				// The loop will break in the next iteration, after updating the move
+				// variable.
+
+			} else {
+				log.Fatal("Someone, I mean you, forgot to tackle" +
+					" this operation. Stop drinking.")
+			}
 		}
-		count += 1
+		move += mlink.moveidx
+	}
+	newidx := i + move
+
+	// Check if we have any replace instruction in mlayer.
+	if val, ok := l.mlayer[newidx]; ok {
+		*p = val
+		return true
+	}
+	// Hit the main posting list.
+	if newidx >= plist.PostingsLength() {
+		return false
 	}
-	return plist.Postings(p, i-count)
+	return plist.Postings(p, newidx)
 }
 
 // mutationIndex is useful to avoid having to parse the entire postinglist
@@ -179,7 +244,7 @@ func (l *List) Get(p *types.Posting, i int) bool {
 // Effective:  ADD DEL REP  (REP = replace)
 //
 // ----------------------------------------------------------------------------
-// mutationIndex would generate these:
+// generateIndex would generate these:
 // mlayer (layer just above posting list contains only replace instructions)
 // idx:          4
 // value:       13'
@@ -205,11 +270,16 @@ func (l *List) Get(p *types.Posting, i int) bool {
 //
 // Thus we can provide mutation layers over immutable posting list, while
 // still ensuring fast lookup access.
-func (l *List) mutationIndex() *linked.List {
+//
+// NOTE: This function expects the caller to hold a RW Lock.
+func (l *List) generateIndex() {
+	l.mindex = nil
+	l.mdelta = 0
+	l.mlayer = make(map[int]types.Posting)
 	mlist := types.GetRootAsPostingList(l.mbuffer, 0)
 	plist := types.GetRootAsPostingList(l.buffer, 0)
 	if mlist.PostingsLength() == 0 {
-		return nil
+		return
 	}
 	var muts []*types.Posting
 	for i := 0; i < mlist.PostingsLength(); i++ {
@@ -219,58 +289,112 @@ func (l *List) mutationIndex() *linked.List {
 	}
 	sort.Sort(ByUid(muts))
 
-	// TODO: Convert to binary search once this works.
 	mchain := linked.New()
 	pi := 0
-	var pp types.Posting
-	plist.Postings(&pp, pi)
+	pp := new(types.Posting)
+	if ok := plist.Postings(pp, pi); !ok {
+		// There's some weird padding before Posting starts. Get that padding.
+		padding := flatbuffers.GetUOffsetT(emptyPosting)
+		pp.Init(emptyPosting, padding)
+		if pp.Uid() != 0 {
+			log.Fatal("Playing with bytes is like playing with fire." +
+				" Someone got burnt today!")
+		}
+	}
 
+	l.mdelta = 0
 	for mi, mp := range muts {
+		// TODO: Consider converting to binary search later.
 		for ; pi < plist.PostingsLength() && pp.Uid() < mp.Uid(); pi++ {
-			plist.Postings(&pp, pi)
+			plist.Postings(pp, pi)
 		}
 
 		mlink := new(MutationLink)
 		mlink.posting = *mp
 
 		if pp.Uid() == mp.Uid() {
-			if mp.Op() == 0x01 {
-				// This is a replace, so don't move the main index forward.
-				mlink.moveidx = 0
-			} else if mp.Op() == 0x02 {
-				// This is a delete, so move the main index next.
+			if mp.Op() == Set {
+				// This is a replace, so store it in mlayer, instead of mindex.
+				// Note that mlayer index is based right off the plist.
+				l.mlayer[pi] = *mp
+
+			} else if mp.Op() == Del {
+				// This is a delete, so move the plist index forward.
 				mlink.moveidx = 1
+				l.mdelta -= 1
+
 			} else {
 				log.Fatal("This operation isn't being handled.")
 			}
 		} else if mp.Uid() < pp.Uid() {
+			// This is an add, so move the plist index backwards.
 			mlink.moveidx = -1
+			l.mdelta += 1
+
+		} else {
+			// mp.Uid() > pp.Uid()
+			// We've crossed over from posting list. Posting List shouldn't be
+			// consulted in this case, so moveidx wouldn't be used. Just set it
+			// to zero anyways, to represent that.
+			mlink.moveidx = 0
+			l.mdelta += 1
 		}
 
 		mlink.idx = pi + mi
 		mchain.PushBack(mlink)
 	}
-	return mchain
+	l.mindex = mchain
+}
+
+func (l *List) addIfValid(b *flatbuffers.Builder,
+	offsets *[]flatbuffers.UOffsetT, t x.Triple, op byte) {
+
+	if op == Del {
+		if fi := l.find(t.ValueId); fi >= 0 {
+			// Delete. Only add it to the list if it exists in the posting list.
+			*offsets = append(*offsets, addTripleToPosting(b, t, op))
+		}
+	} else {
+		*offsets = append(*offsets, addTripleToPosting(b, t, op))
+	}
 }
 
 func (l *List) AddMutation(t x.Triple, op byte) error {
 	l.mutex.Lock()
 	defer l.mutex.Unlock()
 
-	l.dirty = true // Mark as dirty.
-
+	// Mutation arrives:
+	// - Check if we had any(SET/DEL) before this, stored in the mutation list.
+	//		- If yes, then replace that mutation. Jump to a)
+	// a)		check if the entity exists in main posting list.
+	// 				- If yes, store the mutation.
+	// 				- If no, disregard this mutation.
 	b := flatbuffers.NewBuilder(0)
 	muts := types.GetRootAsPostingList(l.mbuffer, 0)
 	var offsets []flatbuffers.UOffsetT
+	added := false
 	for i := 0; i < muts.PostingsLength(); i++ {
 		var p types.Posting
 		if ok := muts.Postings(&p, i); !ok {
-			log.Errorf("While reading posting")
-		} else {
+			log.Fatal("While reading posting")
+			return errors.New("Error reading posting")
+		}
+
+		if p.Uid() != t.ValueId {
 			offsets = append(offsets, addPosting(b, p))
+
+		} else {
+			// An operation on something we already have a mutation for.
+			// Overwrite the previous one if it came earlier.
+			if p.Ts() <= t.Timestamp.UnixNano() {
+				l.addIfValid(b, &offsets, t, op)
+			} // else keep the previous one.
+			added = true
 		}
 	}
-	offsets = append(offsets, addTripleToPosting(b, t, op))
+	if !added {
+		l.addIfValid(b, &offsets, t, op)
+	}
 
 	types.PostingListStartPostingsVector(b, len(offsets))
 	for i := len(offsets) - 1; i >= 0; i-- {
@@ -284,7 +408,7 @@ func (l *List) AddMutation(t x.Triple, op byte) error {
 	b.Finish(end)
 
 	l.mbuffer = b.Bytes[b.Head():]
-	l.mindex = l.mutationIndex()
+	l.generateIndex()
 	return l.mstore.SetOne(l.key, l.mbuffer)
 }
 
@@ -355,11 +479,12 @@ func (l *List) generateLinkedList() *linked.List {
 func (l *List) isDirty() bool {
 	l.mutex.RLock()
 	defer l.mutex.RUnlock()
-	return l.dirty
+	return l.mindex != nil
 }
 
 func (l *List) CommitIfDirty() error {
 	if !l.isDirty() {
+		log.Debug("Not dirty. Ignoring commit.")
 		return nil
 	}
 
@@ -399,5 +524,6 @@ func (l *List) CommitIfDirty() error {
 	}
 	l.mbuffer = make([]byte, len(empty))
 	copy(l.mbuffer, empty)
+	l.generateIndex()
 	return nil
 }
diff --git a/posting/list_test.go b/posting/list_test.go
index 9152907b..52de6dee 100644
--- a/posting/list_test.go
+++ b/posting/list_test.go
@@ -17,6 +17,7 @@
 package posting
 
 import (
+	"fmt"
 	"io/ioutil"
 	"os"
 	"testing"
@@ -27,20 +28,20 @@ import (
 	"github.com/manishrjain/dgraph/x"
 )
 
-func checkUids(t *testing.T, l List, uids ...uint64) {
+func checkUids(t *testing.T, l List, uids ...uint64) error {
 	if l.Length() != len(uids) {
-		t.Errorf("Length: %d", l.Length())
-		t.Fail()
+		return fmt.Errorf("Length: %d", l.Length())
 	}
 	for i := 0; i < len(uids); i++ {
 		var p types.Posting
 		if ok := l.Get(&p, i); !ok {
-			t.Error("Unable to retrieve posting at 2nd iter")
+			return fmt.Errorf("Unable to retrieve posting.")
 		}
 		if p.Uid() != uids[i] {
-			t.Errorf("Expected: %v. Got: %v", uids[i], p.Uid())
+			return fmt.Errorf("Expected: %v. Got: %v", uids[i], p.Uid())
 		}
 	}
+	return nil
 }
 
 func NewStore(t *testing.T) string {
@@ -96,6 +97,7 @@ func TestAddTriple(t *testing.T) {
 	if string(p.Source()) != "testing" {
 		t.Errorf("Expected testing. Got: %v", string(p.Source()))
 	}
+	// return // Test 1.
 
 	// Add another triple now.
 	triple.ValueId = 81
@@ -117,6 +119,7 @@ func TestAddTriple(t *testing.T) {
 			t.Logf("Expected: %v. Got: %v", uid, p.Uid())
 		}
 	}
+	// return // Test 2.
 
 	// Add another triple, in between the two above.
 	uids := []uint64{
@@ -131,7 +134,10 @@ func TestAddTriple(t *testing.T) {
 			t.Error(err)
 		}
 	*/
-	checkUids(t, l, uids...)
+	if err := checkUids(t, l, uids...); err != nil {
+		t.Error(err)
+	}
+	// return // Test 3.
 
 	// Delete a triple, add a triple, replace a triple
 	triple.ValueId = 49
@@ -156,15 +162,37 @@ func TestAddTriple(t *testing.T) {
 	*/
 
 	uids = []uint64{9, 69, 81}
-	checkUids(t, l, uids...)
+	if err := checkUids(t, l, uids...); err != nil {
+		t.Error(err)
+	}
 
 	l.Get(&p, 0)
 	if string(p.Source()) != "anti-testing" {
 		t.Errorf("Expected: anti-testing. Got: %v", string(p.Source()))
 	}
 
+	/*
+		if err := l.CommitIfDirty(); err != nil {
+			t.Error(err)
+		}
+	*/
 	// Try reading the same data in another PostingList.
-	// var dl List
-	// dl.Init(key, ps, ms)
-	// checkUids(t, dl, uids...)
+	var dl List
+	dl.Init(key, ps, ms)
+	if err := checkUids(t, dl, uids...); err != nil {
+		t.Error(err)
+	}
+
+	if err := dl.CommitIfDirty(); err != nil {
+		t.Error(err)
+	}
+	if err := checkUids(t, dl, uids...); err != nil {
+		t.Error(err)
+	}
+
+	var ol List
+	ol.Init(key, ps, ms)
+	if err := checkUids(t, ol, uids...); err != nil {
+		t.Error(err)
+	}
 }
-- 
GitLab