Skip to content
Snippets Groups Projects
query.go 15.4 KiB
Newer Older
  • Learn to ignore specific revisions
    /*
     * Copyright 2015 DGraph Labs, Inc.
    
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * 		http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package query
    
    
    Manish R Jain's avatar
    Manish R Jain committed
    	"reflect"
    
    	"github.com/dgraph-io/dgraph/posting"
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	"github.com/dgraph-io/dgraph/query/graph"
    
    	"github.com/dgraph-io/dgraph/task"
    
    	"github.com/dgraph-io/dgraph/worker"
    
    	"github.com/dgraph-io/dgraph/x"
    
    /*
     * QUERY:
     * Let's take this query from GraphQL as example:
     * {
     *   me {
     *     id
     *     firstName
     *     lastName
     *     birthday {
     *       month
     *       day
     *     }
     *     friends {
     *       name
     *     }
     *   }
     * }
     *
     * REPRESENTATION:
     * This would be represented in SubGraph format internally, as such:
     * SubGraph [result uid = me]
     *    |
     *  Children
     *    |
     *    --> SubGraph [Attr = "xid"]
     *    --> SubGraph [Attr = "firstName"]
     *    --> SubGraph [Attr = "lastName"]
     *    --> SubGraph [Attr = "birthday"]
     *           |
     *         Children
     *           |
     *           --> SubGraph [Attr = "month"]
     *           --> SubGraph [Attr = "day"]
     *    --> SubGraph [Attr = "friends"]
     *           |
     *         Children
     *           |
     *           --> SubGraph [Attr = "name"]
     *
     * ALGORITHM:
     * This is a rough and simple algorithm of how to process this SubGraph query
     * and populate the results:
     *
     * For a given entity, a new SubGraph can be started off with
     * newGraph(euid, exid), or from a parsed query with ToSubGraph(gq).
     * Given a SubGraph, is the Query field empty? [Step a]
     *   - If no, run (or send it to server serving the attribute) query
     *     and populate result.
     * Iterate over children and copy the sorted, de-duplicated Result Uids into
     *     each child's Query Uids, along with the child's Attr. Then run Step a
     *     for each child in its own goroutine.
     * Wait for goroutines to finish.
     * Return errors, if any.
     */
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	Start          time.Time     `json:"-"`
    	Parsing        time.Duration `json:"query_parsing"`
    	Processing     time.Duration `json:"processing"`
    	Json           time.Duration `json:"json_conversion"`
    	ProtocolBuffer time.Duration `json:"pb_conversion"`
    
    }
    
    // ToMap renders the recorded latencies as human-readable duration strings
    // under the keys "parsing", "processing", "json" and "total".
    //
    // "total" is the time elapsed since l.Start, and "json" is whatever remains
    // of that total after Parsing and Processing are subtracted. Both are derived
    // from a single clock sample, so parsing + processing + json == total exactly.
    func (l *Latency) ToMap() map[string]string {
    	// Sample the clock once; calling time.Since separately for "json" and
    	// "total" made the two values disagree by the time between the calls.
    	total := time.Since(l.Start)
    	j := total - l.Processing - l.Parsing
    	return map[string]string{
    		"parsing":    l.Parsing.String(),
    		"processing": l.Processing.String(),
    		"json":       j.String(),
    		"total":      total.String(),
    	}
    }
    
    
    // SubGraph is the way to represent data internally. It contains both the
    // query and the response. Once generated, this can then be encoded to other
    // client-convenient formats, like GraphQL-style JSON (ToJson) or protocol
    // buffers (ToProtocolBuffer).
    
    Manish R Jain's avatar
    Manish R Jain committed
    	Count    int
    	Offset   int
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	Query  []byte
    	Result []byte
    
    func mergeInterfaces(i1 interface{}, i2 interface{}) interface{} {
    
    Manish R Jain's avatar
    Manish R Jain committed
    	switch i1.(type) {
    	case map[string]interface{}:
    		m1 := i1.(map[string]interface{})
    		if m2, ok := i2.(map[string]interface{}); ok {
    			for k1, v1 := range m1 {
    				m2[k1] = v1
    			}
    			return m2
    		}
    		break
    	}
    	glog.Debugf("Got type: %v %v", reflect.TypeOf(i1), reflect.TypeOf(i2))
    	glog.Debugf("Got values: %v %v", i1, i2)
    
    
    	return []interface{}{i1, i2}
    }
    
    func postTraverse(g *SubGraph) (result map[uint64]interface{}, rerr error) {
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	if len(g.Query) == 0 {
    
    		return result, nil
    	}
    
    
    	result = make(map[uint64]interface{})
    	// Get results from all children first.
    	cResult := make(map[uint64]interface{})
    
    	for _, child := range g.Children {
    		m, err := postTraverse(child)
    		if err != nil {
    			x.Err(glog, err).Error("Error while traversal")
    			return result, err
    		}
    		// Merge results from all children, one by one.
    		for k, v := range m {
    			if val, present := cResult[k]; !present {
    				cResult[k] = v
    			} else {
    				cResult[k] = mergeInterfaces(val, v)
    			}
    		}
    	}
    
    	// Now read the query and results at current node.
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	uo := flatbuffers.GetUOffsetT(g.Query)
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	q.Init(g.Query, uo)
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	ro := flatbuffers.GetUOffsetT(g.Result)
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	r.Init(g.Result, ro)
    
    		glog.Fatalf("Result uidmatrixlength: %v. Query uidslength: %v",
    
    			r.UidmatrixLength(), q.UidsLength())
    	}
    	if q.UidsLength() != r.ValuesLength() {
    		glog.Fatalf("Result valuelength: %v. Query uidslength: %v",
    			r.ValuesLength(), q.UidsLength())
    	}
    
    	var ul task.UidList
    	for i := 0; i < r.UidmatrixLength(); i++ {
    		if ok := r.Uidmatrix(&ul, i); !ok {
    			return result, fmt.Errorf("While parsing UidList")
    		}
    		l := make([]interface{}, ul.UidsLength())
    		for j := 0; j < ul.UidsLength(); j++ {
    			uid := ul.Uids(j)
    
    Manish R Jain's avatar
    Manish R Jain committed
    			m := make(map[string]interface{})
    
    			if ival, present := cResult[uid]; !present {
    
    Manish R Jain's avatar
    Manish R Jain committed
    				l[j] = m
    
    Manish R Jain's avatar
    Manish R Jain committed
    				l[j] = mergeInterfaces(m, ival)
    
    Manish R Jain's avatar
    Manish R Jain committed
    			m := make(map[string]interface{})
    			m[g.Attr] = l
    			result[q.Uids(i)] = m
    
    		}
    	}
    
    	var tv task.Value
    	for i := 0; i < r.ValuesLength(); i++ {
    		if ok := r.Values(&tv, i); !ok {
    			return result, fmt.Errorf("While parsing value")
    		}
    		var ival interface{}
    
    Manish R Jain's avatar
    Manish R Jain committed
    		if err := posting.ParseValue(&ival, tv.ValBytes()); err != nil {
    
    Manish R Jain's avatar
    Manish R Jain committed
    		if ival == nil {
    			continue
    		}
    
    
    		if pval, present := result[q.Uids(i)]; present {
    
    Manish R Jain's avatar
    Manish R Jain committed
    			glog.WithField("prev", pval).
    
    Manish R Jain's avatar
    Manish R Jain committed
    				WithField("new", ival).
    				Fatal("Previous value detected.")
    
    		m["_uid_"] = fmt.Sprintf("%#x", q.Uids(i))
    
    Manish R Jain's avatar
    Manish R Jain committed
    		glog.WithFields(logrus.Fields{
    
    Manish R Jain's avatar
    Manish R Jain committed
    		}).Debug("Got value")
    
    func (g *SubGraph) ToJson(l *Latency) (js []byte, rerr error) {
    
    	r, err := postTraverse(g)
    	if err != nil {
    		x.Err(glog, err).Error("While doing traversal")
    		return js, err
    	}
    
    	l.Json = time.Since(l.Start) - l.Parsing - l.Processing
    
    			var m map[string]interface{}
    			if ival != nil {
    				m = ival.(map[string]interface{})
    			}
    
    			m["server_latency"] = l.ToMap()
    			return json.Marshal(m)
    
    		}
    	} else {
    		glog.Fatal("We don't currently support more than 1 uid at root.")
    	}
    
    
    Manish R Jain's avatar
    Manish R Jain committed
    	glog.Fatal("Shouldn't reach here.")
    
    // indexOf performs a binary search over the uids of q, which must be sorted
    // in ascending order, and returns the index at which it finds uid, else -1.
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    func indexOf(uid uint64, q *task.Query) int {
    	low, mid, high := 0, 0, q.UidsLength()-1
    
    	for low <= high {
    		mid = (low + high) / 2
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    		if q.Uids(mid) == uid {
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    		} else if q.Uids(mid) > uid {
    
    			high = mid - 1
    		} else {
    			low = mid + 1
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    // preTraverse fills dst with the property values and child nodes of the
    // entity uid, gathered from each predicate child of g.
    
    func (g *SubGraph) preTraverse(uid uint64, dst *graph.Node) error {
    
    	var properties []*graph.Property
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	var children []*graph.Node
    
    	// We go through all predicate children of the subgraph.
    	for _, pc := range g.Children {
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    		ro := flatbuffers.GetUOffsetT(pc.Result)
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    		r.Init(pc.Result, ro)
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    		uo := flatbuffers.GetUOffsetT(pc.Query)
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    		q.Init(pc.Query, uo)
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    		idx := indexOf(uid, q)
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    		if idx == -1 {
    			glog.WithFields(logrus.Fields{
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    				"uid":            uid,
    				"attribute":      g.Attr,
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    			}).Fatal("Attribute with uid not found in child Query uids")
    
    			return fmt.Errorf("Attribute with uid not found")
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    		var ul task.UidList
    		var tv task.Value
    		if ok := r.Uidmatrix(&ul, idx); !ok {
    
    			return fmt.Errorf("While parsing UidList")
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    		if ul.UidsLength() > 0 {
    			// We create as many predicate entity children as the length of uids for
    			// this predicate.
    			for i := 0; i < ul.UidsLength(); i++ {
    				uid := ul.Uids(i)
    
    				uc := new(graph.Node)
    				uc.Attribute = pc.Attr
    				uc.Uid = uid
    				if rerr := pc.preTraverse(uid, uc); rerr != nil {
    
    					x.Err(glog, rerr).Error("Error while traversal")
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    			}
    		} else {
    			v := new(graph.Value)
    			if ok := r.Values(&tv, idx); !ok {
    
    				return fmt.Errorf("While parsing value")
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    			}
    
    			var ival interface{}
    			if err := posting.ParseValue(&ival, tv.ValBytes()); err != nil {
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    
    			if ival == nil {
    				ival = ""
    			}
    
    
    			if pc.Attr == "_xid_" {
    				dst.Xid = v.Str
    			} else {
    				p := &graph.Property{Prop: pc.Attr, Val: v}
    				properties = append(properties, p)
    			}
    
    
    	}
    	dst.Properties, dst.Children = properties, children
    	return nil
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    }
    
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    // ToProtocolBuffer transforms the predicate-based subgraph into a
    // predicate-entity based protocol buffer subgraph.
    func (g *SubGraph) ToProtocolBuffer(l *Latency) (n *graph.Node, rerr error) {
    	n = &graph.Node{}
    	n.Attribute = g.Attr
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	if len(g.Query) == 0 {
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    		return n, nil
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	}
    
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	ro := flatbuffers.GetUOffsetT(g.Result)
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	r := new(task.Result)
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	r.Init(g.Result, ro)
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	var ul task.UidList
    	r.Uidmatrix(&ul, 0)
    	n.Uid = ul.Uids(0)
    
    	if rerr = g.preTraverse(n.Uid, n); rerr != nil {
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    		x.Err(glog, rerr).Error("Error while traversal")
    		return n, rerr
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    
    	l.ProtocolBuffer = time.Since(l.Start) - l.Parsing - l.Processing
    	return n, nil
    
    func treeCopy(gq *gql.GraphQuery, sg *SubGraph) {
    
    Manish R Jain's avatar
    Manish R Jain committed
    	// Typically you act on the current node, and leave recursion to deal with
    	// children. But, in this case, we don't want to muck with the current
    	// node, because of the way we're dealing with the root node.
    	// So, we work on the children, and then recurse for grand children.
    
    	for _, gchild := range gq.Children {
    		dst := new(SubGraph)
    		dst.Attr = gchild.Attr
    
    Manish R Jain's avatar
    Manish R Jain committed
    		dst.Count = gchild.First
    
    		sg.Children = append(sg.Children, dst)
    		treeCopy(gchild, dst)
    	}
    }
    
    
    // ToSubGraph builds the SubGraph for a parsed query: the root is created by
    // newGraph from the query's UID/XID, and the query's child tree is then
    // copied beneath it by treeCopy. Any error from newGraph is returned as is.
    func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) {
    	root, err := newGraph(gq.UID, gq.XID)
    	if err != nil {
    		return nil, err
    	}

    	treeCopy(gq, root)
    	return root, nil
    }
    
    
    func newGraph(euid uint64, exid string) (*SubGraph, error) {
    
    	// This would set the Result field in SubGraph,
    	// and populate the children for attributes.
    
    		xidToUid := make(map[string]uint64)
    		xidToUid[exid] = 0
    		if err := worker.GetOrAssignUidsOverNetwork(&xidToUid); err != nil {
    			glog.WithError(err).Error("While getting uids over network")
    
    
    		euid = xidToUid[exid]
    		glog.WithField("xid", exid).WithField("uid", euid).Debug("GetOrAssign")
    
    		err := fmt.Errorf("Query internal id is zero")
    
    		x.Err(glog, err).Error("Invalid query")
    
    	// Encode uid into result flatbuffer.
    	b := flatbuffers.NewBuilder(0)
    
    	omatrix := x.UidlistOffset(b, []uint64{euid})
    
    
    	// Also need to add nil value to keep this consistent.
    	var voffset flatbuffers.UOffsetT
    	{
    		bvo := b.CreateByteVector(x.Nilbyte)
    
    	task.ResultStartUidmatrixVector(b, 1)
    	b.PrependUOffsetT(omatrix)
    	mend := b.EndVector(1)
    
    
    	task.ResultStartValuesVector(b, 1)
    	b.PrependUOffsetT(voffset)
    	vend := b.EndVector(1)
    
    
    	rend := task.ResultEnd(b)
    	b.Finish(rend)
    
    	sg := new(SubGraph)
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	sg.Result = b.Bytes[b.Head():]
    
    	// Also add query for consistency and to allow for ToJson() later.
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	sg.Query = createTaskQuery(sg, []uint64{euid})
    
    // createTaskQuery generates the query buffer.
    
    Manish R Jain's avatar
    Manish R Jain committed
    func createTaskQuery(sg *SubGraph, sorted []uint64) []byte {
    
    	b := flatbuffers.NewBuilder(0)
    
    Manish R Jain's avatar
    Manish R Jain committed
    	ao := b.CreateString(sg.Attr)
    
    	task.QueryStartUidsVector(b, len(sorted))
    	for i := len(sorted) - 1; i >= 0; i-- {
    		b.PrependUint64(sorted[i])
    
    
    	task.QueryStart(b)
    	task.QueryAddAttr(b, ao)
    	task.QueryAddUids(b, vend)
    
    Manish R Jain's avatar
    Manish R Jain committed
    	task.QueryAddCount(b, int32(sg.Count))
    
    
    	qend := task.QueryEnd(b)
    	b.Finish(qend)
    	return b.Bytes[b.Head():]
    }
    
    
    // ListChannel is a read cursor over one row of a task.Result uid matrix.
    // sortedUniqueUids keeps one per row while merging the rows via a heap.
    type ListChannel struct {
    	// TList is the row of uids being consumed.
    	TList *task.UidList
    	// Idx is the position in TList of the next uid to feed into the heap.
    	Idx   int
    }
    
    // sortedUniqueUids flattens the uid matrix of r into a single ascending list
    // with duplicates removed.
    //
    // It performs a k-way merge: the head of every row is pushed onto a min-heap,
    // then the smallest element is repeatedly emitted and replaced in place by
    // the next uid from the same row. This presumes each row is already sorted
    // ascending (TODO confirm with the producer of the Result).
    //
    // De-duplication compares against the last emitted uid, which starts at 0,
    // so a uid of 0 is never emitted. This is consistent with newGraph rejecting a
    // zero query uid.
    //
    // An error is returned if any row of the matrix cannot be decoded.
    func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) {
    	h := &x.Uint64Heap{}
    	heap.Init(h)

    	rows := r.UidmatrixLength()
    	channels := make([]*ListChannel, rows)
    	total := 0
    	for i := 0; i < rows; i++ {
    		tlist := new(task.UidList)
    		if ok := r.Uidmatrix(tlist, i); !ok {
    			return nil, fmt.Errorf("While parsing Uidmatrix")
    		}
    		if tlist.UidsLength() > 0 {
    			heap.Push(h, x.Elem{Uid: tlist.Uids(0), Idx: i})
    		}
    		total += tlist.UidsLength()
    		// Element 0 is already on the heap, so reading resumes at index 1.
    		channels[i] = &ListChannel{TList: tlist, Idx: 1}
    	}

    	// total is an upper bound on the output length, so append never regrows.
    	sorted = make([]uint64, 0, total)

    	var last uint64
    	// Iterate over the heap.
    	for h.Len() > 0 {
    		me := (*h)[0] // Peek at the top element in heap.
    		if me.Uid != last {
    			sorted = append(sorted, me.Uid) // Add if unique.
    			last = me.Uid
    		}
    		lc := channels[me.Idx]
    		if lc.Idx >= lc.TList.UidsLength() {
    			// This row is exhausted; drop its entry from the heap.
    			heap.Pop(h)
    			continue
    		}
    		me.Uid = lc.TList.Uids(lc.Idx)
    		lc.Idx++
    		(*h)[0] = me
    		heap.Fix(h, 0) // Faster than Pop() followed by Push().
    	}
    	return sorted, nil
    }
    
    
    func ProcessGraph(sg *SubGraph, rch chan error, td time.Duration) {
    	timeout := time.Now().Add(td)
    
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	if len(sg.Query) > 0 && sg.Attr != "_root_" {
    		sg.Result, err = worker.ProcessTaskOverNetwork(sg.Query)
    
    			x.Err(glog, err).Error("While processing task.")
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	uo := flatbuffers.GetUOffsetT(sg.Result)
    
    	r := new(task.Result)
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	r.Init(sg.Result, uo)
    
    	if r.ValuesLength() > 0 {
    		var v task.Value
    		if r.Values(&v, 0) {
    			glog.WithField("attr", sg.Attr).WithField("val", string(v.ValBytes())).
    				Info("Sample value")
    		}
    	}
    
    
    	sorted, err := sortedUniqueUids(r)
    	if err != nil {
    		x.Err(glog, err).Error("While processing task.")
    		rch <- err
    		return
    	}
    
    	if len(sorted) == 0 {
    
    		// Looks like we're done here.
    		if len(sg.Children) > 0 {
    
    			glog.Debugf("Have some children but no results. Life got cut short early."+
    				"Current attribute: %q", sg.Attr)
    		} else {
    			glog.Debugf("No more things to process for Attr: %v", sg.Attr)
    
    	timeleft := timeout.Sub(time.Now())
    	if timeleft < 0 {
    		glog.WithField("attr", sg.Attr).Error("Query timeout before children")
    		rch <- fmt.Errorf("Query timeout before children")
    		return
    	}
    
    
    	// Let's execute it in a tree fashion. Each SubGraph would break off
    	// as many goroutines as its children, which would then recursively
    	// do the same thing.
    	// Buffered channel to ensure no-blockage.
    	childchan := make(chan error, len(sg.Children))
    	for i := 0; i < len(sg.Children); i++ {
    		child := sg.Children[i]
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    		child.Query = createTaskQuery(child, sorted)
    
    		go ProcessGraph(child, childchan, timeleft)
    
    	tchan := time.After(timeleft)
    
    	// Now get all the results back.
    	for i := 0; i < len(sg.Children); i++ {
    
    		select {
    		case err = <-childchan:
    			glog.WithFields(logrus.Fields{
    				"num_children": len(sg.Children),
    				"index":        i,
    				"attr":         sg.Children[i].Attr,
    				"err":          err,
    			}).Debug("Reply from child")
    			if err != nil {
    				x.Err(glog, err).Error("While processing child task.")
    				rch <- err
    				return
    			}
    		case <-tchan:
    			glog.WithField("attr", sg.Attr).Error("Query timeout after children")
    			rch <- fmt.Errorf("Query timeout after children")