From eea226155109189b971c64995144632cae7f1337 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Wed, 21 Oct 2015 17:49:09 +1100
Subject: [PATCH] Initial pass at query. Singleton for posting lists.

---
 posting/list.go      |  20 +++++--
 posting/list_test.go |   6 +--
 posting/lists.go     |  62 +++++++++++++++++++++
 query/query.go       | 108 ++++++++++++++++++++++++++++++++++---
 query/query_test.go  | 125 +++++++++++++++++++++++++++++++++++++++++++
 query/result.fbs     |   7 +++
 query/result/Uids.go |  45 ++++++++++++++++
 store/store.go       |  13 -----
 uid/assigner.go      |  49 +++++++----------
 uid/assigner_test.go |  15 +++---
 10 files changed, 385 insertions(+), 65 deletions(-)
 create mode 100644 posting/lists.go
 create mode 100644 query/query_test.go
 create mode 100644 query/result.fbs
 create mode 100644 query/result/Uids.go

diff --git a/posting/list.go b/posting/list.go
index cf87f280..c2a140a5 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -18,6 +18,7 @@ package posting
 
 import (
 	"bytes"
+	"encoding/binary"
 	"encoding/gob"
 	"errors"
 	"math"
@@ -64,6 +65,16 @@ func (pa ByUid) Len() int           { return len(pa) }
 func (pa ByUid) Swap(i, j int)      { pa[i], pa[j] = pa[j], pa[i] }
 func (pa ByUid) Less(i, j int) bool { return pa[i].Uid() < pa[j].Uid() }
 
+// Key returns the posting list key for the (entity id, attribute) pair. The
+// layout is the bytes of attr followed by eid as 8 little-endian bytes.
+// Encoding into a plain byte slice cannot fail, so no error path is needed.
+func Key(eid uint64, attr string) []byte {
+	key := make([]byte, len(attr)+8)
+	n := copy(key, attr)
+	binary.LittleEndian.PutUint64(key[n:], eid)
+	return key
+}
+
 func addTripleToPosting(b *flatbuffers.Builder,
 	t x.Triple, op byte) flatbuffers.UOffsetT {
 
@@ -94,7 +105,6 @@ func addTripleToPosting(b *flatbuffers.Builder,
 }
 
 func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT {
-
 	so := b.CreateByteString(p.Source()) // Do this before posting start.
 	var bo flatbuffers.UOffsetT
 	if p.ValueLength() > 0 {
@@ -138,17 +148,17 @@ func init() {
 		len(empty), len(emptyPosting))
 }
 
-func ParseValue(i interface{}, p types.Posting) error {
-	if p.ValueLength() == 0 {
+func ParseValue(i interface{}, value []byte) error {
+	if len(value) == 0 {
 		return errors.New("No value found in posting")
 	}
 	var buf bytes.Buffer
-	buf.Write(p.ValueBytes())
+	buf.Write(value)
 	dec := gob.NewDecoder(&buf)
 	return dec.Decode(i)
 }
 
-func (l *List) Init(key []byte, pstore, mstore *store.Store) {
+func (l *List) init(key []byte, pstore, mstore *store.Store) {
 	l.mutex.Lock()
 	defer l.mutex.Unlock()
 
diff --git a/posting/list_test.go b/posting/list_test.go
index 5c848193..e2dc1c5a 100644
--- a/posting/list_test.go
+++ b/posting/list_test.go
@@ -68,7 +68,7 @@ func TestAddMutation(t *testing.T) {
 	ms := new(store.Store)
 	ms.Init(mdir)
 
-	l.Init(key, ps, ms)
+	l.init(key, ps, ms)
 
 	triple := x.Triple{
 		ValueId:   9,
@@ -179,7 +179,7 @@ func TestAddMutation(t *testing.T) {
 	*/
 	// Try reading the same data in another PostingList.
 	var dl List
-	dl.Init(key, ps, ms)
+	dl.init(key, ps, ms)
 	if err := checkUids(t, dl, uids...); err != nil {
 		t.Error(err)
 	}
@@ -205,7 +205,7 @@ func TestAddMutation_Value(t *testing.T) {
 	ms := new(store.Store)
 	ms.Init(mdir)
 
-	ol.Init(key, ps, ms)
+	ol.init(key, ps, ms)
 
 	triple := x.Triple{
 		Value:     "oh hey there",
diff --git a/posting/lists.go b/posting/lists.go
new file mode 100644
index 00000000..fa68a310
--- /dev/null
+++ b/posting/lists.go
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * 		http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package posting
+
+import (
+	"sync"
+
+	"github.com/dgryski/go-farm"
+	"github.com/manishrjain/dgraph/store"
+)
+
+var lmutex sync.RWMutex
+var lcache map[uint64]*List
+var pstore *store.Store
+var mstore *store.Store
+
+// Init resets the shared posting list cache and records the posting and
+// mutation stores that Get hands to every list it creates.
+func Init(posting *store.Store, mutation *store.Store) {
+	lmutex.Lock()
+	defer lmutex.Unlock()
+	pstore, mstore = posting, mutation
+	lcache = make(map[uint64]*List)
+}
+
+// Get returns the List cached for key, creating and caching it on a miss.
+// Cache entries are indexed by farm.Fingerprint64(key).
+func Get(key []byte) *List {
+	uid := farm.Fingerprint64(key)
+	// Look the list up while holding only the read lock.
+	lmutex.RLock()
+	cached, found := lcache[uid]
+	lmutex.RUnlock()
+	if found {
+		return cached
+	}
+
+	// Miss. Re-check under the write lock before creating the list.
+	lmutex.Lock()
+	defer lmutex.Unlock()
+	if cached, found = lcache[uid]; found {
+		return cached
+	}
+	fresh := new(List)
+	fresh.init(key, pstore, mstore)
+	lcache[uid] = fresh
+	return fresh
+}
diff --git a/query/query.go b/query/query.go
index 5461a859..12ef6127 100644
--- a/query/query.go
+++ b/query/query.go
@@ -16,13 +16,109 @@
 
 package query
 
-type QAttribute struct {
-	Attr  string
-	Query *Query
+import (
+	"fmt"
+	"math"
+
+	"github.com/google/flatbuffers/go"
+	"github.com/manishrjain/dgraph/posting"
+	"github.com/manishrjain/dgraph/posting/types"
+	"github.com/manishrjain/dgraph/query/result"
+	"github.com/manishrjain/dgraph/uid"
+	"github.com/manishrjain/dgraph/x"
+)
+
+var log = x.Log("query")
+
+type Mattr struct {
+	Attr string
+	Msg  *Message
+
+	ResultUids  []byte // Flatbuffer result.Uids
+	ResultValue []byte // gob.Encode
 }
 
-type Query struct {
+type Message struct {
 	Id    uint64 // Dgraph Id
-	Eid   string // External Id
-	Attrs []QAttribute
+	Xid   string // External Id
+	Attrs []Mattr
+}
+
+type Node struct {
+	Id  uint64 // NOTE(review): presumably the internal Dgraph id, as in Message.Id — confirm
+	Xid string // NOTE(review): presumably the external id, as in Message.Xid — confirm
+}
+
+// extract serialises the postings of l for a query result. Uid postings go
+// into a flatbuffer result.Uids table copied to *uids; a value posting
+// (uid == math.MaxUint64), if present, has its bytes copied to *value.
+func extract(l *posting.List, uids *[]byte, value *[]byte) error {
+	b := flatbuffers.NewBuilder(0)
+	var p types.Posting
+	// Read the length once so the vector size and loop bounds always agree,
+	// and only probe the last posting when the list is non-empty.
+	total := l.Length()
+	llen := total
+	if total > 0 && l.Get(&p, total-1) && p.Uid() == math.MaxUint64 {
+		// Contains a value posting, not useful for Uids vector.
+		llen--
+	}
+
+	result.UidsStartUidVector(b, llen)
+	for i := total - 1; i >= 0; i-- {
+		if ok := l.Get(&p, i); !ok {
+			return fmt.Errorf("While retrieving posting")
+		}
+		if p.Uid() == math.MaxUint64 {
+			*value = make([]byte, p.ValueLength())
+			copy(*value, p.ValueBytes())
+			continue
+		}
+		b.PrependUint64(p.Uid())
+	}
+	vend := b.EndVector(llen)
+
+	result.UidsStart(b)
+	result.UidsAddUid(b, vend)
+	b.Finish(result.UidsEnd(b))
+	buf := b.Bytes[b.Head():]
+	*uids = make([]byte, len(buf))
+	copy(*uids, buf)
+	return nil
+}
+
+// Run resolves the query message m in place. If m.Xid is set, m.Id is looked
+// up (or assigned) from it; then, for every attribute, the posting list for
+// (m.Id, attr) is extracted into the Mattr and any nested Msg is run as well.
+func Run(m *Message) error {
+	if len(m.Xid) > 0 {
+		u, err := uid.GetOrAssign(m.Xid)
+		if err != nil {
+			x.Err(log, err).WithField("xid", m.Xid).Error(
+				"While GetOrAssign uid from external id")
+			return err
+		}
+		log.WithField("xid", m.Xid).WithField("uid", u).Debug("GetOrAssign")
+		m.Id = u
+	}
+	if m.Id == 0 {
+		err := fmt.Errorf("Query internal id is zero")
+		x.Err(log, err).Error("Invalid query")
+		return err
+	}
+	for idx := range m.Attrs {
+		mattr := &m.Attrs[idx]
+		pl := posting.Get(posting.Key(m.Id, mattr.Attr))
+		if err := extract(pl, &mattr.ResultUids, &mattr.ResultValue); err != nil {
+			x.Err(log, err).WithField("uid", m.Id).WithField("attr", mattr.Attr).
+				Error("While extracting data from posting list")
+			return err // Do not hand back partially filled results as success.
+		}
+		if mattr.Msg != nil {
+			if err := Run(mattr.Msg); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
 }
diff --git a/query/query_test.go b/query/query_test.go
new file mode 100644
index 00000000..7e9b6ffe
--- /dev/null
+++ b/query/query_test.go
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * 		http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package query
+
+import (
+	"io/ioutil"
+	"math"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/manishrjain/dgraph/posting"
+	"github.com/manishrjain/dgraph/query/result"
+	"github.com/manishrjain/dgraph/store"
+	"github.com/manishrjain/dgraph/x"
+)
+
+// setErr records nerr in *err unless an earlier error is already stored there.
+func setErr(err *error, nerr error) {
+	if *err == nil {
+		*err = nerr
+	}
+}
+
+// populateList adds four mutations to the posting list stored under key:
+// uid postings 9, 19 and 29, then one carrying the value "abracadabra".
+func populateList(key []byte) error {
+	var err error
+	pl := posting.Get(key)
+	t := x.Triple{Source: "query_test", Timestamp: time.Now()}
+	add := func() { setErr(&err, pl.AddMutation(t, posting.Set)) }
+
+	t.ValueId = 9
+	add()
+
+	t.ValueId = 19
+	add()
+
+	t.ValueId = 29
+	add()
+
+	// ValueId is left at 29 for the value mutation.
+	t.Value = "abracadabra"
+	add()
+	return err
+}
+
+// NewStore creates a temporary directory for a store and returns its path.
+// It aborts the test on failure so callers never get an empty path.
+func NewStore(t *testing.T) string {
+	path, err := ioutil.TempDir("", "storetest_")
+	if err != nil {
+		t.Fatal(err)
+	}
+	return path
+}
+
+// TestRun stores three uid postings and a value under (11, "testing"), runs
+// a query for that attribute and checks the returned uids and value.
+func TestRun(t *testing.T) {
+	logrus.SetLevel(logrus.DebugLevel)
+
+	pdir := NewStore(t)
+	defer os.RemoveAll(pdir)
+	ps := new(store.Store)
+	ps.Init(pdir)
+
+	mdir := NewStore(t)
+	defer os.RemoveAll(mdir)
+	ms := new(store.Store)
+	ms.Init(mdir)
+	posting.Init(ps, ms)
+
+	key := posting.Key(11, "testing")
+	if err := populateList(key); err != nil {
+		t.Fatal(err)
+	}
+
+	m := Message{Id: 11}
+	ma := Mattr{Attr: "testing"}
+	m.Attrs = append(m.Attrs, ma)
+
+	if err := Run(&m); err != nil {
+		t.Fatal(err)
+	}
+	ma = m.Attrs[0]
+	uids := result.GetRootAsUids(ma.ResultUids, 0)
+	if uids.UidLength() != 3 {
+		t.Errorf("Expected 3. Got: %v", uids.UidLength())
+	}
+	v := uint64(9)
+	for i := 0; i < uids.UidLength(); i++ {
+		if uids.Uid(i) == math.MaxUint64 {
+			t.Error("Value posting encountered at index:", i)
+		}
+		if v != uids.Uid(i) {
+			t.Errorf("Expected: %v. Got: %v", v, uids.Uid(i))
+		}
+		v += 10
+	}
+	log.Debugf("ResultUid buffer size: %v", len(ma.ResultUids))
+
+	var val string
+	if err := posting.ParseValue(&val, ma.ResultValue); err != nil {
+		t.Error(err)
+	}
+	if val != "abracadabra" {
+		t.Errorf("Expected abracadabra. Got: [%q]", val)
+	}
+}
diff --git a/query/result.fbs b/query/result.fbs
new file mode 100644
index 00000000..8517d3fb
--- /dev/null
+++ b/query/result.fbs
@@ -0,0 +1,7 @@
+namespace result;
+
+table Uids {
+	uid:[ulong];
+}
+
+root_type Uids;
diff --git a/query/result/Uids.go b/query/result/Uids.go
new file mode 100644
index 00000000..0e89f6ad
--- /dev/null
+++ b/query/result/Uids.go
@@ -0,0 +1,45 @@
+// automatically generated, do not modify
+
+package result
+
+import (
+	flatbuffers "github.com/google/flatbuffers/go"
+)
+type Uids struct {
+	_tab flatbuffers.Table
+}
+
+func GetRootAsUids(buf []byte, offset flatbuffers.UOffsetT) *Uids {
+	n := flatbuffers.GetUOffsetT(buf[offset:])
+	x := &Uids{}
+	x.Init(buf, n + offset)
+	return x
+}
+
+func (rcv *Uids) Init(buf []byte, i flatbuffers.UOffsetT) {
+	rcv._tab.Bytes = buf
+	rcv._tab.Pos = i
+}
+
+func (rcv *Uids) Uid(j int) uint64 {
+	o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+	if o != 0 {
+		a := rcv._tab.Vector(o)
+		return rcv._tab.GetUint64(a + flatbuffers.UOffsetT(j * 8))
+	}
+	return 0
+}
+
+func (rcv *Uids) UidLength() int {
+	o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+	if o != 0 {
+		return rcv._tab.VectorLen(o)
+	}
+	return 0
+}
+
+func UidsStart(builder *flatbuffers.Builder) { builder.StartObject(1) }
+func UidsAddUid(builder *flatbuffers.Builder, uid flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(uid), 0) }
+func UidsStartUidVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(8, numElems, 8)
+}
+func UidsEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() }
diff --git a/store/store.go b/store/store.go
index 51090b31..33628bce 100644
--- a/store/store.go
+++ b/store/store.go
@@ -17,9 +17,6 @@
 package store
 
 import (
-	"bytes"
-	"encoding/binary"
-
 	"github.com/manishrjain/gocrud/x"
 	"github.com/syndtr/goleveldb/leveldb"
 	"github.com/syndtr/goleveldb/leveldb/opt"
@@ -46,16 +43,6 @@ func (s *Store) IsNew(id uint64) bool {
 	return false
 }
 
-// key = (attribute, entity id)
-func Key(attr string, eid uint64) []byte {
-	buf := new(bytes.Buffer)
-	buf.WriteString(attr)
-	if err := binary.Write(buf, binary.LittleEndian, eid); err != nil {
-		log.Fatalf("Error while creating key with attr: %v eid: %v\n", attr, eid)
-	}
-	return buf.Bytes()
-}
-
 func (s *Store) Get(k []byte) (val []byte, rerr error) {
 	return s.db.Get(k, nil)
 }
diff --git a/uid/assigner.go b/uid/assigner.go
index ad175e37..9c26695d 100644
--- a/uid/assigner.go
+++ b/uid/assigner.go
@@ -25,34 +25,24 @@ import (
 	"github.com/dgryski/go-farm"
 	"github.com/manishrjain/dgraph/posting"
 	"github.com/manishrjain/dgraph/posting/types"
-	"github.com/manishrjain/dgraph/store"
 	"github.com/manishrjain/dgraph/x"
 )
 
 var log = x.Log("uid")
 
-type Assigner struct {
-	mstore *store.Store
-	pstore *store.Store
-}
-
-func (a *Assigner) Init(pstore, mstore *store.Store) {
-	a.mstore = mstore
-	a.pstore = pstore
-}
-
-func (a *Assigner) allocateNew(xid string) (uid uint64, rerr error) {
+func allocateNew(xid string) (uid uint64, rerr error) {
 	for sp := ""; ; sp += " " {
 		txid := xid + sp
 		uid = farm.Fingerprint64([]byte(txid)) // Generate from hash.
-		log.Debugf("txid: [%q] uid: [%x]", txid, uid)
+		log.WithField("txid", txid).WithField("uid", uid).Debug("Generated")
+		if uid == math.MaxUint64 {
+			log.Debug("Hit uint64max while generating fingerprint. Ignoring...")
+			continue
+		}
 
 		// Check if this uid has already been allocated.
-		// TODO: Posting List shouldn't be created here.
-		// Possibly, use some singular class to serve all the posting lists.
-		pl := new(posting.List)
-		key := store.Key("_xid_", uid) // uid -> "_xid_" -> xid
-		pl.Init(key, a.pstore, a.mstore)
+		key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid
+		pl := posting.Get(key)
 
 		if pl.Length() > 0 {
 			// Something already present here.
@@ -60,7 +50,7 @@ func (a *Assigner) allocateNew(xid string) (uid uint64, rerr error) {
 			pl.Get(&p, 0)
 
 			var tmp string
-			posting.ParseValue(&tmp, p)
+			posting.ParseValue(&tmp, p.ValueBytes())
 			log.Debug("Found existing xid: [%q]. Continuing...", tmp)
 			continue
 		}
@@ -84,7 +74,7 @@ func (a *Assigner) allocateNew(xid string) (uid uint64, rerr error) {
 		" Wake the stupid developer up.")
 }
 
-func Key(xid string) []byte {
+func stringKey(xid string) []byte {
 	buf := new(bytes.Buffer)
 	buf.WriteString("_uid_")
 	buf.WriteString("|")
@@ -92,13 +82,13 @@ func Key(xid string) []byte {
 	return buf.Bytes()
 }
 
-func (a *Assigner) GetOrAssign(xid string) (uid uint64, rerr error) {
-	key := Key(xid)
-	pl := new(posting.List)
-	pl.Init(key, a.pstore, a.mstore)
+// TODO: Currently one posting list is modified after another, without any atomicity across the two writes (sentence was cut off here; intent presumed — confirm).
+func GetOrAssign(xid string) (uid uint64, rerr error) {
+	key := stringKey(xid)
+	pl := posting.Get(key)
 	if pl.Length() == 0 {
 		// No current id exists. Create one.
-		uid, err := a.allocateNew(xid)
+		uid, err := allocateNew(xid)
 		if err != nil {
 			return 0, err
 		}
@@ -125,10 +115,9 @@ func (a *Assigner) GetOrAssign(xid string) (uid uint64, rerr error) {
 		" Wake the stupid developer up.")
 }
 
-func (a *Assigner) ExternalId(uid uint64) (xid string, rerr error) {
-	pl := new(posting.List)
-	key := store.Key("_xid_", uid) // uid -> "_xid_" -> xid
-	pl.Init(key, a.pstore, a.mstore)
+func ExternalId(uid uint64) (xid string, rerr error) {
+	key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid
+	pl := posting.Get(key)
 	if pl.Length() == 0 {
 		return "", errors.New("NO external id")
 	}
@@ -147,6 +136,6 @@ func (a *Assigner) ExternalId(uid uint64) (xid string, rerr error) {
 	if p.Uid() != math.MaxUint64 {
 		log.WithField("uid", uid).Fatal("Value uid must be MaxUint64.")
 	}
-	rerr = posting.ParseValue(&xid, p)
+	rerr = posting.ParseValue(&xid, p.ValueBytes())
 	return xid, rerr
 }
diff --git a/uid/assigner_test.go b/uid/assigner_test.go
index 5d9bd939..715a0217 100644
--- a/uid/assigner_test.go
+++ b/uid/assigner_test.go
@@ -22,6 +22,7 @@ import (
 	"testing"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/manishrjain/dgraph/posting"
 	"github.com/manishrjain/dgraph/store"
 )
 
@@ -47,13 +48,11 @@ func TestGetOrAssign(t *testing.T) {
 	defer os.RemoveAll(mdir)
 	ms := new(store.Store)
 	ms.Init(mdir)
-
-	var a Assigner
-	a.Init(ps, ms)
+	posting.Init(ps, ms)
 
 	var u1, u2 uint64
 	{
-		uid, err := a.GetOrAssign("externalid0")
+		uid, err := GetOrAssign("externalid0")
 		if err != nil {
 			t.Error(err)
 		}
@@ -62,7 +61,7 @@ func TestGetOrAssign(t *testing.T) {
 	}
 
 	{
-		uid, err := a.GetOrAssign("externalid1")
+		uid, err := GetOrAssign("externalid1")
 		if err != nil {
 			t.Error(err)
 		}
@@ -76,7 +75,7 @@ func TestGetOrAssign(t *testing.T) {
 	// return
 
 	{
-		uid, err := a.GetOrAssign("externalid0")
+		uid, err := GetOrAssign("externalid0")
 		if err != nil {
 			t.Error(err)
 		}
@@ -88,7 +87,7 @@ func TestGetOrAssign(t *testing.T) {
 	// return
 
 	{
-		xid, err := a.ExternalId(u1)
+		xid, err := ExternalId(u1)
 		if err != nil {
 			t.Error(err)
 		}
@@ -98,7 +97,7 @@ func TestGetOrAssign(t *testing.T) {
 	}
 	return
 	{
-		xid, err := a.ExternalId(u2)
+		xid, err := ExternalId(u2)
 		if err != nil {
 			t.Error(err)
 		}
-- 
GitLab