Skip to content
Snippets Groups Projects
Commit ecdb53ab authored by Manish R Jain's avatar Manish R Jain
Browse files

Make list matrix work, yeah! This would serialize uids for the next query,...

Make list matrix work, yeah! This would serialize uids for the next query, which would then again return a matrix, so on and so forth.
parent aeb72365
No related branches found
No related tags found
No related merge requests found
...@@ -595,8 +595,6 @@ func (l *List) CommitIfDirty() error { ...@@ -595,8 +595,6 @@ func (l *List) CommitIfDirty() error {
return nil return nil
} }
// This is a blocking function. It would block when the channel buffer capacity
// has been reached.
func (l *List) GetUids() []uint64 { func (l *List) GetUids() []uint64 {
l.mutex.RLock() l.mutex.RLock()
defer l.mutex.RUnlock() defer l.mutex.RUnlock()
......
...@@ -2,56 +2,10 @@ package posting ...@@ -2,56 +2,10 @@ package posting
import ( import (
"github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
"github.com/google/flatbuffers/go" "github.com/google/flatbuffers/go"
) )
/*
type elem struct {
Uid uint64
Chidx int // channel index
}
type elemHeap []elem
func (h elemHeap) Len() int { return len(h) }
func (h elemHeap) Less(i, j int) bool { return h[i].Uid < h[j].Uid }
func (h elemHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *elemHeap) Push(x interface{}) {
*h = append(*h, x.(elem))
}
func (h *elemHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
*/
/*
func addUids(b *flatbuffers.Builder, sorted []uint64) flatbuffers.UOffsetT {
// Invert the sorted uids to maintain same order in flatbuffers.
task.ResultStartUidsVector(b, len(sorted))
for i := len(sorted) - 1; i >= 0; i-- {
b.PrependUint64(sorted[i])
}
return b.EndVector(len(sorted))
}
*/
func uidlistOffset(b *flatbuffers.Builder,
sorted []uint64) flatbuffers.UOffsetT {
task.UidListStartUidsVector(b, len(sorted))
for i := len(sorted) - 1; i >= 0; i-- {
b.PrependUint64(sorted[i])
}
ulist := b.EndVector(len(sorted))
task.UidListStart(b)
task.UidListAddUids(b, ulist)
return task.UidListEnd(b)
}
func ProcessTask(query []byte) (result []byte, rerr error) { func ProcessTask(query []byte) (result []byte, rerr error) {
uo := flatbuffers.GetUOffsetT(query) uo := flatbuffers.GetUOffsetT(query)
q := new(task.Query) q := new(task.Query)
...@@ -78,7 +32,7 @@ func ProcessTask(query []byte) (result []byte, rerr error) { ...@@ -78,7 +32,7 @@ func ProcessTask(query []byte) (result []byte, rerr error) {
voffsets[i] = task.ValueEnd(b) voffsets[i] = task.ValueEnd(b)
ulist := pl.GetUids() ulist := pl.GetUids()
uoffsets[i] = uidlistOffset(b, ulist) uoffsets[i] = x.UidlistOffset(b, ulist)
} }
task.ResultStartValuesVector(b, len(voffsets)) task.ResultStartValuesVector(b, len(voffsets))
for i := len(voffsets) - 1; i >= 0; i-- { for i := len(voffsets) - 1; i >= 0; i-- {
......
...@@ -17,8 +17,10 @@ ...@@ -17,8 +17,10 @@
package query package query
import ( import (
"container/heap"
"fmt" "fmt"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/uid"
...@@ -79,7 +81,7 @@ import ( ...@@ -79,7 +81,7 @@ import (
* Return errors, if any. * Return errors, if any.
*/ */
var log = x.Log("query") var glog = x.Log("query")
// SubGraph is the way to represent data internally. It contains both the // SubGraph is the way to represent data internally. It contains both the
// query and the response. Once generated, this can then be encoded to other // query and the response. Once generated, this can then be encoded to other
...@@ -92,6 +94,7 @@ type SubGraph struct { ...@@ -92,6 +94,7 @@ type SubGraph struct {
result []byte result []byte
} }
/*
func getChildren(r *task.Result, sg *SubGraph) (result interface{}, rerr error) { func getChildren(r *task.Result, sg *SubGraph) (result interface{}, rerr error) {
var l []interface{} var l []interface{}
for i := 0; i < r.UidsLength(); i++ { for i := 0; i < r.UidsLength(); i++ {
...@@ -118,7 +121,9 @@ func getChildren(r *task.Result, sg *SubGraph) (result interface{}, rerr error) ...@@ -118,7 +121,9 @@ func getChildren(r *task.Result, sg *SubGraph) (result interface{}, rerr error)
} }
} }
} }
*/
/*
func processChild(result *[]map[string]interface{}, g *SubGraph) error { func processChild(result *[]map[string]interface{}, g *SubGraph) error {
ro := flatbuffers.GetUOffsetT(g.result) ro := flatbuffers.GetUOffsetT(g.result)
r := new(task.Result) r := new(task.Result)
...@@ -152,7 +157,9 @@ func processChild(result *[]map[string]interface{}, g *SubGraph) error { ...@@ -152,7 +157,9 @@ func processChild(result *[]map[string]interface{}, g *SubGraph) error {
} }
} }
} }
*/
/*
func (sg SubGraph) ToJson() (result []byte, rerr error) { func (sg SubGraph) ToJson() (result []byte, rerr error) {
ro := flatbuffers.GetUOffsetT(sg.result) ro := flatbuffers.GetUOffsetT(sg.result)
r := new(task.Result) r := new(task.Result)
...@@ -162,6 +169,7 @@ func (sg SubGraph) ToJson() (result []byte, rerr error) { ...@@ -162,6 +169,7 @@ func (sg SubGraph) ToJson() (result []byte, rerr error) {
rlist[i]["uid"] = r.Uids(i) rlist[i]["uid"] = r.Uids(i)
} }
} }
*/
func NewGraph(euid uint64, exid string) (*SubGraph, error) { func NewGraph(euid uint64, exid string) (*SubGraph, error) {
// This would set the Result field in SubGraph, // This would set the Result field in SubGraph,
...@@ -169,45 +177,49 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) { ...@@ -169,45 +177,49 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) {
if len(exid) > 0 { if len(exid) > 0 {
u, err := uid.GetOrAssign(exid) u, err := uid.GetOrAssign(exid)
if err != nil { if err != nil {
x.Err(log, err).WithField("xid", exid).Error( x.Err(glog, err).WithField("xid", exid).Error(
"While GetOrAssign uid from external id") "While GetOrAssign uid from external id")
return nil, err return nil, err
} }
log.WithField("xid", exid).WithField("uid", u).Debug("GetOrAssign") glog.WithField("xid", exid).WithField("uid", u).Debug("GetOrAssign")
euid = u euid = u
} }
if euid == 0 { if euid == 0 {
err := fmt.Errorf("Query internal id is zero") err := fmt.Errorf("Query internal id is zero")
x.Err(log, err).Error("Invalid query") x.Err(glog, err).Error("Invalid query")
return nil, err return nil, err
} }
// Encode uid into result flatbuffer. // Encode uid into result flatbuffer.
b := flatbuffers.NewBuilder(0) b := flatbuffers.NewBuilder(0)
task.ResultStartUidsVector(b, 1) omatrix := x.UidlistOffset(b, []uint64{euid})
b.PrependUint64(euid)
vend := b.EndVector(1) task.ResultStartUidmatrixVector(b, 1)
b.PrependUOffsetT(omatrix)
mend := b.EndVector(1)
task.ResultStart(b) task.ResultStart(b)
task.ResultAddUids(b, vend) task.ResultAddUidmatrix(b, mend)
rend := task.ResultEnd(b) rend := task.ResultEnd(b)
b.Finish(rend) b.Finish(rend)
sg := new(SubGraph) sg := new(SubGraph)
sg.Attr = "_root_"
sg.result = b.Bytes[b.Head():] sg.result = b.Bytes[b.Head():]
return sg, nil return sg, nil
} }
func createTaskQuery(attr string, r *task.Result) []byte { // createTaskQuery generates the query buffer.
func createTaskQuery(attr string, sorted []uint64) []byte {
b := flatbuffers.NewBuilder(0) b := flatbuffers.NewBuilder(0)
ao := b.CreateString(attr) ao := b.CreateString(attr)
task.QueryStartUidsVector(b, r.UidsLength()) task.QueryStartUidsVector(b, len(sorted))
for i := r.UidsLength() - 1; i >= 0; i-- { for i := len(sorted) - 1; i >= 0; i-- {
uid := r.Uids(i) b.PrependUint64(sorted[i])
b.PrependUint64(uid)
} }
vend := b.EndVector(r.UidsLength()) vend := b.EndVector(len(sorted))
task.QueryStart(b) task.QueryStart(b)
task.QueryAddAttr(b, ao) task.QueryAddAttr(b, ao)
...@@ -217,12 +229,69 @@ func createTaskQuery(attr string, r *task.Result) []byte { ...@@ -217,12 +229,69 @@ func createTaskQuery(attr string, r *task.Result) []byte {
return b.Bytes[b.Head():] return b.Bytes[b.Head():]
} }
type ListChannel struct {
TList *task.UidList
Idx int
}
func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) {
// Let's serialize the matrix of uids in result to a
// sorted unique list of uids.
h := &x.Uint64Heap{}
heap.Init(h)
channels := make([]*ListChannel, r.UidmatrixLength())
for i := 0; i < r.UidmatrixLength(); i++ {
tlist := new(task.UidList)
if ok := r.Uidmatrix(tlist, i); !ok {
return sorted, fmt.Errorf("While parsing Uidmatrix")
}
if tlist.UidsLength() > 0 {
e := x.Elem{
Uid: tlist.Uids(0),
Idx: i,
}
heap.Push(h, e)
}
channels[i] = &ListChannel{TList: tlist, Idx: 1}
}
// The resulting list of uids will be stored here.
sorted = make([]uint64, 100)
sorted = sorted[:0]
var last uint64
last = 0
// Itearate over the heap.
for h.Len() > 0 {
me := (*h)[0] // Peek at the top element in heap.
if me.Uid != last {
sorted = append(sorted, me.Uid) // Add if unique.
last = me.Uid
}
lc := channels[me.Idx]
if lc.Idx >= lc.TList.UidsLength() {
heap.Pop(h)
} else {
uid := lc.TList.Uids(lc.Idx)
lc.Idx += 1
me.Uid = uid
(*h)[0] = me
heap.Fix(h, 0) // Faster than Pop() followed by Push().
}
}
return sorted, nil
}
func ProcessGraph(sg *SubGraph, rch chan error) { func ProcessGraph(sg *SubGraph, rch chan error) {
var err error var err error
if len(sg.query) > 0 { if len(sg.query) > 0 {
// This task execution would go over the wire in later versions. // This task execution would go over the wire in later versions.
sg.result, err = posting.ProcessTask(sg.query) sg.result, err = posting.ProcessTask(sg.query)
if err != nil { if err != nil {
x.Err(glog, err).Error("While processing task.")
rch <- err rch <- err
return return
} }
...@@ -231,10 +300,21 @@ func ProcessGraph(sg *SubGraph, rch chan error) { ...@@ -231,10 +300,21 @@ func ProcessGraph(sg *SubGraph, rch chan error) {
uo := flatbuffers.GetUOffsetT(sg.result) uo := flatbuffers.GetUOffsetT(sg.result)
r := new(task.Result) r := new(task.Result)
r.Init(sg.result, uo) r.Init(sg.result, uo)
if r.UidsLength() == 0 {
sorted, err := sortedUniqueUids(r)
if err != nil {
x.Err(glog, err).Error("While processing task.")
rch <- err
return
}
if len(sorted) == 0 {
// Looks like we're done here. // Looks like we're done here.
if len(sg.Children) > 0 { if len(sg.Children) > 0 {
log.Debug("Have some children but no results. Life got cut short early.") glog.Debugf("Have some children but no results. Life got cut short early."+
"Current attribute: %q", sg.Attr)
} else {
glog.Debugf("No more things to process for Attr: %v", sg.Attr)
} }
rch <- nil rch <- nil
return return
...@@ -247,14 +327,20 @@ func ProcessGraph(sg *SubGraph, rch chan error) { ...@@ -247,14 +327,20 @@ func ProcessGraph(sg *SubGraph, rch chan error) {
childchan := make(chan error, len(sg.Children)) childchan := make(chan error, len(sg.Children))
for i := 0; i < len(sg.Children); i++ { for i := 0; i < len(sg.Children); i++ {
child := sg.Children[i] child := sg.Children[i]
child.query = createTaskQuery(child.Attr, r) child.query = createTaskQuery(child.Attr, sorted)
go ProcessGraph(child, childchan) go ProcessGraph(child, childchan)
} }
// Now get all the results back. // Now get all the results back.
for i := 0; i < len(sg.Children); i++ { for i := 0; i < len(sg.Children); i++ {
err = <-childchan err = <-childchan
glog.WithFields(logrus.Fields{
"num_children": len(sg.Children),
"index": i,
"attr": sg.Children[i].Attr,
}).Debug("Reply from child")
if err != nil { if err != nil {
x.Err(glog, err).Error("While processing child task.")
rch <- err rch <- err
return return
} }
......
...@@ -23,11 +23,11 @@ import ( ...@@ -23,11 +23,11 @@ import (
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/google/flatbuffers/go"
"github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/dgraph/x"
"github.com/google/flatbuffers/go"
) )
func setErr(err *error, nerr error) { func setErr(err *error, nerr error) {
...@@ -155,14 +155,48 @@ func checkSingleValue(t *testing.T, child *SubGraph, ...@@ -155,14 +155,48 @@ func checkSingleValue(t *testing.T, child *SubGraph,
r := new(task.Result) r := new(task.Result)
r.Init(child.result, uo) r.Init(child.result, uo)
if r.ValuesLength() != 1 { if r.ValuesLength() != 1 {
t.Error("Expected value length 1. Got: %v", r.ValuesLength()) t.Errorf("Expected value length 1. Got: %v", r.ValuesLength())
}
if r.UidmatrixLength() != 1 {
t.Errorf("Expected uidmatrix length 1. Got: %v", r.UidmatrixLength())
} }
if r.UidsLength() != 0 { var ul task.UidList
t.Error("Expected uids length 0. Got: %v", r.UidsLength()) if ok := r.Uidmatrix(&ul, 0); !ok {
t.Errorf("While parsing uidlist")
}
if ul.UidsLength() != 0 {
t.Error("Expected uids length 0. Got: %v", ul.UidsLength())
} }
checkName(t, r, 0, value) checkName(t, r, 0, value)
} }
func TestNewGraph(t *testing.T) {
var ex uint64
ex = 101
sg, err := NewGraph(ex, "")
if err != nil {
t.Error(err)
}
uo := flatbuffers.GetUOffsetT(sg.result)
r := new(task.Result)
r.Init(sg.result, uo)
if r.UidmatrixLength() != 1 {
t.Errorf("Expected length 1. Got: %v", r.UidmatrixLength())
}
var ul task.UidList
if ok := r.Uidmatrix(&ul, 0); !ok {
t.Errorf("Unable to parse uidlist at index 0")
}
if ul.UidsLength() != 1 {
t.Errorf("Expected length 1. Got: %v", ul.UidsLength())
}
if ul.Uids(0) != ex {
t.Errorf("Expected uid: %v. Got: %v", ex, ul.Uids(0))
}
}
func TestProcessGraph(t *testing.T) { func TestProcessGraph(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel) logrus.SetLevel(logrus.DebugLevel)
...@@ -262,16 +296,25 @@ func TestProcessGraph(t *testing.T) { ...@@ -262,16 +296,25 @@ func TestProcessGraph(t *testing.T) {
} }
if len(child.result) == 0 { if len(child.result) == 0 {
t.Errorf("Expected some result.") t.Errorf("Expected some result.")
return
} }
uo := flatbuffers.GetUOffsetT(child.result) uo := flatbuffers.GetUOffsetT(child.result)
r := new(task.Result) r := new(task.Result)
r.Init(child.result, uo) r.Init(child.result, uo)
if r.UidsLength() != 5 { if r.UidmatrixLength() != 1 {
t.Errorf("Expected 5 friends. Got: %v", r.UidsLength()) t.Errorf("Expected 1 matrix. Got: %v", r.UidmatrixLength())
}
var ul task.UidList
if ok := r.Uidmatrix(&ul, 0); !ok {
t.Errorf("While parsing uidlist")
}
if ul.UidsLength() != 5 {
t.Errorf("Expected 5 friends. Got: %v", ul.UidsLength())
} }
if r.Uids(0) != 23 || r.Uids(1) != 24 || r.Uids(2) != 25 || if ul.Uids(0) != 23 || ul.Uids(1) != 24 || ul.Uids(2) != 25 ||
r.Uids(3) != 31 || r.Uids(4) != 101 { ul.Uids(3) != 31 || ul.Uids(4) != 101 {
t.Errorf("Friend ids don't match") t.Errorf("Friend ids don't match")
} }
if len(child.Children) != 1 || child.Children[0].Attr != "name" { if len(child.Children) != 1 || child.Children[0].Attr != "name" {
......
...@@ -16,18 +16,18 @@ ...@@ -16,18 +16,18 @@
package x package x
type elem struct { type Elem struct {
Uid uint64 Uid uint64
Chidx int // channel index Idx int // channel index
} }
type Uint64Heap []elem type Uint64Heap []Elem
func (h Uint64Heap) Len() int { return len(h) } func (h Uint64Heap) Len() int { return len(h) }
func (h Uint64Heap) Less(i, j int) bool { return h[i].Uid < h[j].Uid } func (h Uint64Heap) Less(i, j int) bool { return h[i].Uid < h[j].Uid }
func (h Uint64Heap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h Uint64Heap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *Uint64Heap) Push(x interface{}) { func (h *Uint64Heap) Push(x interface{}) {
*h = append(*h, x.(elem)) *h = append(*h, x.(Elem))
} }
func (h *Uint64Heap) Pop() interface{} { func (h *Uint64Heap) Pop() interface{} {
old := *h old := *h
......
...@@ -9,7 +9,7 @@ func TestPush(t *testing.T) { ...@@ -9,7 +9,7 @@ func TestPush(t *testing.T) {
h := &Uint64Heap{} h := &Uint64Heap{}
heap.Init(h) heap.Init(h)
e := elem{Uid: 5} e := Elem{Uid: 5}
heap.Push(h, e) heap.Push(h, e)
e.Uid = 3 e.Uid = 3
heap.Push(h, e) heap.Push(h, e)
...@@ -35,16 +35,16 @@ func TestPush(t *testing.T) { ...@@ -35,16 +35,16 @@ func TestPush(t *testing.T) {
t.Errorf("Expected min 5. Found: %+v", (*h)[0]) t.Errorf("Expected min 5. Found: %+v", (*h)[0])
} }
e = heap.Pop(h).(elem) e = heap.Pop(h).(Elem)
if e.Uid != 5 { if e.Uid != 5 {
t.Errorf("Expected min 5. Found %+v", e) t.Errorf("Expected min 5. Found %+v", e)
} }
e = heap.Pop(h).(elem) e = heap.Pop(h).(Elem)
if e.Uid != 10 { if e.Uid != 10 {
t.Errorf("Expected min 10. Found: %+v", e) t.Errorf("Expected min 10. Found: %+v", e)
} }
e = heap.Pop(h).(elem) e = heap.Pop(h).(Elem)
if e.Uid != 11 { if e.Uid != 11 {
t.Errorf("Expected min 11. Found: %+v", e) t.Errorf("Expected min 11. Found: %+v", e)
} }
......
...@@ -7,6 +7,8 @@ import ( ...@@ -7,6 +7,8 @@ import (
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/task"
"github.com/google/flatbuffers/go"
) )
const ( const (
...@@ -75,3 +77,16 @@ func ParseRequest(w http.ResponseWriter, r *http.Request, data interface{}) bool ...@@ -75,3 +77,16 @@ func ParseRequest(w http.ResponseWriter, r *http.Request, data interface{}) bool
} }
return true return true
} }
func UidlistOffset(b *flatbuffers.Builder,
sorted []uint64) flatbuffers.UOffsetT {
task.UidListStartUidsVector(b, len(sorted))
for i := len(sorted) - 1; i >= 0; i-- {
b.PrependUint64(sorted[i])
}
ulist := b.EndVector(len(sorted))
task.UidListStart(b)
task.UidListAddUids(b, ulist)
return task.UidListEnd(b)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment