diff --git a/posting/list.go b/posting/list.go index c2a140a516d022c7f724ad6f340a80bda04406ab..5b290654b3c207a07b529be6b258fb61334781ea 100644 --- a/posting/list.go +++ b/posting/list.go @@ -21,6 +21,7 @@ import ( "encoding/binary" "encoding/gob" "errors" + "fmt" "math" "sort" "sync" @@ -65,12 +66,12 @@ func (pa ByUid) Len() int { return len(pa) } func (pa ByUid) Swap(i, j int) { pa[i], pa[j] = pa[j], pa[i] } func (pa ByUid) Less(i, j int) bool { return pa[i].Uid() < pa[j].Uid() } -// key = (entity id, attribute) -func Key(eid uint64, attr string) []byte { +// key = (entity uid, attribute) +func Key(uid uint64, attr string) []byte { buf := new(bytes.Buffer) buf.WriteString(attr) - if err := binary.Write(buf, binary.LittleEndian, eid); err != nil { - log.Fatalf("Error while creating key with attr: %v eid: %v\n", attr, eid) + if err := binary.Write(buf, binary.LittleEndian, uid); err != nil { + log.Fatalf("Error while creating key with attr: %v uid: %v\n", attr, uid) } return buf.Bytes() } @@ -210,18 +211,26 @@ func (l *List) find(uid uint64) int { return findIndex(posting, uid, 0, posting.PostingsLength()) } +// Caller must hold at least a read lock. +func (l *List) length() int { + plist := types.GetRootAsPostingList(l.buffer, 0) + return plist.PostingsLength() + l.mdelta +} + func (l *List) Length() int { l.mutex.RLock() defer l.mutex.RUnlock() - - plist := types.GetRootAsPostingList(l.buffer, 0) - return plist.PostingsLength() + l.mdelta + return l.length() } func (l *List) Get(p *types.Posting, i int) bool { l.mutex.RLock() defer l.mutex.RUnlock() + return l.get(p, i) +} +// Caller must hold at least a read lock. +func (l *List) get(p *types.Posting, i int) bool { plist := types.GetRootAsPostingList(l.buffer, 0) if l.mindex == nil { return plist.Postings(p, i) @@ -585,3 +594,37 @@ func (l *List) CommitIfDirty() error { l.generateIndex() return nil } + +// This is a blocking function. It would block when the channel buffer capacity +// has been reached. +func (l *List) StreamUids(ch chan uint64) { + l.mutex.RLock() + defer l.mutex.RUnlock() + + var p types.Posting + for i := 0; i < l.length(); i++ { + if ok := l.get(&p, i); !ok || p.Uid() == math.MaxUint64 { + break + } + ch <- p.Uid() + } + close(ch) +} + +func (l *List) Value() (result []byte, rerr error) { + l.mutex.RLock() + defer l.mutex.RUnlock() + + if l.length() == 0 { + return result, fmt.Errorf("No value found") + } + + var p types.Posting + if ok := l.get(&p, l.length()-1); !ok { + return result, fmt.Errorf("Unable to get last posting") + } + if p.Uid() != math.MaxUint64 { + return result, fmt.Errorf("No value found") + } + return p.ValueBytes(), nil +} diff --git a/posting/list_test.go b/posting/list_test.go index e2dc1c5aa9fbc1236a1d99a6093493a21a56f8b7..10aa5940b5f96e0cfab9528ce44fbda95ff4f76f 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -57,7 +57,7 @@ func NewStore(t *testing.T) string { func TestAddMutation(t *testing.T) { var l List - key := store.Key("name", 1) + key := Key(1, "name") pdir := NewStore(t) defer os.RemoveAll(pdir) ps := new(store.Store) @@ -194,7 +194,7 @@ func TestAddMutation(t *testing.T) { func TestAddMutation_Value(t *testing.T) { var ol List - key := store.Key("value", 10) + key := Key(10, "value") pdir := NewStore(t) defer os.RemoveAll(pdir) ps := new(store.Store) @@ -221,7 +221,7 @@ func TestAddMutation_Value(t *testing.T) { t.Errorf("All value uids should go to MaxUint64. Got: %v", p.Uid()) } var out string - if err := ParseValue(&out, p); err != nil { + if err := ParseValue(&out, p.ValueBytes()); err != nil { t.Error(err) } if out != "oh hey there" { @@ -237,7 +237,7 @@ func TestAddMutation_Value(t *testing.T) { if ok := ol.Get(&tp, 0); !ok { t.Error("While retrieving posting") } - if err := ParseValue(&out, tp); err != nil { + if err := ParseValue(&out, tp.ValueBytes()); err != nil { t.Error(err) } if out != "oh hey there" { @@ -257,7 +257,7 @@ func TestAddMutation_Value(t *testing.T) { t.Error("While retrieving posting") } var iout int - if err := ParseValue(&iout, p); err != nil { + if err := ParseValue(&iout, p.ValueBytes()); err != nil { t.Error(err) } if iout != 119 { diff --git a/posting/worker.go b/posting/worker.go new file mode 100644 index 0000000000000000000000000000000000000000..f7c1957efd7d812788d78be2fccb0b1a1819abe1 --- /dev/null +++ b/posting/worker.go @@ -0,0 +1,122 @@ +package posting + +import ( + "container/heap" + + "github.com/google/flatbuffers/go" + "github.com/manishrjain/dgraph/task" +) + +type elem struct { + Uid uint64 + Chidx int // channel index +} + +type elemHeap []elem + +func (h elemHeap) Len() int { return len(h) } +func (h elemHeap) Less(i, j int) bool { return h[i].Uid < h[j].Uid } +func (h elemHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *elemHeap) Push(x interface{}) { + *h = append(*h, x.(elem)) +} +func (h *elemHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +func addUids(b *flatbuffers.Builder, sorted []uint64) flatbuffers.UOffsetT { + task.ResultStartUidsVector(b, len(sorted)) + for i := len(sorted) - 1; i >= 0; i-- { + b.PrependUint64(sorted[i]) + } + return b.EndVector(len(sorted)) +} + +func ProcessQuery(query []byte) (result []byte, rerr error) { + uo := flatbuffers.GetUOffsetT(query) + q := new(task.Query) + q.Init(query, uo) + + b := flatbuffers.NewBuilder(0) + var voffsets []flatbuffers.UOffsetT + + var channels []chan uint64 + attr := string(q.Attr()) + for i := 0; i < q.UidsLength(); i++ { + uid := q.Uids(i) + key := Key(uid, attr) + pl := Get(key) + + task.ValueStart(b) + var valoffset flatbuffers.UOffsetT + if val, err := pl.Value(); err != nil { + valoffset = b.CreateByteVector(nilbyte) + } else { + valoffset = b.CreateByteVector(val) + } + task.ValueAddVal(b, valoffset) + voffsets = append(voffsets, task.ValueEnd(b)) + + ch := make(chan uint64, 1000) + go pl.StreamUids(ch) + channels = append(channels, ch) + } + task.ResultStartValuesVector(b, len(voffsets)) + for i := len(voffsets) - 1; i >= 0; i-- { + b.PrependUOffsetT(voffsets[i]) + } + valuesVent := b.EndVector(len(voffsets)) + + h := &elemHeap{} + heap.Init(h) + for i, ch := range channels { + e := elem{Chidx: i} + if uid, ok := <-ch; ok { + e.Uid = uid + heap.Push(h, e) + } + } + + var last uint64 + var ruids []uint64 + last = 0 + for h.Len() > 0 { + // Pick the minimum uid. + me := (*h)[0] + if me.Uid != last { + // We're iterating over sorted streams of uint64s. Avoid adding duplicates. + ruids = append(ruids, me.Uid) + last = me.Uid + } + + // Now pick the next element from the channel which had the min uid. + ch := channels[me.Chidx] + if uid, ok := <-ch; !ok { + heap.Pop(h) + + } else { + me.Uid = uid + (*h)[0] = me + heap.Fix(h, 0) // Faster than Pop() followed by Push(). + } + } + uidsVend := addUids(b, ruids) + + task.ResultStart(b) + task.ResultAddValues(b, valuesVent) + task.ResultAddUids(b, uidsVend) + rend := task.ResultEnd(b) + b.Finish(rend) + return b.Bytes[b.Head():], nil +} + +var nilbyte []byte + +func init() { + nilbyte = make([]byte, 1) + nilbyte[0] = 0x00 +} diff --git a/posting/worker_test.go b/posting/worker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..5bfc49e6568b2cb7141fb991a7067dce615697c1 --- /dev/null +++ b/posting/worker_test.go @@ -0,0 +1,55 @@ +package posting + +import ( + "container/heap" + "testing" +) + +func TestPush(t *testing.T) { + h := &elemHeap{} + heap.Init(h) + + e := elem{Uid: 5} + heap.Push(h, e) + e.Uid = 3 + heap.Push(h, e) + e.Uid = 4 + heap.Push(h, e) + + if h.Len() != 3 { + t.Errorf("Expected len 3. Found: %v", h.Len()) + } + if (*h)[0].Uid != 3 { + t.Errorf("Expected min 3. Found: %+v", (*h)[0]) + } + e.Uid = 10 + (*h)[0] = e + heap.Fix(h, 0) + if (*h)[0].Uid != 4 { + t.Errorf("Expected min 4. Found: %+v", (*h)[0]) + } + e.Uid = 11 + (*h)[0] = e + heap.Fix(h, 0) + if (*h)[0].Uid != 5 { + t.Errorf("Expected min 5. Found: %+v", (*h)[0]) + } + + e = heap.Pop(h).(elem) + if e.Uid != 5 { + t.Errorf("Expected min 5. Found %+v", e) + } + + e = heap.Pop(h).(elem) + if e.Uid != 10 { + t.Errorf("Expected min 10. Found: %+v", e) + } + e = heap.Pop(h).(elem) + if e.Uid != 11 { + t.Errorf("Expected min 11. Found: %+v", e) + } + + if h.Len() != 0 { + t.Errorf("Expected len 0. Found: %v, values: %+v", h.Len(), h) + } +} diff --git a/query/result.fbs b/query/result.fbs deleted file mode 100644 index e75f4fc39a99c4da89c5475f0afccd97b922ae23..0000000000000000000000000000000000000000 --- a/query/result.fbs +++ /dev/null @@ -1,21 +0,0 @@ -namespace result; - -table Uids { - uid:[ulong]; -} - -table TaskQuery { - attr:string; - uids:[ulong]; -} - -struct Value { - val:[ubyte]; -} - -table TaskResult { - uids:[ulong]; - values:[Value]; -} - -root_type Uids; diff --git a/query/result/Uids.go b/query/result/Uids.go deleted file mode 100644 index 0e89f6adaa43d20f8e65f7d6de4c8785af9e9510..0000000000000000000000000000000000000000 --- a/query/result/Uids.go +++ /dev/null @@ -1,45 +0,0 @@ -// automatically generated, do not modify - -package result - -import ( - flatbuffers "github.com/google/flatbuffers/go" -) -type Uids struct { - _tab flatbuffers.Table -} - -func GetRootAsUids(buf []byte, offset flatbuffers.UOffsetT) *Uids { - n := flatbuffers.GetUOffsetT(buf[offset:]) - x := &Uids{} - x.Init(buf, n + offset) - return x -} - -func (rcv *Uids) Init(buf []byte, i flatbuffers.UOffsetT) { - rcv._tab.Bytes = buf - rcv._tab.Pos = i -} - -func (rcv *Uids) Uid(j int) uint64 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - a := rcv._tab.Vector(o) - return rcv._tab.GetUint64(a + flatbuffers.UOffsetT(j * 8)) - } - return 0 -} - -func (rcv *Uids) UidLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - return rcv._tab.VectorLen(o) - } - return 0 -} - -func UidsStart(builder *flatbuffers.Builder) { builder.StartObject(1) } -func UidsAddUid(builder *flatbuffers.Builder, uid flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(uid), 0) } -func UidsStartUidVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(8, numElems, 8) -} -func UidsEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() } diff --git a/task.fbs b/task.fbs new file mode 100644 index 0000000000000000000000000000000000000000..0e1768d70673004620190507e340c25fdcf0eb09 --- /dev/null +++ b/task.fbs @@ -0,0 +1,15 @@ +namespace task; + +table Query { + attr:string; + uids:[ulong]; +} + +table Value { + val:[ubyte]; +} + +table Result { + uids:[ulong]; + values:[Value]; +} diff --git a/task/Query.go b/task/Query.go new file mode 100644 index 0000000000000000000000000000000000000000..9a2b8e25d4af593d1828ed2f60b400c9dd53f3bd --- /dev/null +++ b/task/Query.go @@ -0,0 +1,47 @@ +// automatically generated, do not modify + +package task + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) +type Query struct { + _tab flatbuffers.Table +} + +func (rcv *Query) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Query) Attr() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Query) Uids(j int) uint64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetUint64(a + flatbuffers.UOffsetT(j * 8)) + } + return 0 +} + +func (rcv *Query) UidsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func QueryStart(builder *flatbuffers.Builder) { builder.StartObject(2) } +func QueryAddAttr(builder *flatbuffers.Builder, attr flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(attr), 0) } +func QueryAddUids(builder *flatbuffers.Builder, uids flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(uids), 0) } +func QueryStartUidsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(8, numElems, 8) +} +func QueryEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() } diff --git a/task/Result.go b/task/Result.go new file mode 100644 index 0000000000000000000000000000000000000000..0d834a8108d48f29bd319e89fc8104d8bf315af8 --- /dev/null +++ b/task/Result.go @@ -0,0 +1,64 @@ +// automatically generated, do not modify + +package task + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) +type Result struct { + _tab flatbuffers.Table +} + +func (rcv *Result) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Result) Uids(j int) uint64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetUint64(a + flatbuffers.UOffsetT(j * 8)) + } + return 0 +} + +func (rcv *Result) UidsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Result) Values(obj *Value, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + if obj == nil { + obj = new(Value) + } + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Result) ValuesLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func ResultStart(builder *flatbuffers.Builder) { builder.StartObject(2) } +func ResultAddUids(builder *flatbuffers.Builder, uids flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(uids), 0) } +func ResultStartUidsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(8, numElems, 8) +} +func ResultAddValues(builder *flatbuffers.Builder, values flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(values), 0) } +func ResultStartValuesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4) +} +func ResultEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() } diff --git a/task/Value.go b/task/Value.go new file mode 100644 index 0000000000000000000000000000000000000000..e8dee179a2acb5dd51d11ffa4fcc88cd1df3f83c --- /dev/null +++ b/task/Value.go @@ -0,0 +1,46 @@ +// automatically generated, do not modify + +package task + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) +type Value struct { + _tab flatbuffers.Table +} + +func (rcv *Value) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Value) Val(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j * 1)) + } + return 0 +} + +func (rcv *Value) ValLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Value) ValBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func ValueStart(builder *flatbuffers.Builder) { builder.StartObject(1) } +func ValueAddVal(builder *flatbuffers.Builder, val flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(val), 0) } +func ValueStartValVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(1, numElems, 1) +} +func ValueEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() }