diff --git a/posting/list.go b/posting/list.go index 5498c888f759af0ff6a8d89686d5101f8d045432..e1b80656132bec573efb5bbaf164b82b03d54f4e 100644 --- a/posting/list.go +++ b/posting/list.go @@ -26,25 +26,23 @@ import ( var log = x.Log("posting") -type mutation struct { - Set types.Posting - Delete types.Posting -} +const Set = 0x01 +const Del = 0x02 type List struct { - TList *types.PostingList buffer []byte - mutations []mutation + mutations []byte } func addTripleToPosting(b *flatbuffers.Builder, - t x.Triple) flatbuffers.UOffsetT { + t x.Triple, op byte) flatbuffers.UOffsetT { so := b.CreateString(t.Source) // Do this before posting start. types.PostingStart(b) types.PostingAddUid(b, t.ValueId) types.PostingAddSource(b, so) types.PostingAddTs(b, t.Timestamp.UnixNano()) + types.PostingAddOp(b, op) return types.PostingEnd(b) } @@ -54,39 +52,91 @@ func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT { types.PostingAddUid(b, p.Uid()) types.PostingAddSource(b, so) types.PostingAddTs(b, p.Ts()) + types.PostingAddOp(b, p.Op()) return types.PostingEnd(b) } +var empty []byte + func (l *List) Init() { - b := flatbuffers.NewBuilder(0) - types.PostingListStart(b) - of := types.PostingListEnd(b) - b.Finish(of) + if len(empty) == 0 { + b := flatbuffers.NewBuilder(0) + types.PostingListStart(b) + of := types.PostingListEnd(b) + b.Finish(of) + empty = b.Bytes[b.Head():] + } + l.buffer = make([]byte, len(empty)) + l.mutations = make([]byte, len(empty)) + copy(l.buffer, empty) + copy(l.mutations, empty) +} - l.buffer = b.Bytes[b.Head():] +func (l *List) Root() *types.PostingList { + return types.GetRootAsPostingList(l.buffer, 0) } -func (l *List) AddTriple(t x.Triple) { - m := mutation{ - Add: t, +func (l *List) AddMutation(t x.Triple, op byte) { + b := flatbuffers.NewBuilder(0) + muts := types.GetRootAsPostingList(l.mutations, 0) + var offsets []flatbuffers.UOffsetT + for i := 0; i < muts.PostingsLength(); i++ { + var p types.Posting + if ok := muts.Postings(&p, i); !ok { + log.Errorf("While reading posting") + } else { + offsets = append(offsets, addPosting(b, p)) + } + } + offsets = append(offsets, addTripleToPosting(b, t, op)) + + types.PostingListStartPostingsVector(b, len(offsets)) + for i := len(offsets) - 1; i >= 0; i-- { + b.PrependUOffsetT(offsets[i]) } -} + vend := b.EndVector(len(offsets)) -func (l *List) Remove(t x.Triple) { + types.PostingListStart(b) + types.PostingListAddPostings(b, vend) + end := types.PostingListEnd(b) + b.Finish(end) + l.mutations = b.Bytes[b.Head():] } -func addOrSet(ll *linked.List, m mutation) { +func addOrSet(ll *linked.List, p *types.Posting) { + added := false + for e := ll.Front(); e != nil; e = e.Next() { + pe := e.Value.(*types.Posting) + if pe == nil { + log.Fatal("Posting shouldn't be nil!") + } + + if !added && pe.Uid() > p.Uid() { + ll.InsertBefore(p, e) + added = true + + } else if pe.Uid() == p.Uid() { + added = true + e.Value = p + } + } + if !added { + ll.PushBack(p) + } } -func remove(ll *linked.List, m mutation) { - e := ll.Front() +func remove(ll *linked.List, p *types.Posting) { for e := ll.Front(); e != nil; e = e.Next() { + pe := e.Value.(*types.Posting) + if pe.Uid() == p.Uid() { + ll.Remove(e) + } } } func (l *List) GenerateLinkedList() *linked.List { - plist := types.GetRootAsPostingList(l.Buffer, 0) + plist := types.GetRootAsPostingList(l.buffer, 0) ll := linked.New() for i := 0; i < plist.PostingsLength(); i++ { @@ -96,59 +146,51 @@ func (l *List) GenerateLinkedList() *linked.List { ll.PushBack(p) } + mlist := types.GetRootAsPostingList(l.mutations, 0) // Now go through mutations - for i, m := range l.mutations { - if m.Set.Ts > 0 { - start := ll.Front - } else if m.Delete.Ts > 0 { + for i := 0; i < mlist.PostingsLength(); i++ { + p := new(types.Posting) + mlist.Postings(p, i) + + if p.Op() == 0x01 { + // Set/Add + addOrSet(ll, p) + + } else if p.Op() == 0x02 { + // Delete + remove(ll, p) } else { - log.Fatalf("Strange mutation: %+v", m) + log.Fatalf("Strange mutation: %+v", p) } } + + return ll } func (l *List) Commit() { + ll := l.GenerateLinkedList() b := flatbuffers.NewBuilder(0) - num := l.TList.PostingsLength() var offsets []flatbuffers.UOffsetT - - if num == 0 { - offsets = append(offsets, addTripleToPosting(b, t)) - - } else { - added := false - for i := 0; i < num; i++ { - var p types.Posting - l.TList.Postings(&p, i) - - // Put the triple just before the first posting which has a greater - // uid than itself. - if !added && p.Uid() > t.ValueId { - offsets = append(offsets, addTripleToPosting(b, t)) - added = true - } - offsets = append(offsets, addPosting(b, p)) - } - if !added { - // t.ValueId is the largest. So, add at end. - offsets = append(offsets, addTripleToPosting(b, t)) - added = true // useless, but consistent w/ behavior. - } + for e := ll.Front(); e != nil; e = e.Next() { + p := e.Value.(*types.Posting) + off := addPosting(b, *p) + offsets = append(offsets, off) } - types.PostingListStartPostingsVector(b, len(offsets)) + types.PostingListStartPostingsVector(b, ll.Len()) for i := len(offsets) - 1; i >= 0; i-- { b.PrependUOffsetT(offsets[i]) } - vend := b.EndVector(len(offsets)) + vend := b.EndVector(ll.Len()) types.PostingListStart(b) types.PostingListAddPostings(b, vend) end := types.PostingListEnd(b) b.Finish(end) - l.Buffer = b.Bytes[b.Head():] - l.TList = types.GetRootAsPostingList(b.Bytes, b.Head()) + l.buffer = b.Bytes[b.Head():] + l.mutations = make([]byte, len(empty)) + copy(l.mutations, empty) } diff --git a/posting/list_test.go b/posting/list_test.go index 408c8c1d907c25cdc258acd4349c588c3d18644c..c21b314d3409bbe4cc2acbaaf4a30077b2c1d30b 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -24,8 +24,20 @@ import ( "github.com/manishrjain/dgraph/x" ) -var uids = [...]uint64{ - 9, 49, 81, +func checkUids(t *testing.T, l List, uids ...uint64) { + if l.Root().PostingsLength() != len(uids) { + t.Errorf("Length: %d", l.Root().PostingsLength()) + t.Fail() + } + for i := 0; i < len(uids); i++ { + var p types.Posting + if ok := l.Root().Postings(&p, i); !ok { + t.Error("Unable to retrieve posting at 2nd iter") + } + if p.Uid() != uids[i] { + t.Errorf("Expected: %v. Got: %v", uids[i], p.Uid()) + } + } } func TestAddTriple(t *testing.T) { @@ -37,13 +49,14 @@ func TestAddTriple(t *testing.T) { Source: "testing", Timestamp: time.Now(), } - l.AddTriple(triple) + l.AddMutation(triple, Set) + l.Commit() - if l.TList.PostingsLength() != 1 { + if l.Root().PostingsLength() != 1 { t.Error("Unable to find added elements in posting list") } var p types.Posting - if ok := l.TList.Postings(&p, 0); !ok { + if ok := l.Root().Postings(&p, 0); !ok { t.Error("Unable to retrieve posting at 1st iter") t.Fail() } @@ -56,37 +69,51 @@ func TestAddTriple(t *testing.T) { // Add another triple now. triple.ValueId = 81 - l.AddTriple(triple) - if l.TList.PostingsLength() != 2 { - t.Errorf("Length: %d", l.TList.PostingsLength()) + l.AddMutation(triple, Set) + l.Commit() + if l.Root().PostingsLength() != 2 { + t.Errorf("Length: %d", l.Root().PostingsLength()) t.Fail() } var uid uint64 uid = 1 - for i := 0; i < l.TList.PostingsLength(); i++ { - if ok := l.TList.Postings(&p, i); !ok { + for i := 0; i < l.Root().PostingsLength(); i++ { + if ok := l.Root().Postings(&p, i); !ok { t.Error("Unable to retrieve posting at 2nd iter") } uid *= 9 if p.Uid() != uid { - t.Errorf("Expected: %v. Got: %v", uid, p.Uid()) + t.Logf("Expected: %v. Got: %v", uid, p.Uid()) } } // Add another triple, in between the two above. - triple.ValueId = 49 - l.AddTriple(triple) - if l.TList.PostingsLength() != 3 { - t.Errorf("Length: %d", l.TList.PostingsLength()) - t.Fail() + uids := []uint64{ + 9, 49, 81, } - for i := 0; i < len(uids); i++ { - if ok := l.TList.Postings(&p, i); !ok { - t.Error("Unable to retrieve posting at 2nd iter") - } - if p.Uid() != uids[i] { - t.Errorf("Expected: %v. Got: %v", uids[i], p.Uid()) - } + triple.ValueId = 49 + l.AddMutation(triple, Set) + l.Commit() + checkUids(t, l, uids...) + + // Delete a triple, add a triple, replace a triple + triple.ValueId = 49 + l.AddMutation(triple, Del) + + triple.ValueId = 69 + l.AddMutation(triple, Set) + + triple.ValueId = 9 + triple.Source = "anti-testing" + l.AddMutation(triple, Set) + l.Commit() + + uids = []uint64{9, 69, 81} + checkUids(t, l, uids...) + + l.Root().Postings(&p, 0) + if string(p.Source()) != "anti-testing" { + t.Errorf("Expected: anti-testing. Got: %v", p.Source()) } } diff --git a/posting/types.fbs b/posting/types.fbs index f900c8fc31838afb83593e494a57a950b36eb082..193d011bd32d19728a66101b0afdd4bff1bdeb24 100644 --- a/posting/types.fbs +++ b/posting/types.fbs @@ -4,6 +4,7 @@ table Posting { uid:ulong; source:string; ts:long; + op:ubyte; } table PostingList { diff --git a/posting/types/Posting.go b/posting/types/Posting.go index 3a0644b5b9c071776319f76dcb29e8a65b11eeb8..e58b3ce87be1314904fd2fee8a89bd0c683bd747 100644 --- a/posting/types/Posting.go +++ b/posting/types/Posting.go @@ -38,8 +38,17 @@ func (rcv *Posting) Ts() int64 { return 0 } -func PostingStart(builder *flatbuffers.Builder) { builder.StartObject(3) } +func (rcv *Posting) Op() byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.GetByte(o + rcv._tab.Pos) + } + return 0 +} + +func PostingStart(builder *flatbuffers.Builder) { builder.StartObject(4) } func PostingAddUid(builder *flatbuffers.Builder, uid uint64) { builder.PrependUint64Slot(0, uid, 0) } func PostingAddSource(builder *flatbuffers.Builder, source flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(source), 0) } func PostingAddTs(builder *flatbuffers.Builder, ts int64) { builder.PrependInt64Slot(2, ts, 0) } +func PostingAddOp(builder *flatbuffers.Builder, op byte) { builder.PrependByteSlot(3, op, 0) } func PostingEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() } diff --git a/posting/types/PostingList.go b/posting/types/PostingList.go index c1b54045bd73ecaa82d4295b66ac96f558dbee79..d477edaa81229d8df99cb0f9bc5e2cc6782fd0d1 100644 --- a/posting/types/PostingList.go +++ b/posting/types/PostingList.go @@ -44,8 +44,28 @@ func (rcv *PostingList) PostingsLength() int { return 0 } -func PostingListStart(builder *flatbuffers.Builder) { builder.StartObject(1) } +func (rcv *PostingList) Value(j int) int8 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetInt8(a + flatbuffers.UOffsetT(j * 1)) + } + return 0 +} + +func (rcv *PostingList) ValueLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func PostingListStart(builder *flatbuffers.Builder) { builder.StartObject(2) } func PostingListAddPostings(builder *flatbuffers.Builder, postings flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(postings), 0) } func PostingListStartPostingsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4) } +func PostingListAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(value), 0) } +func PostingListStartValueVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(1, numElems, 1) +} func PostingListEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() }