Skip to content
Snippets Groups Projects
Commit cbf2e445 authored by Manish R Jain
Browse files

Working mutation buffer, with commit back to main buffer

parent c859f212
No related branches found
No related tags found
No related merge requests found
...@@ -26,25 +26,23 @@ import ( ...@@ -26,25 +26,23 @@ import (
var log = x.Log("posting") var log = x.Log("posting")
type mutation struct { const Set = 0x01
Set types.Posting const Del = 0x02
Delete types.Posting
}
type List struct { type List struct {
TList *types.PostingList
buffer []byte buffer []byte
mutations []mutation mutations []byte
} }
func addTripleToPosting(b *flatbuffers.Builder, func addTripleToPosting(b *flatbuffers.Builder,
t x.Triple) flatbuffers.UOffsetT { t x.Triple, op byte) flatbuffers.UOffsetT {
so := b.CreateString(t.Source) // Do this before posting start. so := b.CreateString(t.Source) // Do this before posting start.
types.PostingStart(b) types.PostingStart(b)
types.PostingAddUid(b, t.ValueId) types.PostingAddUid(b, t.ValueId)
types.PostingAddSource(b, so) types.PostingAddSource(b, so)
types.PostingAddTs(b, t.Timestamp.UnixNano()) types.PostingAddTs(b, t.Timestamp.UnixNano())
types.PostingAddOp(b, op)
return types.PostingEnd(b) return types.PostingEnd(b)
} }
...@@ -54,39 +52,91 @@ func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT { ...@@ -54,39 +52,91 @@ func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT {
types.PostingAddUid(b, p.Uid()) types.PostingAddUid(b, p.Uid())
types.PostingAddSource(b, so) types.PostingAddSource(b, so)
types.PostingAddTs(b, p.Ts()) types.PostingAddTs(b, p.Ts())
types.PostingAddOp(b, p.Op())
return types.PostingEnd(b) return types.PostingEnd(b)
} }
var empty []byte
func (l *List) Init() { func (l *List) Init() {
b := flatbuffers.NewBuilder(0) if len(empty) == 0 {
types.PostingListStart(b) b := flatbuffers.NewBuilder(0)
of := types.PostingListEnd(b) types.PostingListStart(b)
b.Finish(of) of := types.PostingListEnd(b)
b.Finish(of)
empty = b.Bytes[b.Head():]
}
l.buffer = make([]byte, len(empty))
l.mutations = make([]byte, len(empty))
copy(l.buffer, empty)
copy(l.mutations, empty)
}
l.buffer = b.Bytes[b.Head():] func (l *List) Root() *types.PostingList {
return types.GetRootAsPostingList(l.buffer, 0)
} }
func (l *List) AddTriple(t x.Triple) { func (l *List) AddMutation(t x.Triple, op byte) {
m := mutation{ b := flatbuffers.NewBuilder(0)
Add: t, muts := types.GetRootAsPostingList(l.mutations, 0)
var offsets []flatbuffers.UOffsetT
for i := 0; i < muts.PostingsLength(); i++ {
var p types.Posting
if ok := muts.Postings(&p, i); !ok {
log.Errorf("While reading posting")
} else {
offsets = append(offsets, addPosting(b, p))
}
}
offsets = append(offsets, addTripleToPosting(b, t, op))
types.PostingListStartPostingsVector(b, len(offsets))
for i := len(offsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(offsets[i])
} }
} vend := b.EndVector(len(offsets))
func (l *List) Remove(t x.Triple) { types.PostingListStart(b)
types.PostingListAddPostings(b, vend)
end := types.PostingListEnd(b)
b.Finish(end)
l.mutations = b.Bytes[b.Head():]
} }
func addOrSet(ll *linked.List, m mutation) { func addOrSet(ll *linked.List, p *types.Posting) {
added := false
for e := ll.Front(); e != nil; e = e.Next() {
pe := e.Value.(*types.Posting)
if pe == nil {
log.Fatal("Posting shouldn't be nil!")
}
if !added && pe.Uid() > p.Uid() {
ll.InsertBefore(p, e)
added = true
} else if pe.Uid() == p.Uid() {
added = true
e.Value = p
}
}
if !added {
ll.PushBack(p)
}
} }
func remove(ll *linked.List, m mutation) { func remove(ll *linked.List, p *types.Posting) {
e := ll.Front()
for e := ll.Front(); e != nil; e = e.Next() { for e := ll.Front(); e != nil; e = e.Next() {
pe := e.Value.(*types.Posting)
if pe.Uid() == p.Uid() {
ll.Remove(e)
}
} }
} }
func (l *List) GenerateLinkedList() *linked.List { func (l *List) GenerateLinkedList() *linked.List {
plist := types.GetRootAsPostingList(l.Buffer, 0) plist := types.GetRootAsPostingList(l.buffer, 0)
ll := linked.New() ll := linked.New()
for i := 0; i < plist.PostingsLength(); i++ { for i := 0; i < plist.PostingsLength(); i++ {
...@@ -96,59 +146,51 @@ func (l *List) GenerateLinkedList() *linked.List { ...@@ -96,59 +146,51 @@ func (l *List) GenerateLinkedList() *linked.List {
ll.PushBack(p) ll.PushBack(p)
} }
mlist := types.GetRootAsPostingList(l.mutations, 0)
// Now go through mutations // Now go through mutations
for i, m := range l.mutations { for i := 0; i < mlist.PostingsLength(); i++ {
if m.Set.Ts > 0 { p := new(types.Posting)
start := ll.Front mlist.Postings(p, i)
} else if m.Delete.Ts > 0 {
if p.Op() == 0x01 {
// Set/Add
addOrSet(ll, p)
} else if p.Op() == 0x02 {
// Delete
remove(ll, p)
} else { } else {
log.Fatalf("Strange mutation: %+v", m) log.Fatalf("Strange mutation: %+v", p)
} }
} }
return ll
} }
func (l *List) Commit() { func (l *List) Commit() {
ll := l.GenerateLinkedList()
b := flatbuffers.NewBuilder(0) b := flatbuffers.NewBuilder(0)
num := l.TList.PostingsLength()
var offsets []flatbuffers.UOffsetT var offsets []flatbuffers.UOffsetT
for e := ll.Front(); e != nil; e = e.Next() {
if num == 0 { p := e.Value.(*types.Posting)
offsets = append(offsets, addTripleToPosting(b, t)) off := addPosting(b, *p)
offsets = append(offsets, off)
} else {
added := false
for i := 0; i < num; i++ {
var p types.Posting
l.TList.Postings(&p, i)
// Put the triple just before the first posting which has a greater
// uid than itself.
if !added && p.Uid() > t.ValueId {
offsets = append(offsets, addTripleToPosting(b, t))
added = true
}
offsets = append(offsets, addPosting(b, p))
}
if !added {
// t.ValueId is the largest. So, add at end.
offsets = append(offsets, addTripleToPosting(b, t))
added = true // useless, but consistent w/ behavior.
}
} }
types.PostingListStartPostingsVector(b, len(offsets)) types.PostingListStartPostingsVector(b, ll.Len())
for i := len(offsets) - 1; i >= 0; i-- { for i := len(offsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(offsets[i]) b.PrependUOffsetT(offsets[i])
} }
vend := b.EndVector(len(offsets)) vend := b.EndVector(ll.Len())
types.PostingListStart(b) types.PostingListStart(b)
types.PostingListAddPostings(b, vend) types.PostingListAddPostings(b, vend)
end := types.PostingListEnd(b) end := types.PostingListEnd(b)
b.Finish(end) b.Finish(end)
l.Buffer = b.Bytes[b.Head():] l.buffer = b.Bytes[b.Head():]
l.TList = types.GetRootAsPostingList(b.Bytes, b.Head()) l.mutations = make([]byte, len(empty))
copy(l.mutations, empty)
} }
...@@ -24,8 +24,20 @@ import ( ...@@ -24,8 +24,20 @@ import (
"github.com/manishrjain/dgraph/x" "github.com/manishrjain/dgraph/x"
) )
var uids = [...]uint64{ func checkUids(t *testing.T, l List, uids ...uint64) {
9, 49, 81, if l.Root().PostingsLength() != len(uids) {
t.Errorf("Length: %d", l.Root().PostingsLength())
t.Fail()
}
for i := 0; i < len(uids); i++ {
var p types.Posting
if ok := l.Root().Postings(&p, i); !ok {
t.Error("Unable to retrieve posting at 2nd iter")
}
if p.Uid() != uids[i] {
t.Errorf("Expected: %v. Got: %v", uids[i], p.Uid())
}
}
} }
func TestAddTriple(t *testing.T) { func TestAddTriple(t *testing.T) {
...@@ -37,13 +49,14 @@ func TestAddTriple(t *testing.T) { ...@@ -37,13 +49,14 @@ func TestAddTriple(t *testing.T) {
Source: "testing", Source: "testing",
Timestamp: time.Now(), Timestamp: time.Now(),
} }
l.AddTriple(triple) l.AddMutation(triple, Set)
l.Commit()
if l.TList.PostingsLength() != 1 { if l.Root().PostingsLength() != 1 {
t.Error("Unable to find added elements in posting list") t.Error("Unable to find added elements in posting list")
} }
var p types.Posting var p types.Posting
if ok := l.TList.Postings(&p, 0); !ok { if ok := l.Root().Postings(&p, 0); !ok {
t.Error("Unable to retrieve posting at 1st iter") t.Error("Unable to retrieve posting at 1st iter")
t.Fail() t.Fail()
} }
...@@ -56,37 +69,51 @@ func TestAddTriple(t *testing.T) { ...@@ -56,37 +69,51 @@ func TestAddTriple(t *testing.T) {
// Add another triple now. // Add another triple now.
triple.ValueId = 81 triple.ValueId = 81
l.AddTriple(triple) l.AddMutation(triple, Set)
if l.TList.PostingsLength() != 2 { l.Commit()
t.Errorf("Length: %d", l.TList.PostingsLength()) if l.Root().PostingsLength() != 2 {
t.Errorf("Length: %d", l.Root().PostingsLength())
t.Fail() t.Fail()
} }
var uid uint64 var uid uint64
uid = 1 uid = 1
for i := 0; i < l.TList.PostingsLength(); i++ { for i := 0; i < l.Root().PostingsLength(); i++ {
if ok := l.TList.Postings(&p, i); !ok { if ok := l.Root().Postings(&p, i); !ok {
t.Error("Unable to retrieve posting at 2nd iter") t.Error("Unable to retrieve posting at 2nd iter")
} }
uid *= 9 uid *= 9
if p.Uid() != uid { if p.Uid() != uid {
t.Errorf("Expected: %v. Got: %v", uid, p.Uid()) t.Logf("Expected: %v. Got: %v", uid, p.Uid())
} }
} }
// Add another triple, in between the two above. // Add another triple, in between the two above.
triple.ValueId = 49 uids := []uint64{
l.AddTriple(triple) 9, 49, 81,
if l.TList.PostingsLength() != 3 {
t.Errorf("Length: %d", l.TList.PostingsLength())
t.Fail()
} }
for i := 0; i < len(uids); i++ { triple.ValueId = 49
if ok := l.TList.Postings(&p, i); !ok { l.AddMutation(triple, Set)
t.Error("Unable to retrieve posting at 2nd iter") l.Commit()
} checkUids(t, l, uids...)
if p.Uid() != uids[i] {
t.Errorf("Expected: %v. Got: %v", uids[i], p.Uid()) // Delete a triple, add a triple, replace a triple
} triple.ValueId = 49
l.AddMutation(triple, Del)
triple.ValueId = 69
l.AddMutation(triple, Set)
triple.ValueId = 9
triple.Source = "anti-testing"
l.AddMutation(triple, Set)
l.Commit()
uids = []uint64{9, 69, 81}
checkUids(t, l, uids...)
l.Root().Postings(&p, 0)
if string(p.Source()) != "anti-testing" {
t.Errorf("Expected: anti-testing. Got: %v", p.Source())
} }
} }
...@@ -4,6 +4,7 @@ table Posting { ...@@ -4,6 +4,7 @@ table Posting {
uid:ulong; uid:ulong;
source:string; source:string;
ts:long; ts:long;
op:ubyte;
} }
table PostingList { table PostingList {
......
...@@ -38,8 +38,17 @@ func (rcv *Posting) Ts() int64 { ...@@ -38,8 +38,17 @@ func (rcv *Posting) Ts() int64 {
return 0 return 0
} }
func PostingStart(builder *flatbuffers.Builder) { builder.StartObject(3) } func (rcv *Posting) Op() byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
return rcv._tab.GetByte(o + rcv._tab.Pos)
}
return 0
}
func PostingStart(builder *flatbuffers.Builder) { builder.StartObject(4) }
func PostingAddUid(builder *flatbuffers.Builder, uid uint64) { builder.PrependUint64Slot(0, uid, 0) } func PostingAddUid(builder *flatbuffers.Builder, uid uint64) { builder.PrependUint64Slot(0, uid, 0) }
func PostingAddSource(builder *flatbuffers.Builder, source flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(source), 0) } func PostingAddSource(builder *flatbuffers.Builder, source flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(source), 0) }
func PostingAddTs(builder *flatbuffers.Builder, ts int64) { builder.PrependInt64Slot(2, ts, 0) } func PostingAddTs(builder *flatbuffers.Builder, ts int64) { builder.PrependInt64Slot(2, ts, 0) }
// PostingAddOp stores op in slot 3 of the Posting object currently being
// built, passing 0 as the slot's default value.
func PostingAddOp(builder *flatbuffers.Builder, op byte) {
	const opSlot = 3
	builder.PrependByteSlot(opSlot, op, 0)
}
func PostingEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() } func PostingEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() }
...@@ -44,8 +44,28 @@ func (rcv *PostingList) PostingsLength() int { ...@@ -44,8 +44,28 @@ func (rcv *PostingList) PostingsLength() int {
return 0 return 0
} }
func PostingListStart(builder *flatbuffers.Builder) { builder.StartObject(1) } func (rcv *PostingList) Value(j int) int8 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetInt8(a + flatbuffers.UOffsetT(j * 1))
}
return 0
}
// ValueLength returns the length of the PostingList's value vector, located
// through table offset 6. It returns 0 when that offset lookup yields 0.
func (rcv *PostingList) ValueLength() int {
	off := flatbuffers.UOffsetT(rcv._tab.Offset(6))
	if off == 0 {
		return 0
	}
	return rcv._tab.VectorLen(off)
}
func PostingListStart(builder *flatbuffers.Builder) { builder.StartObject(2) }
func PostingListAddPostings(builder *flatbuffers.Builder, postings flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(postings), 0) } func PostingListAddPostings(builder *flatbuffers.Builder, postings flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(postings), 0) }
func PostingListStartPostingsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4) func PostingListStartPostingsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4)
} }
// PostingListAddValue stores the offset value in slot 1 of the PostingList
// object currently being built, passing 0 as the slot's default.
func PostingListAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) {
	const valueSlot = 1
	builder.PrependUOffsetTSlot(valueSlot, value, 0)
}
// PostingListStartValueVector begins a vector of numElems one-byte elements
// with one-byte alignment on builder and returns the result of StartVector.
func PostingListStartValueVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
	const (
		elemSize  = 1
		alignment = 1
	)
	return builder.StartVector(elemSize, numElems, alignment)
}
func PostingListEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() } func PostingListEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment