Skip to content
Snippets Groups Projects
Unverified Commit f50fc7cc authored by Ashwin Ramesh's avatar Ashwin Ramesh
Browse files

Support shortest path queries

parent 6cea69d3
No related branches found
No related tags found
No related merge requests found
...@@ -197,6 +197,23 @@ func TestParseQueryWithMultipleVar(t *testing.T) { ...@@ -197,6 +197,23 @@ func TestParseQueryWithMultipleVar(t *testing.T) {
require.Equal(t, []string{"B"}, res.QueryVars[2].Needs) require.Equal(t, []string{"B"}, res.QueryVars[2].Needs)
} }
// TestParseShortestPath verifies that a shortest(...) block parses into a
// single query whose "from" and "to" arguments are kept as raw strings.
func TestParseShortestPath(t *testing.T) {
	res, err := Parse(`
{
shortest(from:0x0a, to:0x0b) {
friends
name
}
}
`)
	require.NoError(t, err)
	require.NotNil(t, res.Query)
	require.Equal(t, 1, len(res.Query))

	// Both endpoints must be available unparsed on the first (and only) block.
	block := res.Query[0]
	require.Equal(t, "0x0a", block.Args["from"])
	require.Equal(t, "0x0b", block.Args["to"])
}
func TestParseMultipleQueries(t *testing.T) { func TestParseMultipleQueries(t *testing.T) {
query := ` query := `
{ {
......
...@@ -41,7 +41,7 @@ import ( ...@@ -41,7 +41,7 @@ import (
func ToProtocolBuf(l *Latency, sgl []*SubGraph) ([]*graph.Node, error) { func ToProtocolBuf(l *Latency, sgl []*SubGraph) ([]*graph.Node, error) {
var resNode []*graph.Node var resNode []*graph.Node
for _, sg := range sgl { for _, sg := range sgl {
if sg.Params.Alias == "var" { if sg.Params.Alias == "var" || sg.Params.Alias == "shortest" {
continue continue
} }
node, err := sg.ToProtocolBuffer(l) node, err := sg.ToProtocolBuffer(l)
...@@ -59,7 +59,7 @@ func ToJson(l *Latency, sgl []*SubGraph, w io.Writer) error { ...@@ -59,7 +59,7 @@ func ToJson(l *Latency, sgl []*SubGraph, w io.Writer) error {
Attr: "__", Attr: "__",
} }
for _, sg := range sgl { for _, sg := range sgl {
if sg.Params.Alias == "var" { if sg.Params.Alias == "var" || sg.Params.Alias == "shortest" {
continue continue
} }
if sg.Params.isDebug { if sg.Params.isDebug {
......
...@@ -37,6 +37,7 @@ import ( ...@@ -37,6 +37,7 @@ import (
"github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/dgraph/x"
farm "github.com/dgryski/go-farm"
) )
/* /*
...@@ -128,6 +129,8 @@ type params struct { ...@@ -128,6 +129,8 @@ type params struct {
NeedsVar []string NeedsVar []string
ParentVars map[string]*task.List ParentVars map[string]*task.List
Normalize bool Normalize bool
From uint64
To uint64
} }
// SubGraph is the way to represent data internally. It contains both the // SubGraph is the way to represent data internally. It contains both the
...@@ -477,6 +480,22 @@ func (args *params) fill(gq *gql.GraphQuery) error { ...@@ -477,6 +480,22 @@ func (args *params) fill(gq *gql.GraphQuery) error {
} }
args.AfterUID = uint64(after) args.AfterUID = uint64(after)
} }
if v, ok := gq.Args["from"]; ok {
from, err := strconv.ParseUint(v, 0, 64)
if err != nil {
// Treat it as an XID.
from = farm.Fingerprint64([]byte(v))
}
args.From = uint64(from)
}
if v, ok := gq.Args["to"]; ok {
to, err := strconv.ParseUint(v, 0, 64)
if err != nil {
// Treat it as an XID.
to = farm.Fingerprint64([]byte(v))
}
args.To = uint64(to)
}
if v, ok := gq.Args["first"]; ok { if v, ok := gq.Args["first"]; ok {
first, err := strconv.ParseInt(v, 0, 32) first, err := strconv.ParseInt(v, 0, 32)
if err != nil { if err != nil {
...@@ -507,7 +526,7 @@ func ToSubGraph(ctx context.Context, gq *gql.GraphQuery) (*SubGraph, error) { ...@@ -507,7 +526,7 @@ func ToSubGraph(ctx context.Context, gq *gql.GraphQuery) (*SubGraph, error) {
func newGraph(ctx context.Context, gq *gql.GraphQuery) (*SubGraph, error) { func newGraph(ctx context.Context, gq *gql.GraphQuery) (*SubGraph, error) {
// This would set the Result field in SubGraph, // This would set the Result field in SubGraph,
// and populate the children for attributes. // and populate the children for attributes.
if len(gq.UID) == 0 && gq.Func == nil && len(gq.NeedsVar) == 0 { if len(gq.UID) == 0 && gq.Func == nil && len(gq.NeedsVar) == 0 && gq.Alias != "shortest" {
err := x.Errorf("Invalid query, query internal id is zero and generator is nil") err := x.Errorf("Invalid query, query internal id is zero and generator is nil")
x.TraceError(ctx, err) x.TraceError(ctx, err)
return nil, err return nil, err
...@@ -619,7 +638,8 @@ func ProcessQuery(ctx context.Context, res gql.Result, l *Latency) ([]*SubGraph, ...@@ -619,7 +638,8 @@ func ProcessQuery(ctx context.Context, res gql.Result, l *Latency) ([]*SubGraph,
loopStart := time.Now() loopStart := time.Now()
for i := 0; i < len(res.Query); i++ { for i := 0; i < len(res.Query); i++ {
gq := res.Query[i] gq := res.Query[i]
if gq == nil || (len(gq.UID) == 0 && gq.Func == nil && len(gq.NeedsVar) == 0) { if gq == nil || (len(gq.UID) == 0 && gq.Func == nil &&
len(gq.NeedsVar) == 0 && gq.Alias != "shortest") {
continue continue
} }
sg, err := ToSubGraph(ctx, gq) sg, err := ToSubGraph(ctx, gq)
...@@ -674,14 +694,24 @@ func ProcessQuery(ctx context.Context, res gql.Result, l *Latency) ([]*SubGraph, ...@@ -674,14 +694,24 @@ func ProcessQuery(ctx context.Context, res gql.Result, l *Latency) ([]*SubGraph,
sg.recursiveFillVars(doneVars) sg.recursiveFillVars(doneVars)
hasExecuted[idx] = true hasExecuted[idx] = true
idxList = append(idxList, idx)
numQueriesDone++ numQueriesDone++
go ProcessGraph(ctx, sg, nil, errChan) idxList = append(idxList, idx)
if sg.Params.Alias == "shortest" {
err := ShortestPath(ctx, sg)
if err != nil {
return nil, err
}
} else {
go ProcessGraph(ctx, sg, nil, errChan)
}
x.Trace(ctx, "Graph processed") x.Trace(ctx, "Graph processed")
} }
// Wait for the execution that was started in this iteration. // Wait for the execution that was started in this iteration.
for i := 0; i < len(idxList); i++ { for i := 0; i < len(idxList); i++ {
if sgl[idxList[i]].Params.Alias == "shortest" {
continue
}
select { select {
case err := <-errChan: case err := <-errChan:
if err != nil { if err != nil {
...@@ -720,6 +750,10 @@ func ProcessQuery(ctx context.Context, res gql.Result, l *Latency) ([]*SubGraph, ...@@ -720,6 +750,10 @@ func ProcessQuery(ctx context.Context, res gql.Result, l *Latency) ([]*SubGraph,
// shouldCascade returns true if the query block is not self depenedent and we should // shouldCascade returns true if the query block is not self depenedent and we should
// remove the uids from the bottom up if the children are empty. // remove the uids from the bottom up if the children are empty.
func shouldCascade(res gql.Result, idx int) bool { func shouldCascade(res gql.Result, idx int) bool {
if res.Query[idx].Attr == "shortest" {
return false
}
for _, def := range res.QueryVars[idx].Defines { for _, def := range res.QueryVars[idx].Defines {
for _, need := range res.QueryVars[idx].Needs { for _, need := range res.QueryVars[idx].Needs {
if def == need { if def == need {
...@@ -730,8 +764,14 @@ func shouldCascade(res gql.Result, idx int) bool { ...@@ -730,8 +764,14 @@ func shouldCascade(res gql.Result, idx int) bool {
return true return true
} }
// TODO(Ashwin): Benchmark this function. Map implementation might be slow. // TODO(Ashwin): Benchmark this function.
func populateVarMap(sg *SubGraph, doneVars map[string]*task.List, isCascade bool) { func populateVarMap(sg *SubGraph, doneVars map[string]*task.List, isCascade bool) {
out := algo.NewWriteIterator(sg.DestUIDs, 0)
it := algo.NewListIterator(sg.DestUIDs)
i := -1
if sg.Params.Alias == "shortest" {
goto AssignStep
}
for _, child := range sg.Children { for _, child := range sg.Children {
populateVarMap(child, doneVars, isCascade) populateVarMap(child, doneVars, isCascade)
if !isCascade { if !isCascade {
...@@ -745,10 +785,6 @@ func populateVarMap(sg *SubGraph, doneVars map[string]*task.List, isCascade bool ...@@ -745,10 +785,6 @@ func populateVarMap(sg *SubGraph, doneVars map[string]*task.List, isCascade bool
} }
} }
o := new(task.List)
out := algo.NewWriteIterator(o, 0)
it := algo.NewListIterator(sg.DestUIDs)
i := -1
if !isCascade { if !isCascade {
goto AssignStep goto AssignStep
} }
...@@ -770,7 +806,6 @@ func populateVarMap(sg *SubGraph, doneVars map[string]*task.List, isCascade bool ...@@ -770,7 +806,6 @@ func populateVarMap(sg *SubGraph, doneVars map[string]*task.List, isCascade bool
} }
} }
out.End() out.End()
sg.DestUIDs = o
AssignStep: AssignStep:
if sg.Params.Var != "" { if sg.Params.Var != "" {
...@@ -1069,7 +1104,7 @@ func (sg *SubGraph) applyOrderAndPagination(ctx context.Context) error { ...@@ -1069,7 +1104,7 @@ func (sg *SubGraph) applyOrderAndPagination(ctx context.Context) error {
// isValidArg checks if arg passed is valid keyword. // isValidArg checks if arg passed is valid keyword.
func isValidArg(a string) bool { func isValidArg(a string) bool {
switch a { switch a {
case "orderasc", "orderdesc", "first", "offset", "after": case "from", "to", "orderasc", "orderdesc", "first", "offset", "after":
return true return true
} }
return false return false
......
...@@ -142,6 +142,28 @@ func populateGraph(t *testing.T) { ...@@ -142,6 +142,28 @@ func populateGraph(t *testing.T) {
addEdgeToUID(t, "friend", 31, 24) addEdgeToUID(t, "friend", 31, 24)
addEdgeToUID(t, "friend", 23, 1) addEdgeToUID(t, "friend", 23, 1)
addEdgeToUID(t, "follow", 1, 31)
addEdgeToUID(t, "follow", 1, 24)
addEdgeToUID(t, "follow", 31, 1001)
addEdgeToUID(t, "follow", 1001, 1000)
addEdgeToUID(t, "follow", 1002, 1000)
addEdgeToUID(t, "follow", 1001, 1003)
addEdgeToUID(t, "follow", 1001, 1003)
addEdgeToUID(t, "follow", 1003, 1002)
addEdgeToUID(t, "path", 1, 31)
addEdgeToUID(t, "path", 1, 24)
addEdgeToUID(t, "path", 31, 1000)
addEdgeToUID(t, "path", 1000, 1001)
addEdgeToUID(t, "path", 1000, 1002)
addEdgeToUID(t, "path", 1001, 1002)
addEdgeToUID(t, "path", 1002, 1003)
addEdgeToUID(t, "path", 1003, 1001)
addEdgeToValue(t, "name", 1000, "Alice")
addEdgeToValue(t, "name", 1001, "Bob")
addEdgeToValue(t, "name", 1002, "Matt")
addEdgeToValue(t, "name", 1003, "John")
// Now let's add a few properties for the main user. // Now let's add a few properties for the main user.
addEdgeToValue(t, "name", 1, "Michonne") addEdgeToValue(t, "name", 1, "Michonne")
addEdgeToValue(t, "gender", 1, "female") addEdgeToValue(t, "gender", 1, "female")
...@@ -484,6 +506,136 @@ func TestUseVarsFilterVarReuse3(t *testing.T) { ...@@ -484,6 +506,136 @@ func TestUseVarsFilterVarReuse3(t *testing.T) {
js) js)
} }
// TestShortestPath_NoPath expects an empty JSON object when the shortest
// block from 0x01 to 101 over the path and follow predicates yields nothing
// for the dependent "me" block.
func TestShortestPath_NoPath(t *testing.T) {
	const want = `{}`

	populateGraph(t)
	got := processToFastJSON(t, `
{
A as shortest(from:0x01, to:101) {
path
follow
}
me(var: A) {
name
}
}`)
	require.JSONEq(t, want, got)
}
// TestShortestPath expects the friend-only query from 0x01 to 31 to resolve
// to the two names listed in want.
func TestShortestPath(t *testing.T) {
	const want = `{"me":[{"name":"Michonne"},{"name":"Andrea"}]}`

	populateGraph(t)
	got := processToFastJSON(t, `
{
A as shortest(from:0x01, to:31) {
friend
}
me(var: A) {
name
}
}`)
	require.JSONEq(t, want, got)
}
// TestShortestPath2 expects the path-only query from 0x01 to 1000 to resolve
// to the three names listed in want.
func TestShortestPath2(t *testing.T) {
	const want = `{"me":[{"name":"Michonne"},{"name":"Andrea"},{"name":"Alice"}]}`

	populateGraph(t)
	got := processToFastJSON(t, `
{
A as shortest(from:0x01, to:1000) {
path
}
me(var: A) {
name
}
}`)
	require.JSONEq(t, want, got)
}
// TestShortestPath3 expects the path-only query from 1 to 1003 to resolve to
// the five names listed in want.
func TestShortestPath3(t *testing.T) {
	const want = `{"me":[{"name":"Michonne"},{"name":"Andrea"},{"name":"Alice"},{"name":"Matt"},{"name":"John"}]}`

	populateGraph(t)
	got := processToFastJSON(t, `
{
A as shortest(from:1, to:1003) {
path
}
me(var: A) {
name
}
}`)
	require.JSONEq(t, want, got)
}
// TestShortestPath4 expects that allowing both path and follow edges between
// 1 and 1003 produces the four names listed in want.
func TestShortestPath4(t *testing.T) {
	const want = `{"me":[{"name":"Michonne"},{"name":"Andrea"},{"name":"Bob"},{"name":"John"}]}`

	populateGraph(t)
	got := processToFastJSON(t, `
{
A as shortest(from:1, to:1003) {
path
follow
}
me(var: A) {
name
}
}`)
	require.JSONEq(t, want, got)
}
// TestShortestPath_filter expects that a filter on the path predicate is
// honoured: with "alice" excluded on path edges, the query from 1 to 1002
// produces the four names listed in want.
func TestShortestPath_filter(t *testing.T) {
	const want = `{"me":[{"name":"Michonne"},{"name":"Andrea"},{"name":"Bob"},{"name":"Matt"}]}`

	populateGraph(t)
	got := processToFastJSON(t, `
{
A as shortest(from:1, to:1002) {
path @filter(not anyof(name, "alice"))
follow
}
me(var: A) {
name
}
}`)
	require.JSONEq(t, want, got)
}
// TestShortestPath_filter2 expects an empty JSON object when filters on both
// the path and follow predicates are applied to the query from 1 to 1002.
func TestShortestPath_filter2(t *testing.T) {
	const want = `{}`

	populateGraph(t)
	got := processToFastJSON(t, `
{
A as shortest(from:1, to:1002) {
path @filter(not anyof(name, "alice"))
follow @filter(not anyof(name, "bob"))
}
me(var: A) {
name
}
}`)
	require.JSONEq(t, want, got)
}
func TestUseVarsFilterMultiId(t *testing.T) { func TestUseVarsFilterMultiId(t *testing.T) {
populateGraph(t) populateGraph(t)
query := ` query := `
......
package query
import (
"container/heap"
"context"
"github.com/dgraph-io/dgraph/algo"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
)
// Item is one entry of the priority queue used by the shortest path search.
type Item struct {
uid uint64 // uid of the node.
cost float64 // cost of taking the path till this uid.
hop int // number of hops taken to reach this node.
// index is the item's current position in the priorityQueue slice. It is
// kept up to date by Push and Swap and set to -1 by Pop.
index int
}
var (
	// ErrStop is reported by expandOut when an expansion round leaves no
	// further subgraphs to explore.
	ErrStop = x.Errorf("STOP")

	// ErrTooBig is reported by expandOut once the number of recorded edges
	// exceeds its limit.
	ErrTooBig = x.Errorf("Query exceeded memory limit")
)
// priorityQueue is a min-heap of items ordered by cost. Together with the
// methods below it satisfies container/heap's Interface.
type priorityQueue []*Item

// Len returns the number of queued items.
func (h priorityQueue) Len() int { return len(h) }

// Less reports whether the item at i is cheaper than the item at j.
func (h priorityQueue) Less(i, j int) bool { return h[i].cost < h[j].cost }

// Swap exchanges the items at i and j and records their new positions so
// that a later heap.Fix can locate them.
func (h priorityQueue) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index = i
	h[j].index = j
}

// Push appends v, which must be an *Item, and records its position.
func (h *priorityQueue) Push(v interface{}) {
	item := v.(*Item)
	item.index = len(*h)
	*h = append(*h, item)
}

// Pop removes and returns the last item of the slice. The returned item's
// index is set to -1 to mark it as no longer queued.
func (h *priorityQueue) Pop() interface{} {
	old := *h
	last := len(old) - 1
	item := old[last]
	// Clear the vacated slot so the backing array does not keep the popped
	// item reachable.
	old[last] = nil
	item.index = -1
	*h = old[:last]
	return item
}
// We maintain a map from UID to nodeInfo for Dijkstra's algorithm.
type nodeInfo struct {
// cost is the cost of the best path found so far to this node.
cost float64
// parent is the uid this node was reached from on that path.
parent uint64
// Pointer to the item in heap. Used to update priority
node *Item
}
// expandOut is the graph-expansion worker behind ShortestPath and is meant to
// run in its own goroutine. Starting from start.Params.From it explores one
// more level of the graph each time it receives false on next, records every
// discovered edge in adjacencyMap (with cost 1.0) and then reports exactly one
// value on rch for that round:
//
//   - nil when the round succeeded and there is a new frontier to expand,
//   - ErrStop when the round succeeded but nothing is left to expand,
//   - ErrTooBig when too many edges have been recorded,
//   - any other error when processing a child failed or ctx was cancelled.
//
// The worker exits when it receives true on next, when ctx is done, or after
// reporting any error other than ErrStop. adjacencyMap is only written between
// receiving on next and sending on rch, so the caller may read it after each
// reply without extra locking.
func (start *SubGraph) expandOut(ctx context.Context,
	adjacencyMap map[uint64]map[uint64]float64, next chan bool, rch chan error) {
	// Upper bound on recorded edges before the query is aborted.
	const maxEdges = 10000000

	var numEdges uint64
	var exec []*SubGraph

	// Seed the traversal: the root's source and destination lists both
	// contain only the "from" uid.
	var in task.List
	seed := algo.NewWriteIterator(&in, 0)
	seed.Append(start.Params.From)
	seed.End()
	start.SrcUIDs = &in
	start.uidMatrix = []*task.List{&in}
	start.DestUIDs = start.SrcUIDs

	for _, child := range start.Children {
		child.SrcUIDs = start.DestUIDs
		exec = append(exec, child)
	}

	dummy := &SubGraph{}
	for {
		// Wait for the next request. Also watch ctx so the goroutine does not
		// outlive a caller that gave up without signalling.
		select {
		case over := <-next:
			if over {
				return
			}
		case <-ctx.Done():
			return
		}

		// Expand the whole frontier in parallel.
		rrch := make(chan error, len(exec))
		for _, sg := range exec {
			go ProcessGraph(ctx, sg, dummy, rrch)
		}
		for range exec {
			select {
			case err := <-rrch:
				if err != nil {
					x.TraceError(ctx, x.Wrapf(err, "Error while processing child task"))
					rch <- err
					return
				}
			case <-ctx.Done():
				x.TraceError(ctx, x.Wrapf(ctx.Err(), "Context done before full execution"))
				rch <- ctx.Err()
				return
			}
		}

		// Record every (source, destination) pair found in this round.
		for _, sg := range exec {
			srcIt := algo.NewListIterator(sg.SrcUIDs)
			for idx := 0; srcIt.Valid(); srcIt.Next() {
				fromUID := srcIt.Val()
				dstIt := algo.NewListIterator(sg.uidMatrix[idx])
				for ; dstIt.Valid(); dstIt.Next() {
					toUID := dstIt.Val()
					if adjacencyMap[fromUID] == nil {
						adjacencyMap[fromUID] = make(map[uint64]float64)
					}
					adjacencyMap[fromUID][toUID] = 1.0 // cost is 1 for now.
					numEdges++
				}
				idx++
			}
		}

		if numEdges > maxEdges {
			// Too many edges seen: abort the query. The caller returns on
			// this error, so stop here instead of waiting on next forever.
			rch <- ErrTooBig
			return
		}

		// Build the next frontier: every child predicate is applied to the
		// destinations of every subgraph that produced results.
		var out []*SubGraph
		for _, sg := range exec {
			if algo.ListLen(sg.DestUIDs) == 0 {
				continue
			}
			for _, child := range start.Children {
				temp := new(SubGraph)
				*temp = *child
				temp.Children = []*SubGraph{}
				temp.SrcUIDs = sg.DestUIDs
				// Remove those nodes which we have already traversed. As this
				// cannot be in the path again.
				algo.ApplyFilter(temp.SrcUIDs, func(uid uint64, i int) bool {
					_, ok := adjacencyMap[uid]
					return !ok
				})
				if algo.ListLen(temp.SrcUIDs) == 0 {
					continue
				}
				sg.Children = append(sg.Children, temp)
				out = append(out, temp)
			}
		}

		// Exactly one reply per request.
		if len(out) == 0 {
			rch <- ErrStop
		} else {
			rch <- nil
		}
		exec = out
	}
}
// ShortestPath computes a minimum-cost path from sg.Params.From to
// sg.Params.To and stores the uids along it, in path order, in sg.DestUIDs.
// If no path is found sg.DestUIDs is set to an empty list. It returns an error
// if sg is not a "shortest" block, if the expansion fails or grows too large
// (ErrTooBig), or if ctx is done first.
//
// The graph is discovered lazily: a helper goroutine (expandOut) fetches one
// more level of edges whenever the search pops an item that lies beyond the
// levels explored so far.
//
// Dijkstra's algorithm pseudocode for reference.
//
//
// 1 function Dijkstra(Graph, source):
// 2 dist[source] ← 0 // Initialization
// 3
// 4 create vertex set Q
// 5
// 6 for each vertex v in Graph:
// 7 if v ≠ source
// 8 dist[v] ← INFINITY // Unknown distance from source to v
// 9 prev[v] ← UNDEFINED // Predecessor of v
// 10
// 11 Q.add_with_priority(v, dist[v])
// 12
// 13
// 14 while Q is not empty: // The main loop
// 15 u ← Q.extract_min() // Remove and return best vertex
// 16 for each neighbor v of u: // only v that is still in Q
// 17 alt = dist[u] + length(u, v)
// 18 if alt < dist[v]
// 19 dist[v] ← alt
// 20 prev[v] ← u
// 21 Q.decrease_priority(v, alt)
// 22
// 23 return dist[], prev[]
func ShortestPath(ctx context.Context, sg *SubGraph) error {
	if sg.Params.Alias != "shortest" {
		return x.Errorf("Invalid shortest path query")
	}

	pq := make(priorityQueue, 0)
	heap.Init(&pq)

	// Initialize and push the source node.
	srcNode := &Item{
		uid:  sg.Params.From,
		cost: 0,
		hop:  0,
	}
	heap.Push(&pq, srcNode)

	numHops := -1
	next := make(chan bool, 2)
	expandErr := make(chan error, 2)
	adjacencyMap := make(map[uint64]map[uint64]float64)
	go sg.expandOut(ctx, adjacencyMap, next, expandErr)
	// Tell the expander to stop on every return path, not only on success.
	// At most one request is ever pending on next, so this buffered send
	// cannot block.
	defer func() { next <- true }()

	// map to store the min cost and parent of nodes.
	dist := make(map[uint64]nodeInfo)
	dist[srcNode.uid] = nodeInfo{
		parent: 0,
		cost:   0,
		node:   srcNode,
	}

	// No hop limit is enforced; the search ends when the target is popped,
	// the queue drains, or the expander reports an error.
	var stopExpansion bool
	for pq.Len() > 0 {
		item := heap.Pop(&pq).(*Item)
		if item.uid == sg.Params.To {
			break
		}

		if item.hop > numHops && !stopExpansion {
			// Explore the next level by calling processGraph and add them
			// to the queue.
			next <- false
			select {
			case err := <-expandErr:
				switch err {
				case nil:
				case ErrStop:
					stopExpansion = true
				case ErrTooBig:
					return err
				default:
					x.TraceError(ctx, x.Wrapf(err, "Error while processing child task"))
					return err
				}
			case <-ctx.Done():
				x.TraceError(ctx, x.Wrapf(ctx.Err(), "Context done before full execution"))
				return ctx.Err()
			}
			numHops++
		}
		if stopExpansion {
			continue
		}

		for toUID, cost := range adjacencyMap[item.uid] {
			newCost := item.cost + cost
			d, seen := dist[toUID]
			if seen && d.cost <= newCost {
				continue
			}
			if !seen {
				// This is the first time we're seeing this node. So
				// create a new node and add it to the heap and map.
				node := &Item{
					uid:  toUID,
					cost: newCost,
					hop:  item.hop + 1,
				}
				heap.Push(&pq, node)
				dist[toUID] = nodeInfo{
					cost:   newCost,
					parent: item.uid,
					node:   node,
				}
				continue
			}
			// We've already seen this node and found a cheaper route. Update
			// the recorded cost and parent as well as the heap item, so the
			// path reconstruction and later comparisons use the new route.
			d.node.cost = newCost
			d.node.hop = item.hop + 1
			d.cost = newCost
			d.parent = item.uid
			dist[toUID] = d
			heap.Fix(&pq, d.node.index)
		}
	}

	// Default to an empty result; it is replaced below if a path exists.
	sg.DestUIDs = &task.List{}
	if _, reached := dist[sg.Params.To]; !reached {
		// The target was never discovered. Do not walk zero-valued parents,
		// which would fabricate a path when From is 0.
		return nil
	}

	// Walk the parent links from the target back to the source. The loop is
	// bounded by len(dist) as a guard against cycles.
	var result []uint64
	cur := sg.Params.To
	for i := 0; cur != sg.Params.From && i < len(dist); i++ {
		result = append(result, cur)
		cur = dist[cur].parent
	}
	if cur != sg.Params.From {
		return nil
	}
	result = append(result, cur)

	// Put the path in DestUIDs of the root. result runs target→source, so
	// emit it back to front.
	var r task.List
	out := algo.NewWriteIterator(&r, 0)
	for i := len(result) - 1; i >= 0; i-- {
		out.Append(result[i])
	}
	out.End()
	sg.DestUIDs = &r
	return nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment