diff --git a/posting/list.go b/posting/list.go
index d7edaa4d189ecb5c4529bce40a2ff4ff333d43dd..c4e1830ed996281a3018985d12d0184aaadf3e15 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -595,8 +595,6 @@ func (l *List) CommitIfDirty() error {
 	return nil
 }
 
-// This is a blocking function. It would block when the channel buffer capacity
-// has been reached.
 func (l *List) GetUids() []uint64 {
 	l.mutex.RLock()
 	defer l.mutex.RUnlock()
diff --git a/posting/worker.go b/posting/worker.go
index 7e8e8bab1dfbf049ffda1738d15e7935efb1cf20..8f39f2c1bcb31c3f5747c44e7f4cefc1fa721d25 100644
--- a/posting/worker.go
+++ b/posting/worker.go
@@ -2,56 +2,10 @@ package posting
 
 import (
 	"github.com/dgraph-io/dgraph/task"
+	"github.com/dgraph-io/dgraph/x"
 	"github.com/google/flatbuffers/go"
 )
 
-/*
-type elem struct {
-	Uid   uint64
-	Chidx int // channel index
-}
-
-type elemHeap []elem
-
-func (h elemHeap) Len() int           { return len(h) }
-func (h elemHeap) Less(i, j int) bool { return h[i].Uid < h[j].Uid }
-func (h elemHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
-func (h *elemHeap) Push(x interface{}) {
-	*h = append(*h, x.(elem))
-}
-func (h *elemHeap) Pop() interface{} {
-	old := *h
-	n := len(old)
-	x := old[n-1]
-	*h = old[0 : n-1]
-	return x
-}
-*/
-
-/*
-func addUids(b *flatbuffers.Builder, sorted []uint64) flatbuffers.UOffsetT {
-	// Invert the sorted uids to maintain same order in flatbuffers.
-	task.ResultStartUidsVector(b, len(sorted))
-	for i := len(sorted) - 1; i >= 0; i-- {
-		b.PrependUint64(sorted[i])
-	}
-	return b.EndVector(len(sorted))
-}
-*/
-
-func uidlistOffset(b *flatbuffers.Builder,
-	sorted []uint64) flatbuffers.UOffsetT {
-
-	task.UidListStartUidsVector(b, len(sorted))
-	for i := len(sorted) - 1; i >= 0; i-- {
-		b.PrependUint64(sorted[i])
-	}
-	ulist := b.EndVector(len(sorted))
-	task.UidListStart(b)
-	task.UidListAddUids(b, ulist)
-	return task.UidListEnd(b)
-}
-
 func ProcessTask(query []byte) (result []byte, rerr error) {
 	uo := flatbuffers.GetUOffsetT(query)
 	q := new(task.Query)
@@ -78,7 +32,7 @@ func ProcessTask(query []byte) (result []byte, rerr error) {
 		voffsets[i] = task.ValueEnd(b)
 
 		ulist := pl.GetUids()
-		uoffsets[i] = uidlistOffset(b, ulist)
+		uoffsets[i] = x.UidlistOffset(b, ulist)
 	}
 	task.ResultStartValuesVector(b, len(voffsets))
 	for i := len(voffsets) - 1; i >= 0; i-- {
diff --git a/query/query.go b/query/query.go
index c4aed9287f8f3712dd6f4b5dfec1e5215cd93144..4aff167d226c2a8c54dc10972a73e285f8140366 100644
--- a/query/query.go
+++ b/query/query.go
@@ -17,8 +17,10 @@
 package query
 
 import (
+	"container/heap"
 	"fmt"
 
+	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/task"
 	"github.com/dgraph-io/dgraph/uid"
@@ -79,7 +81,7 @@ import (
  * Return errors, if any.
  */
 
-var log = x.Log("query")
+var glog = x.Log("query")
 
 // SubGraph is the way to represent data internally. It contains both the
 // query and the response. Once generated, this can then be encoded to other
@@ -92,6 +94,7 @@ type SubGraph struct {
 	result []byte
 }
 
+/*
 func getChildren(r *task.Result, sg *SubGraph) (result interface{}, rerr error) {
 	var l []interface{}
 	for i := 0; i < r.UidsLength(); i++ {
@@ -118,7 +121,9 @@ func getChildren(r *task.Result, sg *SubGraph) (result interface{}, rerr error)
 		}
 	}
 }
+*/
 
+/*
 func processChild(result *[]map[string]interface{}, g *SubGraph) error {
 	ro := flatbuffers.GetUOffsetT(g.result)
 	r := new(task.Result)
@@ -152,7 +157,9 @@ func processChild(result *[]map[string]interface{}, g *SubGraph) error {
 		}
 	}
 }
+*/
 
+/*
 func (sg SubGraph) ToJson() (result []byte, rerr error) {
 	ro := flatbuffers.GetUOffsetT(sg.result)
 	r := new(task.Result)
@@ -162,6 +169,7 @@ func (sg SubGraph) ToJson() (result []byte, rerr error) {
 		rlist[i]["uid"] = r.Uids(i)
 	}
 }
+*/
 
 func NewGraph(euid uint64, exid string) (*SubGraph, error) {
 	// This would set the Result field in SubGraph,
@@ -169,45 +177,49 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) {
 	if len(exid) > 0 {
 		u, err := uid.GetOrAssign(exid)
 		if err != nil {
-			x.Err(log, err).WithField("xid", exid).Error(
+			x.Err(glog, err).WithField("xid", exid).Error(
 				"While GetOrAssign uid from external id")
 			return nil, err
 		}
-		log.WithField("xid", exid).WithField("uid", u).Debug("GetOrAssign")
+		glog.WithField("xid", exid).WithField("uid", u).Debug("GetOrAssign")
 		euid = u
 	}
 
 	if euid == 0 {
 		err := fmt.Errorf("Query internal id is zero")
-		x.Err(log, err).Error("Invalid query")
+		x.Err(glog, err).Error("Invalid query")
 		return nil, err
 	}
 
 	// Encode uid into result flatbuffer.
 	b := flatbuffers.NewBuilder(0)
-	task.ResultStartUidsVector(b, 1)
-	b.PrependUint64(euid)
-	vend := b.EndVector(1)
+	omatrix := x.UidlistOffset(b, []uint64{euid})
+
+	task.ResultStartUidmatrixVector(b, 1)
+	b.PrependUOffsetT(omatrix)
+	mend := b.EndVector(1)
+
 	task.ResultStart(b)
-	task.ResultAddUids(b, vend)
+	task.ResultAddUidmatrix(b, mend)
 	rend := task.ResultEnd(b)
 	b.Finish(rend)
 
 	sg := new(SubGraph)
+	sg.Attr = "_root_"
 	sg.result = b.Bytes[b.Head():]
 	return sg, nil
 }
 
-func createTaskQuery(attr string, r *task.Result) []byte {
+// createTaskQuery generates the query buffer.
+func createTaskQuery(attr string, sorted []uint64) []byte {
 	b := flatbuffers.NewBuilder(0)
 	ao := b.CreateString(attr)
 
-	task.QueryStartUidsVector(b, r.UidsLength())
-	for i := r.UidsLength() - 1; i >= 0; i-- {
-		uid := r.Uids(i)
-		b.PrependUint64(uid)
+	task.QueryStartUidsVector(b, len(sorted))
+	for i := len(sorted) - 1; i >= 0; i-- {
+		b.PrependUint64(sorted[i])
 	}
-	vend := b.EndVector(r.UidsLength())
+	vend := b.EndVector(len(sorted))
 
 	task.QueryStart(b)
 	task.QueryAddAttr(b, ao)
@@ -217,12 +229,69 @@ func createTaskQuery(attr string, r *task.Result) []byte {
 	return b.Bytes[b.Head():]
 }
 
+type ListChannel struct {
+	TList *task.UidList
+	Idx   int
+}
+
+func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) {
+	// Let's serialize the matrix of uids in result to a
+	// sorted unique list of uids.
+	h := &x.Uint64Heap{}
+	heap.Init(h)
+
+	channels := make([]*ListChannel, r.UidmatrixLength())
+	for i := 0; i < r.UidmatrixLength(); i++ {
+		tlist := new(task.UidList)
+		if ok := r.Uidmatrix(tlist, i); !ok {
+			return sorted, fmt.Errorf("While parsing Uidmatrix")
+		}
+		if tlist.UidsLength() > 0 {
+			e := x.Elem{
+				Uid: tlist.Uids(0),
+				Idx: i,
+			}
+			heap.Push(h, e)
+		}
+		channels[i] = &ListChannel{TList: tlist, Idx: 1}
+	}
+
+	// The resulting list of uids will be stored here.
+	sorted = make([]uint64, 100)
+	sorted = sorted[:0]
+
+	var last uint64
+	last = 0
+	// Iterate over the heap.
+	for h.Len() > 0 {
+		me := (*h)[0] // Peek at the top element in heap.
+		if me.Uid != last {
+			sorted = append(sorted, me.Uid) // Add if unique.
+			last = me.Uid
+		}
+		lc := channels[me.Idx]
+		if lc.Idx >= lc.TList.UidsLength() {
+			heap.Pop(h)
+
+		} else {
+			uid := lc.TList.Uids(lc.Idx)
+			lc.Idx += 1
+
+			me.Uid = uid
+			(*h)[0] = me
+			heap.Fix(h, 0) // Faster than Pop() followed by Push().
+		}
+	}
+	return sorted, nil
+}
+
 func ProcessGraph(sg *SubGraph, rch chan error) {
 	var err error
 	if len(sg.query) > 0 {
 		// This task execution would go over the wire in later versions.
 		sg.result, err = posting.ProcessTask(sg.query)
 		if err != nil {
+			x.Err(glog, err).Error("While processing task.")
 			rch <- err
 			return
 		}
@@ -231,10 +300,21 @@ func ProcessGraph(sg *SubGraph, rch chan error) {
 	uo := flatbuffers.GetUOffsetT(sg.result)
 	r := new(task.Result)
 	r.Init(sg.result, uo)
-	if r.UidsLength() == 0 {
+
+	sorted, err := sortedUniqueUids(r)
+	if err != nil {
+		x.Err(glog, err).Error("While processing task.")
+		rch <- err
+		return
+	}
+
+	if len(sorted) == 0 {
 		// Looks like we're done here.
 		if len(sg.Children) > 0 {
-			log.Debug("Have some children but no results. Life got cut short early.")
+			glog.Debugf("Have some children but no results. Life got cut short early."+
+				"Current attribute: %q", sg.Attr)
+		} else {
+			glog.Debugf("No more things to process for Attr: %v", sg.Attr)
 		}
 		rch <- nil
 		return
@@ -247,14 +327,20 @@ func ProcessGraph(sg *SubGraph, rch chan error) {
 	childchan := make(chan error, len(sg.Children))
 	for i := 0; i < len(sg.Children); i++ {
 		child := sg.Children[i]
-		child.query = createTaskQuery(child.Attr, r)
+		child.query = createTaskQuery(child.Attr, sorted)
 		go ProcessGraph(child, childchan)
 	}
 
 	// Now get all the results back.
 	for i := 0; i < len(sg.Children); i++ {
 		err = <-childchan
+		glog.WithFields(logrus.Fields{
+			"num_children": len(sg.Children),
+			"index":        i,
+			"attr":         sg.Children[i].Attr,
+		}).Debug("Reply from child")
 		if err != nil {
+			x.Err(glog, err).Error("While processing child task.")
 			rch <- err
 			return
 		}
diff --git a/query/query_test.go b/query/query_test.go
index 9a6e26d134c358882eeffea25011e2ea1535260f..0ee5018a7e55d3c14bb12418550ccf217271ada9 100644
--- a/query/query_test.go
+++ b/query/query_test.go
@@ -23,11 +23,11 @@ import (
 	"time"
 
 	"github.com/Sirupsen/logrus"
-	"github.com/google/flatbuffers/go"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/task"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/google/flatbuffers/go"
 )
 
 func setErr(err *error, nerr error) {
@@ -155,14 +155,48 @@ func checkSingleValue(t *testing.T, child *SubGraph,
 	r := new(task.Result)
 	r.Init(child.result, uo)
 	if r.ValuesLength() != 1 {
-		t.Error("Expected value length 1. Got: %v", r.ValuesLength())
+		t.Errorf("Expected value length 1. Got: %v", r.ValuesLength())
+	}
+	if r.UidmatrixLength() != 1 {
+		t.Errorf("Expected uidmatrix length 1. Got: %v", r.UidmatrixLength())
 	}
-	if r.UidsLength() != 0 {
-		t.Error("Expected uids length 0. Got: %v", r.UidsLength())
+	var ul task.UidList
+	if ok := r.Uidmatrix(&ul, 0); !ok {
+		t.Errorf("While parsing uidlist")
+	}
+
+	if ul.UidsLength() != 0 {
+		t.Errorf("Expected uids length 0. Got: %v", ul.UidsLength())
 	}
 	checkName(t, r, 0, value)
 }
 
+func TestNewGraph(t *testing.T) {
+	var ex uint64
+	ex = 101
+	sg, err := NewGraph(ex, "")
+	if err != nil {
+		t.Error(err)
+	}
+
+	uo := flatbuffers.GetUOffsetT(sg.result)
+	r := new(task.Result)
+	r.Init(sg.result, uo)
+	if r.UidmatrixLength() != 1 {
+		t.Errorf("Expected length 1. Got: %v", r.UidmatrixLength())
+	}
+	var ul task.UidList
+	if ok := r.Uidmatrix(&ul, 0); !ok {
+		t.Errorf("Unable to parse uidlist at index 0")
+	}
+	if ul.UidsLength() != 1 {
+		t.Errorf("Expected length 1. Got: %v", ul.UidsLength())
+	}
+	if ul.Uids(0) != ex {
+		t.Errorf("Expected uid: %v. Got: %v", ex, ul.Uids(0))
+	}
+}
+
 func TestProcessGraph(t *testing.T) {
 	logrus.SetLevel(logrus.DebugLevel)
 
@@ -262,16 +296,25 @@ func TestProcessGraph(t *testing.T) {
 	}
 	if len(child.result) == 0 {
 		t.Errorf("Expected some result.")
+		return
 	}
 
 	uo := flatbuffers.GetUOffsetT(child.result)
 	r := new(task.Result)
 	r.Init(child.result, uo)
-	if r.UidsLength() != 5 {
-		t.Errorf("Expected 5 friends. Got: %v", r.UidsLength())
+	if r.UidmatrixLength() != 1 {
+		t.Errorf("Expected 1 matrix. Got: %v", r.UidmatrixLength())
+	}
+	var ul task.UidList
+	if ok := r.Uidmatrix(&ul, 0); !ok {
+		t.Errorf("While parsing uidlist")
+	}
+
+	if ul.UidsLength() != 5 {
+		t.Errorf("Expected 5 friends. Got: %v", ul.UidsLength())
 	}
-	if r.Uids(0) != 23 || r.Uids(1) != 24 || r.Uids(2) != 25 ||
-		r.Uids(3) != 31 || r.Uids(4) != 101 {
+	if ul.Uids(0) != 23 || ul.Uids(1) != 24 || ul.Uids(2) != 25 ||
+		ul.Uids(3) != 31 || ul.Uids(4) != 101 {
 		t.Errorf("Friend ids don't match")
 	}
 	if len(child.Children) != 1 || child.Children[0].Attr != "name" {
diff --git a/x/elem.go b/x/heap.go
similarity index 89%
rename from x/elem.go
rename to x/heap.go
index 5b13ac2402cab34591f7685b02267dd82d9780cd..384ecc0b8099d663a9277f7d11695d7914f94d86 100644
--- a/x/elem.go
+++ b/x/heap.go
@@ -16,18 +16,18 @@
 
 package x
 
-type elem struct {
-	Uid   uint64
-	Chidx int // channel index
+type Elem struct {
+	Uid uint64
+	Idx int // channel index
 }
 
-type Uint64Heap []elem
+type Uint64Heap []Elem
 
 func (h Uint64Heap) Len() int           { return len(h) }
 func (h Uint64Heap) Less(i, j int) bool { return h[i].Uid < h[j].Uid }
 func (h Uint64Heap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
 func (h *Uint64Heap) Push(x interface{}) {
-	*h = append(*h, x.(elem))
+	*h = append(*h, x.(Elem))
 }
 func (h *Uint64Heap) Pop() interface{} {
 	old := *h
diff --git a/x/elem_test.go b/x/heap_test.go
similarity index 90%
rename from x/elem_test.go
rename to x/heap_test.go
index 32a5ffb3eb079f2ef85a5129d3d4481e32c8e2b6..6f9241e08e9234aedfadf6fb713dd4bf24967cdd 100644
--- a/x/elem_test.go
+++ b/x/heap_test.go
@@ -9,7 +9,7 @@ func TestPush(t *testing.T) {
 	h := &Uint64Heap{}
 	heap.Init(h)
 
-	e := elem{Uid: 5}
+	e := Elem{Uid: 5}
 	heap.Push(h, e)
 	e.Uid = 3
 	heap.Push(h, e)
@@ -35,16 +35,16 @@ func TestPush(t *testing.T) {
 		t.Errorf("Expected min 5. Found: %+v", (*h)[0])
 	}
 
-	e = heap.Pop(h).(elem)
+	e = heap.Pop(h).(Elem)
 	if e.Uid != 5 {
 		t.Errorf("Expected min 5. Found %+v", e)
 	}
-	e = heap.Pop(h).(elem)
+	e = heap.Pop(h).(Elem)
 	if e.Uid != 10 {
 		t.Errorf("Expected min 10. Found: %+v", e)
 	}
-	e = heap.Pop(h).(elem)
+	e = heap.Pop(h).(Elem)
 	if e.Uid != 11 {
 		t.Errorf("Expected min 11. Found: %+v", e)
 	}
 }
diff --git a/x/x.go b/x/x.go
index 5873177eb281122a83c966b73aa79e0af3a7ef4a..3ab6e3b04656e423f8989feb25b183a5c720d40d 100644
--- a/x/x.go
+++ b/x/x.go
@@ -7,6 +7,8 @@ import (
 	"time"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/dgraph-io/dgraph/task"
+	"github.com/google/flatbuffers/go"
 )
 
 const (
@@ -75,3 +77,16 @@ func ParseRequest(w http.ResponseWriter, r *http.Request, data interface{}) bool
 	}
 	return true
 }
+
+func UidlistOffset(b *flatbuffers.Builder,
+	sorted []uint64) flatbuffers.UOffsetT {
+
+	task.UidListStartUidsVector(b, len(sorted))
+	for i := len(sorted) - 1; i >= 0; i-- {
+		b.PrependUint64(sorted[i])
+	}
+	ulist := b.EndVector(len(sorted))
+	task.UidListStart(b)
+	task.UidListAddUids(b, ulist)
+	return task.UidListEnd(b)
+}