diff --git a/posting/list.go b/posting/list.go index 8c0a9d0a39663ef76964e001de875dffa5805760..d7edaa4d189ecb5c4529bce40a2ff4ff333d43dd 100644 --- a/posting/list.go +++ b/posting/list.go @@ -26,10 +26,10 @@ import ( "sort" "sync" - "github.com/google/flatbuffers/go" "github.com/dgraph-io/dgraph/posting/types" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/x" + "github.com/google/flatbuffers/go" linked "container/list" ) @@ -597,18 +597,20 @@ func (l *List) CommitIfDirty() error { // This is a blocking function. It would block when the channel buffer capacity // has been reached. -func (l *List) StreamUids(ch chan uint64) { +func (l *List) GetUids() []uint64 { l.mutex.RLock() defer l.mutex.RUnlock() + result := make([]uint64, l.length()) + result = result[:0] var p types.Posting for i := 0; i < l.length(); i++ { if ok := l.get(&p, i); !ok || p.Uid() == math.MaxUint64 { break } - ch <- p.Uid() + result = append(result, p.Uid()) } - close(ch) + return result } func (l *List) Value() (result []byte, rerr error) { diff --git a/posting/worker.go b/posting/worker.go index acee33c971be7c6df33cf27e6746057b63cb1765..7e8e8bab1dfbf049ffda1738d15e7935efb1cf20 100644 --- a/posting/worker.go +++ b/posting/worker.go @@ -1,12 +1,11 @@ package posting import ( - "container/heap" - - "github.com/google/flatbuffers/go" "github.com/dgraph-io/dgraph/task" + "github.com/google/flatbuffers/go" ) +/* type elem struct { Uid uint64 Chidx int // channel index @@ -27,7 +26,9 @@ func (h *elemHeap) Pop() interface{} { *h = old[0 : n-1] return x } +*/ +/* func addUids(b *flatbuffers.Builder, sorted []uint64) flatbuffers.UOffsetT { // Invert the sorted uids to maintain same order in flatbuffers. task.ResultStartUidsVector(b, len(sorted)) @@ -36,6 +37,20 @@ func addUids(b *flatbuffers.Builder, sorted []uint64) flatbuffers.UOffsetT { } return b.EndVector(len(sorted)) } +*/ + +func uidlistOffset(b *flatbuffers.Builder, + sorted []uint64) flatbuffers.UOffsetT { + + task.UidListStartUidsVector(b, len(sorted)) + for i := len(sorted) - 1; i >= 0; i-- { + b.PrependUint64(sorted[i]) + } + ulist := b.EndVector(len(sorted)) + task.UidListStart(b) + task.UidListAddUids(b, ulist) + return task.UidListEnd(b) +} func ProcessTask(query []byte) (result []byte, rerr error) { uo := flatbuffers.GetUOffsetT(query) @@ -43,9 +58,9 @@ func ProcessTask(query []byte) (result []byte, rerr error) { q.Init(query, uo) b := flatbuffers.NewBuilder(0) - var voffsets []flatbuffers.UOffsetT + voffsets := make([]flatbuffers.UOffsetT, q.UidsLength()) + uoffsets := make([]flatbuffers.UOffsetT, q.UidsLength()) - var channels []chan uint64 attr := string(q.Attr()) for i := 0; i < q.UidsLength(); i++ { uid := q.Uids(i) @@ -60,11 +75,10 @@ func ProcessTask(query []byte) (result []byte, rerr error) { valoffset = b.CreateByteVector(val) } task.ValueAddVal(b, valoffset) - voffsets = append(voffsets, task.ValueEnd(b)) + voffsets[i] = task.ValueEnd(b) - ch := make(chan uint64, 1000) - go pl.StreamUids(ch) - channels = append(channels, ch) + ulist := pl.GetUids() + uoffsets[i] = uidlistOffset(b, ulist) } task.ResultStartValuesVector(b, len(voffsets)) for i := len(voffsets) - 1; i >= 0; i-- { @@ -72,44 +86,15 @@ func ProcessTask(query []byte) (result []byte, rerr error) { } valuesVent := b.EndVector(len(voffsets)) - h := &elemHeap{} - heap.Init(h) - for i, ch := range channels { - e := elem{Chidx: i} - if uid, ok := <-ch; ok { - e.Uid = uid - heap.Push(h, e) - } - } - - var last uint64 - var ruids []uint64 - last = 0 - for h.Len() > 0 { - // Pick the minimum uid. - me := (*h)[0] - if me.Uid != last { - // We're iterating over sorted streams of uint64s. Avoid adding duplicates. - ruids = append(ruids, me.Uid) - last = me.Uid - } - - // Now pick the next element from the channel which had the min uid. - ch := channels[me.Chidx] - if uid, ok := <-ch; !ok { - heap.Pop(h) - - } else { - me.Uid = uid - (*h)[0] = me - heap.Fix(h, 0) // Faster than Pop() followed by Push(). - } + task.ResultStartUidmatrixVector(b, len(uoffsets)) + for i := len(uoffsets) - 1; i >= 0; i-- { + b.PrependUOffsetT(uoffsets[i]) } - uidsVend := addUids(b, ruids) + matrixVent := b.EndVector(len(uoffsets)) task.ResultStart(b) task.ResultAddValues(b, valuesVent) - task.ResultAddUids(b, uidsVend) + task.ResultAddUidmatrix(b, matrixVent) rend := task.ResultEnd(b) b.Finish(rend) return b.Bytes[b.Head():], nil diff --git a/posting/worker_test.go b/posting/worker_test.go index 6412feaef2d022204d87c45742e0efa3858cd596..466b9dfaee5344d13f2e69a682bf3970e7b5305b 100644 --- a/posting/worker_test.go +++ b/posting/worker_test.go @@ -1,71 +1,41 @@ package posting import ( - "container/heap" + "fmt" "os" "testing" "time" "github.com/Sirupsen/logrus" - "github.com/google/flatbuffers/go" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/x" + "github.com/google/flatbuffers/go" ) -func TestPush(t *testing.T) { - h := &elemHeap{} - heap.Init(h) - - e := elem{Uid: 5} - heap.Push(h, e) - e.Uid = 3 - heap.Push(h, e) - e.Uid = 4 - heap.Push(h, e) - - if h.Len() != 3 { - t.Errorf("Expected len 3. Found: %v", h.Len()) - } - if (*h)[0].Uid != 3 { - t.Errorf("Expected min 3. Found: %+v", (*h)[0]) - } - e.Uid = 10 - (*h)[0] = e - heap.Fix(h, 0) - if (*h)[0].Uid != 4 { - t.Errorf("Expected min 4. Found: %+v", (*h)[0]) - } - e.Uid = 11 - (*h)[0] = e - heap.Fix(h, 0) - if (*h)[0].Uid != 5 { - t.Errorf("Expected min 5. Found: %+v", (*h)[0]) - } - - e = heap.Pop(h).(elem) - if e.Uid != 5 { - t.Errorf("Expected min 5. Found %+v", e) +func addEdge(t *testing.T, edge x.DirectedEdge, l *List) { + if err := l.AddMutation(edge, Set); err != nil { + t.Error(err) } +} - e = heap.Pop(h).(elem) - if e.Uid != 10 { - t.Errorf("Expected min 10. Found: %+v", e) - } - e = heap.Pop(h).(elem) - if e.Uid != 11 { - t.Errorf("Expected min 11. Found: %+v", e) +func check(r *task.Result, idx int, expected []uint64) error { + var m task.UidList + if ok := r.Uidmatrix(&m, idx); !ok { + return fmt.Errorf("Unable to retrieve uidlist") } - if h.Len() != 0 { - t.Errorf("Expected len 0. Found: %v, values: %+v", h.Len(), h) + if m.UidsLength() != len(expected) { + return fmt.Errorf("Expected length: %v. Got: %v", + len(expected), m.UidsLength()) } -} - -func addEdge(t *testing.T, edge x.DirectedEdge, l *List) { - if err := l.AddMutation(edge, Set); err != nil { - t.Error(err) + for i, uid := range expected { + if m.Uids(i) != uid { + return fmt.Errorf("Uid mismatch at index: %v. Expected: %v. Got: %v", + i, uid, m.Uids(i)) + } } + return nil } func TestProcessTask(t *testing.T) { @@ -114,25 +84,22 @@ func TestProcessTask(t *testing.T) { r := new(task.Result) r.Init(result, ro) - if r.UidsLength() != 4 { - t.Errorf("Expected 4. Got uids length: %v", r.UidsLength()) - } - if r.Uids(0) != 23 { - t.Errorf("Expected 23. Got: %v", r.Uids(0)) + if r.UidmatrixLength() != 3 { + t.Errorf("Expected 3. Got uidmatrix length: %v", r.UidmatrixLength()) } - if r.Uids(1) != 25 { - t.Errorf("Expected 25. Got: %v", r.Uids(0)) + if err := check(r, 0, []uint64{23, 31}); err != nil { + t.Error(err) } - if r.Uids(2) != 26 { - t.Errorf("Expected 26. Got: %v", r.Uids(0)) + if err := check(r, 1, []uint64{23}); err != nil { + t.Error(err) } - if r.Uids(3) != 31 { - t.Errorf("Expected 31. Got: %v", r.Uids(0)) + if err := check(r, 2, []uint64{23, 25, 26, 31}); err != nil { + t.Error(err) } + if r.ValuesLength() != 3 { t.Errorf("Expected 3. Got values length: %v", r.ValuesLength()) } - var tval task.Value if ok := r.Values(&tval, 0); !ok { t.Errorf("Unable to retrieve value") @@ -141,11 +108,17 @@ func TestProcessTask(t *testing.T) { tval.ValBytes()[0] != 0x00 { t.Errorf("Invalid byte value at index 0") } + if ok := r.Values(&tval, 1); !ok { + t.Errorf("Unable to retrieve value") + } + if tval.ValLength() != 1 || + tval.ValBytes()[0] != 0x00 { + t.Errorf("Invalid byte value at index 0") + } if ok := r.Values(&tval, 2); !ok { t.Errorf("Unable to retrieve value") } - var v string if err := ParseValue(&v, tval.ValBytes()); err != nil { t.Error(err) diff --git a/task.fbs b/task.fbs index 0e1768d70673004620190507e340c25fdcf0eb09..39e877cff06b507f36cf5185f73bb788384080b3 100644 --- a/task.fbs +++ b/task.fbs @@ -9,7 +9,11 @@ table Value { val:[ubyte]; } -table Result { +table UidList { uids:[ulong]; +} + +table Result { + uidmatrix:[UidList]; values:[Value]; } diff --git a/task/Result.go b/task/Result.go index 0d834a8108d48f29bd319e89fc8104d8bf315af8..98dff807096d2126c8865b3902bc50a95918a3e0 100644 --- a/task/Result.go +++ b/task/Result.go @@ -14,16 +14,22 @@ func (rcv *Result) Init(buf []byte, i flatbuffers.UOffsetT) { rcv._tab.Pos = i } -func (rcv *Result) Uids(j int) uint64 { +func (rcv *Result) Uidmatrix(obj *UidList, j int) bool { o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) if o != 0 { - a := rcv._tab.Vector(o) - return rcv._tab.GetUint64(a + flatbuffers.UOffsetT(j * 8)) + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + if obj == nil { + obj = new(UidList) } - return 0 + obj.Init(rcv._tab.Bytes, x) + return true + } + return false } -func (rcv *Result) UidsLength() int { +func (rcv *Result) UidmatrixLength() int { o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) if o != 0 { return rcv._tab.VectorLen(o) @@ -55,8 +61,8 @@ func (rcv *Result) ValuesLength() int { } func ResultStart(builder *flatbuffers.Builder) { builder.StartObject(2) } -func ResultAddUids(builder *flatbuffers.Builder, uids flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(uids), 0) } -func ResultStartUidsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(8, numElems, 8) +func ResultAddUidmatrix(builder *flatbuffers.Builder, uidmatrix flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(uidmatrix), 0) } +func ResultStartUidmatrixVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4) } func ResultAddValues(builder *flatbuffers.Builder, values flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(values), 0) } func ResultStartValuesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4) diff --git a/task/UidList.go b/task/UidList.go new file mode 100644 index 0000000000000000000000000000000000000000..88d520f9412ca4e8f1b250a73c80b257a42dc626 --- /dev/null +++ b/task/UidList.go @@ -0,0 +1,38 @@ +// automatically generated, do not modify + +package task + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) +type UidList struct { + _tab flatbuffers.Table +} + +func (rcv *UidList) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *UidList) Uids(j int) uint64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetUint64(a + flatbuffers.UOffsetT(j * 8)) + } + return 0 +} + +func (rcv *UidList) UidsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func UidListStart(builder *flatbuffers.Builder) { builder.StartObject(1) } +func UidListAddUids(builder *flatbuffers.Builder, uids flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(uids), 0) } +func UidListStartUidsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(8, numElems, 8) +} +func UidListEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() } diff --git a/x/elem.go b/x/elem.go new file mode 100644 index 0000000000000000000000000000000000000000..5b13ac2402cab34591f7685b02267dd82d9780cd --- /dev/null +++ b/x/elem.go @@ -0,0 +1,38 @@ +/* + * Copyright 2015 Manish R Jain <manishrjain@gmail.com> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package x + +type elem struct { + Uid uint64 + Chidx int // channel index +} + +type Uint64Heap []elem + +func (h Uint64Heap) Len() int { return len(h) } +func (h Uint64Heap) Less(i, j int) bool { return h[i].Uid < h[j].Uid } +func (h Uint64Heap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *Uint64Heap) Push(x interface{}) { + *h = append(*h, x.(elem)) +} +func (h *Uint64Heap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/x/elem_test.go b/x/elem_test.go new file mode 100644 index 0000000000000000000000000000000000000000..32a5ffb3eb079f2ef85a5129d3d4481e32c8e2b6 --- /dev/null +++ b/x/elem_test.go @@ -0,0 +1,55 @@ +package x + +import ( + "container/heap" + "testing" +) + +func TestPush(t *testing.T) { + h := &Uint64Heap{} + heap.Init(h) + + e := elem{Uid: 5} + heap.Push(h, e) + e.Uid = 3 + heap.Push(h, e) + e.Uid = 4 + heap.Push(h, e) + + if h.Len() != 3 { + t.Errorf("Expected len 3. Found: %v", h.Len()) + } + if (*h)[0].Uid != 3 { + t.Errorf("Expected min 3. Found: %+v", (*h)[0]) + } + e.Uid = 10 + (*h)[0] = e + heap.Fix(h, 0) + if (*h)[0].Uid != 4 { + t.Errorf("Expected min 4. Found: %+v", (*h)[0]) + } + e.Uid = 11 + (*h)[0] = e + heap.Fix(h, 0) + if (*h)[0].Uid != 5 { + t.Errorf("Expected min 5. Found: %+v", (*h)[0]) + } + + e = heap.Pop(h).(elem) + if e.Uid != 5 { + t.Errorf("Expected min 5. Found %+v", e) + } + + e = heap.Pop(h).(elem) + if e.Uid != 10 { + t.Errorf("Expected min 10. Found: %+v", e) + } + e = heap.Pop(h).(elem) + if e.Uid != 11 { + t.Errorf("Expected min 11. Found: %+v", e) + } + + if h.Len() != 0 { + t.Errorf("Expected len 0. Found: %v, values: %+v", h.Len(), h) + } +}