diff --git a/posting/list.go b/posting/list.go index c4e1830ed996281a3018985d12d0184aaadf3e15..a1a658af0471ea30649efd55bb59f68eec2b8643 100644 --- a/posting/list.go +++ b/posting/list.go @@ -19,7 +19,7 @@ package posting import ( "bytes" "encoding/binary" - "encoding/gob" + "encoding/json" "errors" "fmt" "math" @@ -84,13 +84,12 @@ func addEdgeToPosting(b *flatbuffers.Builder, if t.ValueId != math.MaxUint64 { log.Fatal("This should have already been set by the caller.") } - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - if err := enc.Encode(t.Value); err != nil { - x.Err(log, err).Fatal("Unable to encode interface") + bytes, err := json.Marshal(t.Value) + if err != nil { + x.Err(log, err).Fatal("Unable to marshal value") return 0 } - bo = b.CreateByteVector(buf.Bytes()) + bo = b.CreateByteVector(bytes) } so := b.CreateString(t.Source) // Do this before posting start. @@ -149,14 +148,17 @@ func init() { len(empty), len(emptyPosting)) } -func ParseValue(i interface{}, value []byte) error { +func ParseValue(i *interface{}, value []byte) error { if len(value) == 0 { return errors.New("No value found in posting") } - var buf bytes.Buffer - buf.Write(value) - dec := gob.NewDecoder(&buf) - return dec.Decode(i) + + if len(value) == 1 && value[0] == 0x00 { + i = nil + return nil + } + + return json.Unmarshal(value, i) } func (l *List) init(key []byte, pstore, mstore *store.Store) { diff --git a/query/query.go b/query/query.go index ce413602b905d537613014cc3988afc108c06259..5ba0576bf0c8ae2208af6b9696f4b0e959418d65 100644 --- a/query/query.go +++ b/query/query.go @@ -20,6 +20,7 @@ import ( "container/heap" "encoding/json" "fmt" + "reflect" "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/posting" @@ -96,6 +97,21 @@ type SubGraph struct { } func mergeInterfaces(i1 interface{}, i2 interface{}) interface{} { + switch i1.(type) { + case map[string]interface{}: + glog.Debug("Got map[string] interface") + m1 := i1.(map[string]interface{}) + if m2, ok := i2.(map[string]interface{}); ok { + for k1, v1 := range m1 { + m2[k1] = v1 + } + return m2 + } + break + } + glog.Debugf("Got type: %v %v", reflect.TypeOf(i1), reflect.TypeOf(i2)) + glog.Debugf("Got values: %v %v", i1, i2) + return []interface{}{i1, i2} } @@ -138,7 +154,6 @@ func postTraverse(g *SubGraph) (result map[uint64]interface{}, rerr error) { r.ValuesLength(), q.UidsLength()) } - var empty map[string]bool var ul task.UidList for i := 0; i < r.UidmatrixLength(); i++ { if ok := r.Uidmatrix(&ul, i); !ok { @@ -147,14 +162,18 @@ func postTraverse(g *SubGraph) (result map[uint64]interface{}, rerr error) { l := make([]interface{}, ul.UidsLength()) for j := 0; j < ul.UidsLength(); j++ { uid := ul.Uids(j) + m := make(map[string]interface{}) + m["uid"] = uid if ival, present := cResult[uid]; !present { - l[j] = empty + l[j] = m } else { - l[j] = ival + l[j] = mergeInterfaces(m, ival) } } if len(l) > 0 { - result[q.Uids(i)] = l + m := make(map[string]interface{}) + m[g.Attr] = l + result[q.Uids(i)] = m } } @@ -163,18 +182,26 @@ func postTraverse(g *SubGraph) (result map[uint64]interface{}, rerr error) { if ok := r.Values(&tv, i); !ok { return result, fmt.Errorf("While parsing value") } - if tv.ValLength() == 1 && tv.ValBytes()[0] == 0x00 { - continue - } var ival interface{} - if err := posting.ParseValue(ival, tv.ValBytes()); err != nil { + if err := posting.ParseValue(&ival, tv.ValBytes()); err != nil { return result, err } + if ival == nil { + continue + } + if pval, present := result[q.Uids(i)]; present { - glog.WithField("prev", pval).Fatal("Previous value detected.") + glog.WithField("prev", pval). + WithField("uid", q.Uids(i)). + WithField("new", ival). + Fatal("Previous value detected.") } m := make(map[string]interface{}) m["uid"] = q.Uids(i) + glog.WithFields(logrus.Fields{ + "uid": q.Uids(i), + "val": ival, + }).Debug("Got value") m[g.Attr] = ival result[q.Uids(i)] = m } @@ -195,6 +222,7 @@ func (g *SubGraph) ToJson() (js []byte, rerr error) { glog.Fatal("We don't currently support more than 1 uid at root.") } + glog.Fatal("Shouldn't reach here.") return json.Marshal(r) } @@ -458,6 +486,7 @@ func ProcessGraph(sg *SubGraph, rch chan error) { "num_children": len(sg.Children), "index": i, "attr": sg.Children[i].Attr, + "err": err, }).Debug("Reply from child") if err != nil { x.Err(glog, err).Error("While processing child task.") diff --git a/query/query_test.go b/query/query_test.go index 02535158b81a47e912a809320ef79cd55edb9776..5e60b9e7ef9e7848fec9b5873fb1626306aa60f0 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -138,10 +138,11 @@ func checkName(t *testing.T, r *task.Result, idx int, expected string) { if ok := r.Values(&tv, idx); !ok { t.Error("Unable to retrieve value") } - var name string - if err := posting.ParseValue(&name, tv.ValBytes()); err != nil { + var iname interface{} + if err := posting.ParseValue(&iname, tv.ValBytes()); err != nil { t.Error(err) } + name := iname.(string) if name != expected { t.Errorf("Expected: %v. Got: %v", expected, name) } @@ -198,7 +199,7 @@ func TestNewGraph(t *testing.T) { } } -func TestProcessGraph(t *testing.T) { +func populateGraph(t *testing.T) { logrus.SetLevel(logrus.DebugLevel) pdir := NewStore(t) @@ -256,6 +257,13 @@ func TestProcessGraph(t *testing.T) { edge.Value = "Andrea" addEdge(t, edge, posting.Get(posting.Key(31, "name"))) + edge.Value = "The Russian Mafia" + addEdge(t, edge, posting.Get(posting.Key(101, "name"))) +} + +func TestProcessGraph(t *testing.T) { + populateGraph(t) + // Alright. Now we have everything set up. Let's create the query. sg, err := NewGraph(1, "") if err != nil { @@ -344,6 +352,41 @@ func TestProcessGraph(t *testing.T) { checkSingleValue(t, sg.Children[1], "name", "Michonne") checkSingleValue(t, sg.Children[2], "gender", "female") checkSingleValue(t, sg.Children[3], "status", "alive") +} + +func TestToJson(t *testing.T) { + populateGraph(t) + + // Alright. Now we have everything set up. Let's create the query. + sg, err := NewGraph(1, "") + if err != nil { + t.Error(err) + } + + // Retireve profile information for uid:1. + csg := new(SubGraph) + csg.Attr = "name" + sg.Children = append(sg.Children, csg) + csg = new(SubGraph) + csg.Attr = "gender" + sg.Children = append(sg.Children, csg) + csg = new(SubGraph) + csg.Attr = "status" + sg.Children = append(sg.Children, csg) + + gsg := new(SubGraph) + gsg.Attr = "name" + csg = new(SubGraph) + csg.Attr = "friend" + csg.Children = append(csg.Children, gsg) + sg.Children = append(sg.Children, csg) + + ch := make(chan error) + go ProcessGraph(sg, ch) + err = <-ch + if err != nil { + t.Error(err) + } js, err := sg.ToJson() if err != nil { diff --git a/uid/assigner.go b/uid/assigner.go index 38f91cc61d050a1368a812714e07c00457f8d921..277f20ccfad33c53510a912bf8de14c8412b6010 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -22,10 +22,10 @@ import ( "math" "time" - "github.com/dgryski/go-farm" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/posting/types" "github.com/dgraph-io/dgraph/x" + "github.com/dgryski/go-farm" ) var log = x.Log("uid") @@ -49,9 +49,9 @@ func allocateNew(xid string) (uid uint64, rerr error) { var p types.Posting pl.Get(&p, 0) - var tmp string + var tmp interface{} posting.ParseValue(&tmp, p.ValueBytes()) - log.Debug("Found existing xid: [%q]. Continuing...", tmp) + log.Debug("Found existing xid: [%q]. Continuing...", tmp.(string)) continue } @@ -136,6 +136,8 @@ func ExternalId(uid uint64) (xid string, rerr error) { if p.Uid() != math.MaxUint64 { log.WithField("uid", uid).Fatal("Value uid must be MaxUint64.") } - rerr = posting.ParseValue(&xid, p.ValueBytes()) + var t interface{} + rerr = posting.ParseValue(&t, p.ValueBytes()) + xid = t.(string) return xid, rerr }