Skip to content
Snippets Groups Projects
Commit aeb72365 authored by Manish R Jain's avatar Manish R Jain
Browse files

switch uid list to uidmatrix

parent 488b2630
No related branches found
No related tags found
No related merge requests found
......@@ -26,10 +26,10 @@ import (
"sort"
"sync"
"github.com/google/flatbuffers/go"
"github.com/dgraph-io/dgraph/posting/types"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/x"
"github.com/google/flatbuffers/go"
linked "container/list"
)
......@@ -597,18 +597,20 @@ func (l *List) CommitIfDirty() error {
// This is a blocking function. It would block when the channel buffer capacity
// has been reached.
func (l *List) StreamUids(ch chan uint64) {
func (l *List) GetUids() []uint64 {
l.mutex.RLock()
defer l.mutex.RUnlock()
result := make([]uint64, l.length())
result = result[:0]
var p types.Posting
for i := 0; i < l.length(); i++ {
if ok := l.get(&p, i); !ok || p.Uid() == math.MaxUint64 {
break
}
ch <- p.Uid()
result = append(result, p.Uid())
}
close(ch)
return result
}
func (l *List) Value() (result []byte, rerr error) {
......
package posting
import (
"container/heap"
"github.com/google/flatbuffers/go"
"github.com/dgraph-io/dgraph/task"
"github.com/google/flatbuffers/go"
)
/*
type elem struct {
Uid uint64
Chidx int // channel index
......@@ -27,7 +26,9 @@ func (h *elemHeap) Pop() interface{} {
*h = old[0 : n-1]
return x
}
*/
/*
func addUids(b *flatbuffers.Builder, sorted []uint64) flatbuffers.UOffsetT {
// Invert the sorted uids to maintain same order in flatbuffers.
task.ResultStartUidsVector(b, len(sorted))
......@@ -36,6 +37,20 @@ func addUids(b *flatbuffers.Builder, sorted []uint64) flatbuffers.UOffsetT {
}
return b.EndVector(len(sorted))
}
*/
func uidlistOffset(b *flatbuffers.Builder,
sorted []uint64) flatbuffers.UOffsetT {
task.UidListStartUidsVector(b, len(sorted))
for i := len(sorted) - 1; i >= 0; i-- {
b.PrependUint64(sorted[i])
}
ulist := b.EndVector(len(sorted))
task.UidListStart(b)
task.UidListAddUids(b, ulist)
return task.UidListEnd(b)
}
func ProcessTask(query []byte) (result []byte, rerr error) {
uo := flatbuffers.GetUOffsetT(query)
......@@ -43,9 +58,9 @@ func ProcessTask(query []byte) (result []byte, rerr error) {
q.Init(query, uo)
b := flatbuffers.NewBuilder(0)
var voffsets []flatbuffers.UOffsetT
voffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
uoffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
var channels []chan uint64
attr := string(q.Attr())
for i := 0; i < q.UidsLength(); i++ {
uid := q.Uids(i)
......@@ -60,11 +75,10 @@ func ProcessTask(query []byte) (result []byte, rerr error) {
valoffset = b.CreateByteVector(val)
}
task.ValueAddVal(b, valoffset)
voffsets = append(voffsets, task.ValueEnd(b))
voffsets[i] = task.ValueEnd(b)
ch := make(chan uint64, 1000)
go pl.StreamUids(ch)
channels = append(channels, ch)
ulist := pl.GetUids()
uoffsets[i] = uidlistOffset(b, ulist)
}
task.ResultStartValuesVector(b, len(voffsets))
for i := len(voffsets) - 1; i >= 0; i-- {
......@@ -72,44 +86,15 @@ func ProcessTask(query []byte) (result []byte, rerr error) {
}
valuesVent := b.EndVector(len(voffsets))
h := &elemHeap{}
heap.Init(h)
for i, ch := range channels {
e := elem{Chidx: i}
if uid, ok := <-ch; ok {
e.Uid = uid
heap.Push(h, e)
}
}
var last uint64
var ruids []uint64
last = 0
for h.Len() > 0 {
// Pick the minimum uid.
me := (*h)[0]
if me.Uid != last {
// We're iterating over sorted streams of uint64s. Avoid adding duplicates.
ruids = append(ruids, me.Uid)
last = me.Uid
}
// Now pick the next element from the channel which had the min uid.
ch := channels[me.Chidx]
if uid, ok := <-ch; !ok {
heap.Pop(h)
} else {
me.Uid = uid
(*h)[0] = me
heap.Fix(h, 0) // Faster than Pop() followed by Push().
}
task.ResultStartUidmatrixVector(b, len(uoffsets))
for i := len(uoffsets) - 1; i >= 0; i-- {
b.PrependUOffsetT(uoffsets[i])
}
uidsVend := addUids(b, ruids)
matrixVent := b.EndVector(len(uoffsets))
task.ResultStart(b)
task.ResultAddValues(b, valuesVent)
task.ResultAddUids(b, uidsVend)
task.ResultAddUidmatrix(b, matrixVent)
rend := task.ResultEnd(b)
b.Finish(rend)
return b.Bytes[b.Head():], nil
......
package posting
import (
"container/heap"
"fmt"
"os"
"testing"
"time"
"github.com/Sirupsen/logrus"
"github.com/google/flatbuffers/go"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
"github.com/google/flatbuffers/go"
)
func TestPush(t *testing.T) {
h := &elemHeap{}
heap.Init(h)
e := elem{Uid: 5}
heap.Push(h, e)
e.Uid = 3
heap.Push(h, e)
e.Uid = 4
heap.Push(h, e)
if h.Len() != 3 {
t.Errorf("Expected len 3. Found: %v", h.Len())
}
if (*h)[0].Uid != 3 {
t.Errorf("Expected min 3. Found: %+v", (*h)[0])
}
e.Uid = 10
(*h)[0] = e
heap.Fix(h, 0)
if (*h)[0].Uid != 4 {
t.Errorf("Expected min 4. Found: %+v", (*h)[0])
}
e.Uid = 11
(*h)[0] = e
heap.Fix(h, 0)
if (*h)[0].Uid != 5 {
t.Errorf("Expected min 5. Found: %+v", (*h)[0])
}
e = heap.Pop(h).(elem)
if e.Uid != 5 {
t.Errorf("Expected min 5. Found %+v", e)
func addEdge(t *testing.T, edge x.DirectedEdge, l *List) {
if err := l.AddMutation(edge, Set); err != nil {
t.Error(err)
}
}
e = heap.Pop(h).(elem)
if e.Uid != 10 {
t.Errorf("Expected min 10. Found: %+v", e)
}
e = heap.Pop(h).(elem)
if e.Uid != 11 {
t.Errorf("Expected min 11. Found: %+v", e)
func check(r *task.Result, idx int, expected []uint64) error {
var m task.UidList
if ok := r.Uidmatrix(&m, idx); !ok {
return fmt.Errorf("Unable to retrieve uidlist")
}
if h.Len() != 0 {
t.Errorf("Expected len 0. Found: %v, values: %+v", h.Len(), h)
if m.UidsLength() != len(expected) {
return fmt.Errorf("Expected length: %v. Got: %v",
len(expected), m.UidsLength())
}
}
func addEdge(t *testing.T, edge x.DirectedEdge, l *List) {
if err := l.AddMutation(edge, Set); err != nil {
t.Error(err)
for i, uid := range expected {
if m.Uids(i) != uid {
return fmt.Errorf("Uid mismatch at index: %v. Expected: %v. Got: %v",
i, uid, m.Uids(i))
}
}
return nil
}
func TestProcessTask(t *testing.T) {
......@@ -114,25 +84,22 @@ func TestProcessTask(t *testing.T) {
r := new(task.Result)
r.Init(result, ro)
if r.UidsLength() != 4 {
t.Errorf("Expected 4. Got uids length: %v", r.UidsLength())
}
if r.Uids(0) != 23 {
t.Errorf("Expected 23. Got: %v", r.Uids(0))
if r.UidmatrixLength() != 3 {
t.Errorf("Expected 3. Got uidmatrix length: %v", r.UidmatrixLength())
}
if r.Uids(1) != 25 {
t.Errorf("Expected 25. Got: %v", r.Uids(0))
if err := check(r, 0, []uint64{23, 31}); err != nil {
t.Error(err)
}
if r.Uids(2) != 26 {
t.Errorf("Expected 26. Got: %v", r.Uids(0))
if err := check(r, 1, []uint64{23}); err != nil {
t.Error(err)
}
if r.Uids(3) != 31 {
t.Errorf("Expected 31. Got: %v", r.Uids(0))
if err := check(r, 2, []uint64{23, 25, 26, 31}); err != nil {
t.Error(err)
}
if r.ValuesLength() != 3 {
t.Errorf("Expected 3. Got values length: %v", r.ValuesLength())
}
var tval task.Value
if ok := r.Values(&tval, 0); !ok {
t.Errorf("Unable to retrieve value")
......@@ -141,11 +108,17 @@ func TestProcessTask(t *testing.T) {
tval.ValBytes()[0] != 0x00 {
t.Errorf("Invalid byte value at index 0")
}
if ok := r.Values(&tval, 1); !ok {
t.Errorf("Unable to retrieve value")
}
if tval.ValLength() != 1 ||
tval.ValBytes()[0] != 0x00 {
t.Errorf("Invalid byte value at index 0")
}
if ok := r.Values(&tval, 2); !ok {
t.Errorf("Unable to retrieve value")
}
var v string
if err := ParseValue(&v, tval.ValBytes()); err != nil {
t.Error(err)
......
......@@ -9,7 +9,11 @@ table Value {
val:[ubyte];
}
table Result {
table UidList {
uids:[ulong];
}
table Result {
uidmatrix:[UidList];
values:[Value];
}
......@@ -14,16 +14,22 @@ func (rcv *Result) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Pos = i
}
func (rcv *Result) Uids(j int) uint64 {
func (rcv *Result) Uidmatrix(obj *UidList, j int) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetUint64(a + flatbuffers.UOffsetT(j * 8))
x := rcv._tab.Vector(o)
x += flatbuffers.UOffsetT(j) * 4
x = rcv._tab.Indirect(x)
if obj == nil {
obj = new(UidList)
}
return 0
obj.Init(rcv._tab.Bytes, x)
return true
}
return false
}
func (rcv *Result) UidsLength() int {
func (rcv *Result) UidmatrixLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.VectorLen(o)
......@@ -55,8 +61,8 @@ func (rcv *Result) ValuesLength() int {
}
func ResultStart(builder *flatbuffers.Builder) { builder.StartObject(2) }
func ResultAddUids(builder *flatbuffers.Builder, uids flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(uids), 0) }
func ResultStartUidsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(8, numElems, 8)
func ResultAddUidmatrix(builder *flatbuffers.Builder, uidmatrix flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(uidmatrix), 0) }
func ResultStartUidmatrixVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4)
}
func ResultAddValues(builder *flatbuffers.Builder, values flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(values), 0) }
func ResultStartValuesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4)
......
// automatically generated, do not modify
package task
import (
flatbuffers "github.com/google/flatbuffers/go"
)
type UidList struct {
_tab flatbuffers.Table
}
func (rcv *UidList) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *UidList) Uids(j int) uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetUint64(a + flatbuffers.UOffsetT(j * 8))
}
return 0
}
func (rcv *UidList) UidsLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func UidListStart(builder *flatbuffers.Builder) { builder.StartObject(1) }
func UidListAddUids(builder *flatbuffers.Builder, uids flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(uids), 0) }
func UidListStartUidsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(8, numElems, 8)
}
func UidListEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() }
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package x
type elem struct {
Uid uint64
Chidx int // channel index
}
type Uint64Heap []elem
func (h Uint64Heap) Len() int { return len(h) }
func (h Uint64Heap) Less(i, j int) bool { return h[i].Uid < h[j].Uid }
func (h Uint64Heap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *Uint64Heap) Push(x interface{}) {
*h = append(*h, x.(elem))
}
func (h *Uint64Heap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
package x
import (
"container/heap"
"testing"
)
func TestPush(t *testing.T) {
h := &Uint64Heap{}
heap.Init(h)
e := elem{Uid: 5}
heap.Push(h, e)
e.Uid = 3
heap.Push(h, e)
e.Uid = 4
heap.Push(h, e)
if h.Len() != 3 {
t.Errorf("Expected len 3. Found: %v", h.Len())
}
if (*h)[0].Uid != 3 {
t.Errorf("Expected min 3. Found: %+v", (*h)[0])
}
e.Uid = 10
(*h)[0] = e
heap.Fix(h, 0)
if (*h)[0].Uid != 4 {
t.Errorf("Expected min 4. Found: %+v", (*h)[0])
}
e.Uid = 11
(*h)[0] = e
heap.Fix(h, 0)
if (*h)[0].Uid != 5 {
t.Errorf("Expected min 5. Found: %+v", (*h)[0])
}
e = heap.Pop(h).(elem)
if e.Uid != 5 {
t.Errorf("Expected min 5. Found %+v", e)
}
e = heap.Pop(h).(elem)
if e.Uid != 10 {
t.Errorf("Expected min 10. Found: %+v", e)
}
e = heap.Pop(h).(elem)
if e.Uid != 11 {
t.Errorf("Expected min 11. Found: %+v", e)
}
if h.Len() != 0 {
t.Errorf("Expected len 0. Found: %v, values: %+v", h.Len(), h)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment