Skip to content
Snippets Groups Projects
Commit 39fadeae authored by Manish R Jain's avatar Manish R Jain
Browse files

Works, mutation worksvim list_test.go

parent 945d8416
No related branches found
No related tags found
No related merge requests found
......@@ -7,7 +7,10 @@ joins by minimizing network calls required, and hence to keep end-to-end latency
low.
# MVP Design
This is an MVP design doc. This would only contain part of the functionality,
> In a rapidly changing environment, there is no sense in doing the detailed analysis of a problem until they day before you are able to start working on it. Requirements and the technical ecosystem change too rapidly to generate a positive return on these activities when done far in advance. -Jason Buberel (PM, Golang @ Google)
Following from Jason, this is an MVP design doc. This would only contain part of the functionality,
which can be pushed out of the door within a month. This version
would not enforce strong consistency, and might not be as distributed. Also,
shard movement from dead machines might not make a cut in this version.
......@@ -28,7 +31,8 @@ type DirectedEdge struct {
## Technologies Used
- Use [RocksDB](http://rocksdb.org/) for storing original data and posting lists.
- Use [Cap'n Proto](https://capnproto.org/) for in-memory and on-disk representation,
- Use [Flatbuffers](https://google.github.io/flatbuffers/) for in-memory and on-disk representation.
Had considered Cap'n Proto before, but Flatbuffers team provides better Go support than the latter.
- For this version, stick to doing everything on a single server. Possibly still
using TCP layer, to avoid complexities later.
- Possibly use a simple go mutex library for txn locking.
......@@ -51,6 +55,12 @@ Ref: [experiment](https://github.com/dgraph-io/experiments/tree/master/vrpc)
- Graph languages, like Facebook's GraphQL. For this version, just use some internal lingo
as the mode of communication.
## No Go zone
- Versioning of data wouldn't be provided in this, or later versions. The best way I can
currently think of to do versioning would involve writing the deltas, and reading them back
to generate the final state. This would be too slow and memory consuming for generating the
long posting lists that we'll encounter in DGraph.
## Terminology
Term | Definition | Link
......
......@@ -17,6 +17,7 @@
package posting
import (
"errors"
"sort"
"sync"
......@@ -46,11 +47,11 @@ type List struct {
mbuffer []byte
pstore *store.Store // postinglist store
mstore *store.Store // mutation store
dirty bool
// mlayer keeps only replace instructions for the posting list.
// This works at the
mlayer map[int]types.Posting
mdelta int // Delta based on number of elements in posting list.
mindex *linked.List
}
......@@ -83,14 +84,29 @@ func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT {
}
var empty []byte
var emptyPosting []byte
// package level init
func init() {
b := flatbuffers.NewBuilder(0)
types.PostingListStart(b)
of := types.PostingListEnd(b)
b.Finish(of)
empty = b.Bytes[b.Head():]
{
b := flatbuffers.NewBuilder(0)
types.PostingListStart(b)
of := types.PostingListEnd(b)
b.Finish(of)
empty = b.Bytes[b.Head():]
}
{
b := flatbuffers.NewBuilder(0)
types.PostingStart(b)
types.PostingAddUid(b, 0)
of := types.PostingEnd(b)
b.Finish(of)
emptyPosting = b.Bytes[b.Head():]
}
log.Infof("Empty size: [%d] EmptyPosting size: [%d]",
len(empty), len(emptyPosting))
}
func (l *List) Init(key []byte, pstore, mstore *store.Store) {
......@@ -118,6 +134,31 @@ func (l *List) Init(key []byte, pstore, mstore *store.Store) {
l.mbuffer = make([]byte, len(empty))
copy(l.mbuffer, empty)
}
l.generateIndex()
}
func findIndex(pl *types.PostingList, uid uint64, begin, end int) int {
if begin > end {
return -1
}
mid := (begin + end) / 2
var pmid types.Posting
if ok := pl.Postings(&pmid, mid); !ok {
return -1
}
if uid < pmid.Uid() {
return findIndex(pl, uid, begin, mid-1)
}
if uid > pmid.Uid() {
return findIndex(pl, uid, mid+1, end)
}
return mid
}
// Caller must hold at least a read lock.
func (l *List) find(uid uint64) int {
posting := types.GetRootAsPostingList(l.buffer, 0)
return findIndex(posting, uid, 0, posting.PostingsLength())
}
func (l *List) Length() int {
......@@ -125,8 +166,7 @@ func (l *List) Length() int {
defer l.mutex.RUnlock()
plist := types.GetRootAsPostingList(l.buffer, 0)
mlist := types.GetRootAsPostingList(l.mbuffer, 0)
return plist.PostingsLength() + mlist.PostingsLength()
return plist.PostingsLength() + l.mdelta
}
func (l *List) Get(p *types.Posting, i int) bool {
......@@ -138,22 +178,47 @@ func (l *List) Get(p *types.Posting, i int) bool {
return plist.Postings(p, i)
}
if i >= plist.PostingsLength()+l.mindex.Len() {
return false
}
count := 0
// Iterate over mindex, and see if we have instructions
// for the given index. Otherwise, sum up the move indexes
// uptil the given index, so we know where to look in
// mlayer and/or the main posting list.
move := 0
for e := l.mindex.Front(); e != nil; e = e.Next() {
mlink := e.Value.(*MutationLink)
if mlink.idx > i {
break
} else if mlink.idx == i {
*p = mlink.posting
return true
// Found an instruction. Check what is says.
if mlink.posting.Op() == 0x01 {
// ADD
*p = mlink.posting
return true
} else if mlink.posting.Op() == 0x02 {
// DELETE
// The loop will break in the next iteration, after updating the move
// variable.
} else {
log.Fatal("Someone, I mean you, forgot to tackle" +
" this operation. Stop drinking.")
}
}
count += 1
move += mlink.moveidx
}
newidx := i + move
// Check if we have any replace instruction in mlayer.
if val, ok := l.mlayer[newidx]; ok {
*p = val
return true
}
// Hit the main posting list.
if newidx >= plist.PostingsLength() {
return false
}
return plist.Postings(p, i-count)
return plist.Postings(p, newidx)
}
// mutationIndex is useful to avoid having to parse the entire postinglist
......@@ -179,7 +244,7 @@ func (l *List) Get(p *types.Posting, i int) bool {
// Effective: ADD DEL REP (REP = replace)
//
// ----------------------------------------------------------------------------
// mutationIndex would generate these:
// generateIndex would generate these:
// mlayer (layer just above posting list contains only replace instructions)
// idx: 4
// value: 13'
......@@ -205,11 +270,16 @@ func (l *List) Get(p *types.Posting, i int) bool {
//
// Thus we can provide mutation layers over immutable posting list, while
// still ensuring fast lookup access.
func (l *List) mutationIndex() *linked.List {
//
// NOTE: This function expects the caller to hold a RW Lock.
func (l *List) generateIndex() {
l.mindex = nil
l.mdelta = 0
l.mlayer = make(map[int]types.Posting)
mlist := types.GetRootAsPostingList(l.mbuffer, 0)
plist := types.GetRootAsPostingList(l.buffer, 0)
if mlist.PostingsLength() == 0 {
return nil
return
}
var muts []*types.Posting
for i := 0; i < mlist.PostingsLength(); i++ {
......@@ -219,58 +289,112 @@ func (l *List) mutationIndex() *linked.List {
}
sort.Sort(ByUid(muts))
// TODO: Convert to binary search once this works.
mchain := linked.New()
pi := 0
var pp types.Posting
plist.Postings(&pp, pi)
pp := new(types.Posting)
if ok := plist.Postings(pp, pi); !ok {
// There's some weird padding before Posting starts. Get that padding.
padding := flatbuffers.GetUOffsetT(emptyPosting)
pp.Init(emptyPosting, padding)
if pp.Uid() != 0 {
log.Fatal("Playing with bytes is like playing with fire." +
" Someone got burnt today!")
}
}
l.mdelta = 0
for mi, mp := range muts {
// TODO: Consider converting to binary search later.
for ; pi < plist.PostingsLength() && pp.Uid() < mp.Uid(); pi++ {
plist.Postings(&pp, pi)
plist.Postings(pp, pi)
}
mlink := new(MutationLink)
mlink.posting = *mp
if pp.Uid() == mp.Uid() {
if mp.Op() == 0x01 {
// This is a replace, so don't move the main index forward.
mlink.moveidx = 0
} else if mp.Op() == 0x02 {
// This is a delete, so move the main index next.
if mp.Op() == Set {
// This is a replace, so store it in mlayer, instead of mindex.
// Note that mlayer index is based right off the plist.
l.mlayer[pi] = *mp
} else if mp.Op() == Del {
// This is a delete, so move the plist index forward.
mlink.moveidx = 1
l.mdelta -= 1
} else {
log.Fatal("This operation isn't being handled.")
}
} else if mp.Uid() < pp.Uid() {
// This is an add, so move the plist index backwards.
mlink.moveidx = -1
l.mdelta += 1
} else {
// mp.Uid() > pp.Uid()
// We've crossed over from posting list. Posting List shouldn't be
// consulted in this case, so moveidx wouldn't be used. Just set it
// to zero anyways, to represent that.
mlink.moveidx = 0
l.mdelta += 1
}
mlink.idx = pi + mi
mchain.PushBack(mlink)
}
return mchain
l.mindex = mchain
}
func (l *List) addIfValid(b *flatbuffers.Builder,
offsets *[]flatbuffers.UOffsetT, t x.Triple, op byte) {
if op == Del {
if fi := l.find(t.ValueId); fi >= 0 {
// Delete. Only add it to the list if it exists in the posting list.
*offsets = append(*offsets, addTripleToPosting(b, t, op))
}
} else {
*offsets = append(*offsets, addTripleToPosting(b, t, op))
}
}
func (l *List) AddMutation(t x.Triple, op byte) error {
l.mutex.Lock()
defer l.mutex.Unlock()
l.dirty = true // Mark as dirty.
// Mutation arrives:
// - Check if we had any(SET/DEL) before this, stored in the mutation list.
// - If yes, then replace that mutation. Jump to a)
// a) check if the entity exists in main posting list.
// - If yes, store the mutation.
// - If no, disregard this mutation.
b := flatbuffers.NewBuilder(0)
muts := types.GetRootAsPostingList(l.mbuffer, 0)
var offsets []flatbuffers.UOffsetT
added := false
for i := 0; i < muts.PostingsLength(); i++ {
var p types.Posting
if ok := muts.Postings(&p, i); !ok {
log.Errorf("While reading posting")
} else {
log.Fatal("While reading posting")
return errors.New("Error reading posting")
}
if p.Uid() != t.ValueId {
offsets = append(offsets, addPosting(b, p))
} else {
// An operation on something we already have a mutation for.
// Overwrite the previous one if it came earlier.
if p.Ts() <= t.Timestamp.UnixNano() {
l.addIfValid(b, &offsets, t, op)
} // else keep the previous one.
added = true
}
}
offsets = append(offsets, addTripleToPosting(b, t, op))
if !added {
l.addIfValid(b, &offsets, t, op)
}
types.PostingListStartPostingsVector(b, len(offsets))
for i := len(offsets) - 1; i >= 0; i-- {
......@@ -284,7 +408,7 @@ func (l *List) AddMutation(t x.Triple, op byte) error {
b.Finish(end)
l.mbuffer = b.Bytes[b.Head():]
l.mindex = l.mutationIndex()
l.generateIndex()
return l.mstore.SetOne(l.key, l.mbuffer)
}
......@@ -355,11 +479,12 @@ func (l *List) generateLinkedList() *linked.List {
func (l *List) isDirty() bool {
l.mutex.RLock()
defer l.mutex.RUnlock()
return l.dirty
return l.mindex != nil
}
func (l *List) CommitIfDirty() error {
if !l.isDirty() {
log.Debug("Not dirty. Ignoring commit.")
return nil
}
......@@ -399,5 +524,6 @@ func (l *List) CommitIfDirty() error {
}
l.mbuffer = make([]byte, len(empty))
copy(l.mbuffer, empty)
l.generateIndex()
return nil
}
......@@ -17,6 +17,7 @@
package posting
import (
"fmt"
"io/ioutil"
"os"
"testing"
......@@ -27,20 +28,20 @@ import (
"github.com/manishrjain/dgraph/x"
)
func checkUids(t *testing.T, l List, uids ...uint64) {
func checkUids(t *testing.T, l List, uids ...uint64) error {
if l.Length() != len(uids) {
t.Errorf("Length: %d", l.Length())
t.Fail()
return fmt.Errorf("Length: %d", l.Length())
}
for i := 0; i < len(uids); i++ {
var p types.Posting
if ok := l.Get(&p, i); !ok {
t.Error("Unable to retrieve posting at 2nd iter")
return fmt.Errorf("Unable to retrieve posting.")
}
if p.Uid() != uids[i] {
t.Errorf("Expected: %v. Got: %v", uids[i], p.Uid())
return fmt.Errorf("Expected: %v. Got: %v", uids[i], p.Uid())
}
}
return nil
}
func NewStore(t *testing.T) string {
......@@ -96,6 +97,7 @@ func TestAddTriple(t *testing.T) {
if string(p.Source()) != "testing" {
t.Errorf("Expected testing. Got: %v", string(p.Source()))
}
// return // Test 1.
// Add another triple now.
triple.ValueId = 81
......@@ -117,6 +119,7 @@ func TestAddTriple(t *testing.T) {
t.Logf("Expected: %v. Got: %v", uid, p.Uid())
}
}
// return // Test 2.
// Add another triple, in between the two above.
uids := []uint64{
......@@ -131,7 +134,10 @@ func TestAddTriple(t *testing.T) {
t.Error(err)
}
*/
checkUids(t, l, uids...)
if err := checkUids(t, l, uids...); err != nil {
t.Error(err)
}
// return // Test 3.
// Delete a triple, add a triple, replace a triple
triple.ValueId = 49
......@@ -156,15 +162,37 @@ func TestAddTriple(t *testing.T) {
*/
uids = []uint64{9, 69, 81}
checkUids(t, l, uids...)
if err := checkUids(t, l, uids...); err != nil {
t.Error(err)
}
l.Get(&p, 0)
if string(p.Source()) != "anti-testing" {
t.Errorf("Expected: anti-testing. Got: %v", string(p.Source()))
}
/*
if err := l.CommitIfDirty(); err != nil {
t.Error(err)
}
*/
// Try reading the same data in another PostingList.
// var dl List
// dl.Init(key, ps, ms)
// checkUids(t, dl, uids...)
var dl List
dl.Init(key, ps, ms)
if err := checkUids(t, dl, uids...); err != nil {
t.Error(err)
}
if err := dl.CommitIfDirty(); err != nil {
t.Error(err)
}
if err := checkUids(t, dl, uids...); err != nil {
t.Error(err)
}
var ol List
ol.Init(key, ps, ms)
if err := checkUids(t, ol, uids...); err != nil {
t.Error(err)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment