Skip to content
Snippets Groups Projects
Commit eea22615 authored by Manish R Jain's avatar Manish R Jain
Browse files

Initial pass at query. Singleton for posting lists.

parent 64bfe350
No related branches found
No related tags found
No related merge requests found
......@@ -18,6 +18,7 @@ package posting
import (
"bytes"
"encoding/binary"
"encoding/gob"
"errors"
"math"
......@@ -64,6 +65,16 @@ func (pa ByUid) Len() int { return len(pa) }
func (pa ByUid) Swap(i, j int) { pa[i], pa[j] = pa[j], pa[i] }
func (pa ByUid) Less(i, j int) bool { return pa[i].Uid() < pa[j].Uid() }
// key = (entity id, attribute)
func Key(eid uint64, attr string) []byte {
buf := new(bytes.Buffer)
buf.WriteString(attr)
if err := binary.Write(buf, binary.LittleEndian, eid); err != nil {
log.Fatalf("Error while creating key with attr: %v eid: %v\n", attr, eid)
}
return buf.Bytes()
}
func addTripleToPosting(b *flatbuffers.Builder,
t x.Triple, op byte) flatbuffers.UOffsetT {
......@@ -94,7 +105,6 @@ func addTripleToPosting(b *flatbuffers.Builder,
}
func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT {
so := b.CreateByteString(p.Source()) // Do this before posting start.
var bo flatbuffers.UOffsetT
if p.ValueLength() > 0 {
......@@ -138,17 +148,17 @@ func init() {
len(empty), len(emptyPosting))
}
func ParseValue(i interface{}, p types.Posting) error {
if p.ValueLength() == 0 {
func ParseValue(i interface{}, value []byte) error {
if len(value) == 0 {
return errors.New("No value found in posting")
}
var buf bytes.Buffer
buf.Write(p.ValueBytes())
buf.Write(value)
dec := gob.NewDecoder(&buf)
return dec.Decode(i)
}
func (l *List) Init(key []byte, pstore, mstore *store.Store) {
func (l *List) init(key []byte, pstore, mstore *store.Store) {
l.mutex.Lock()
defer l.mutex.Unlock()
......
......@@ -68,7 +68,7 @@ func TestAddMutation(t *testing.T) {
ms := new(store.Store)
ms.Init(mdir)
l.Init(key, ps, ms)
l.init(key, ps, ms)
triple := x.Triple{
ValueId: 9,
......@@ -179,7 +179,7 @@ func TestAddMutation(t *testing.T) {
*/
// Try reading the same data in another PostingList.
var dl List
dl.Init(key, ps, ms)
dl.init(key, ps, ms)
if err := checkUids(t, dl, uids...); err != nil {
t.Error(err)
}
......@@ -205,7 +205,7 @@ func TestAddMutation_Value(t *testing.T) {
ms := new(store.Store)
ms.Init(mdir)
ol.Init(key, ps, ms)
ol.init(key, ps, ms)
triple := x.Triple{
Value: "oh hey there",
......
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package posting
import (
"sync"
"github.com/dgryski/go-farm"
"github.com/manishrjain/dgraph/store"
)
var lmutex sync.RWMutex
var lcache map[uint64]*List
var pstore *store.Store
var mstore *store.Store
func Init(posting *store.Store, mutation *store.Store) {
lmutex.Lock()
defer lmutex.Unlock()
lcache = make(map[uint64]*List)
pstore = posting
mstore = mutation
}
func Get(key []byte) *List {
// Acquire read lock and check if list is available.
lmutex.RLock()
uid := farm.Fingerprint64(key)
if list, ok := lcache[uid]; ok {
lmutex.RUnlock()
return list
}
lmutex.RUnlock()
// Couldn't find it. Acquire write lock.
lmutex.Lock()
defer lmutex.Unlock()
// Check again after acquiring write lock.
if list, ok := lcache[uid]; ok {
return list
}
list := new(List)
list.init(key, pstore, mstore)
lcache[uid] = list
return list
}
......@@ -16,13 +16,109 @@
package query
type QAttribute struct {
Attr string
Query *Query
import (
"fmt"
"math"
"github.com/google/flatbuffers/go"
"github.com/manishrjain/dgraph/posting"
"github.com/manishrjain/dgraph/posting/types"
"github.com/manishrjain/dgraph/query/result"
"github.com/manishrjain/dgraph/uid"
"github.com/manishrjain/dgraph/x"
)
var log = x.Log("query")
type Mattr struct {
Attr string
Msg *Message
ResultUids []byte // Flatbuffer result.Uids
ResultValue []byte // gob.Encode
}
type Query struct {
type Message struct {
Id uint64 // Dgraph Id
Eid string // External Id
Attrs []QAttribute
Xid string // External Id
Attrs []Mattr
}
type Node struct {
Id uint64
Xid string
}
func extract(l *posting.List, uids *[]byte, value *[]byte) error {
b := flatbuffers.NewBuilder(0)
var p types.Posting
llen := l.Length()
if ok := l.Get(&p, l.Length()-1); ok {
if p.Uid() == math.MaxUint64 {
// Contains a value posting, not useful for Uids vector.
llen -= 1
}
}
result.UidsStartUidVector(b, llen)
for i := l.Length() - 1; i >= 0; i-- {
if ok := l.Get(&p, i); !ok {
return fmt.Errorf("While retrieving posting")
}
if p.Uid() == math.MaxUint64 {
*value = make([]byte, p.ValueLength())
copy(*value, p.ValueBytes())
} else {
b.PrependUint64(p.Uid())
}
}
vend := b.EndVector(llen)
result.UidsStart(b)
result.UidsAddUid(b, vend)
end := result.UidsEnd(b)
b.Finish(end)
buf := b.Bytes[b.Head():]
*uids = make([]byte, len(buf))
copy(*uids, buf)
return nil
}
func Run(m *Message) error {
if len(m.Xid) > 0 {
u, err := uid.GetOrAssign(m.Xid)
if err != nil {
x.Err(log, err).WithField("xid", m.Xid).Error(
"While GetOrAssign uid from external id")
return err
}
log.WithField("xid", m.Xid).WithField("uid", u).Debug("GetOrAssign")
m.Id = u
}
if m.Id == 0 {
err := fmt.Errorf("Query internal id is zero")
x.Err(log, err).Error("Invalid query")
return err
}
for idx := range m.Attrs {
mattr := &m.Attrs[idx]
key := posting.Key(m.Id, mattr.Attr)
pl := posting.Get(key)
if err := extract(pl, &mattr.ResultUids, &mattr.ResultValue); err != nil {
x.Err(log, err).WithField("uid", m.Id).WithField("attr", mattr.Attr).
Error("While extracting data from posting list")
}
if mattr.Msg != nil {
if err := Run(mattr.Msg); err != nil {
return err
}
}
}
return nil
}
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package query
import (
"io/ioutil"
"math"
"os"
"testing"
"time"
"github.com/Sirupsen/logrus"
"github.com/manishrjain/dgraph/posting"
"github.com/manishrjain/dgraph/query/result"
"github.com/manishrjain/dgraph/store"
"github.com/manishrjain/dgraph/x"
)
func setErr(err *error, nerr error) {
if err != nil {
return
}
*err = nerr
}
func populateList(key []byte) error {
pl := posting.Get(key)
t := x.Triple{
ValueId: 9,
Source: "query_test",
Timestamp: time.Now(),
}
var err error
setErr(&err, pl.AddMutation(t, posting.Set))
t.ValueId = 19
setErr(&err, pl.AddMutation(t, posting.Set))
t.ValueId = 29
setErr(&err, pl.AddMutation(t, posting.Set))
t.Value = "abracadabra"
setErr(&err, pl.AddMutation(t, posting.Set))
return err
}
func NewStore(t *testing.T) string {
path, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
t.Fail()
return ""
}
return path
}
func TestRun(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
pdir := NewStore(t)
defer os.RemoveAll(pdir)
ps := new(store.Store)
ps.Init(pdir)
mdir := NewStore(t)
defer os.RemoveAll(mdir)
ms := new(store.Store)
ms.Init(mdir)
posting.Init(ps, ms)
key := posting.Key(11, "testing")
if err := populateList(key); err != nil {
t.Error(err)
}
key = posting.Key(9, "name")
m := Message{Id: 11}
ma := Mattr{Attr: "testing"}
m.Attrs = append(m.Attrs, ma)
if err := Run(&m); err != nil {
t.Error(err)
}
ma = m.Attrs[0]
uids := result.GetRootAsUids(ma.ResultUids, 0)
if uids.UidLength() != 3 {
t.Errorf("Expected 3. Got: %v", uids.UidLength())
}
var v uint64
v = 9
for i := 0; i < uids.UidLength(); i++ {
if uids.Uid(i) == math.MaxUint64 {
t.Error("Value posting encountered at index:", i)
}
if v != uids.Uid(i) {
t.Errorf("Expected: %v. Got: %v", v, uids.Uid(i))
}
v += 10
}
log.Debugf("ResultUid buffer size: %v", len(ma.ResultUids))
var val string
if err := posting.ParseValue(&val, ma.ResultValue); err != nil {
t.Error(err)
}
if val != "abracadabra" {
t.Errorf("Expected abracadabra. Got: [%q]", val)
}
}
namespace result;
table Uids {
uid:[ulong];
}
root_type Uids;
// automatically generated, do not modify
package result
import (
flatbuffers "github.com/google/flatbuffers/go"
)
type Uids struct {
_tab flatbuffers.Table
}
func GetRootAsUids(buf []byte, offset flatbuffers.UOffsetT) *Uids {
n := flatbuffers.GetUOffsetT(buf[offset:])
x := &Uids{}
x.Init(buf, n + offset)
return x
}
func (rcv *Uids) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *Uids) Uid(j int) uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetUint64(a + flatbuffers.UOffsetT(j * 8))
}
return 0
}
func (rcv *Uids) UidLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func UidsStart(builder *flatbuffers.Builder) { builder.StartObject(1) }
func UidsAddUid(builder *flatbuffers.Builder, uid flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(uid), 0) }
func UidsStartUidVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(8, numElems, 8)
}
func UidsEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() }
......@@ -17,9 +17,6 @@
package store
import (
"bytes"
"encoding/binary"
"github.com/manishrjain/gocrud/x"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
......@@ -46,16 +43,6 @@ func (s *Store) IsNew(id uint64) bool {
return false
}
// key = (attribute, entity id)
func Key(attr string, eid uint64) []byte {
buf := new(bytes.Buffer)
buf.WriteString(attr)
if err := binary.Write(buf, binary.LittleEndian, eid); err != nil {
log.Fatalf("Error while creating key with attr: %v eid: %v\n", attr, eid)
}
return buf.Bytes()
}
func (s *Store) Get(k []byte) (val []byte, rerr error) {
return s.db.Get(k, nil)
}
......
......@@ -25,34 +25,24 @@ import (
"github.com/dgryski/go-farm"
"github.com/manishrjain/dgraph/posting"
"github.com/manishrjain/dgraph/posting/types"
"github.com/manishrjain/dgraph/store"
"github.com/manishrjain/dgraph/x"
)
var log = x.Log("uid")
type Assigner struct {
mstore *store.Store
pstore *store.Store
}
func (a *Assigner) Init(pstore, mstore *store.Store) {
a.mstore = mstore
a.pstore = pstore
}
func (a *Assigner) allocateNew(xid string) (uid uint64, rerr error) {
func allocateNew(xid string) (uid uint64, rerr error) {
for sp := ""; ; sp += " " {
txid := xid + sp
uid = farm.Fingerprint64([]byte(txid)) // Generate from hash.
log.Debugf("txid: [%q] uid: [%x]", txid, uid)
log.WithField("txid", txid).WithField("uid", uid).Debug("Generated")
if uid == math.MaxUint64 {
log.Debug("Hit uint64max while generating fingerprint. Ignoring...")
continue
}
// Check if this uid has already been allocated.
// TODO: Posting List shouldn't be created here.
// Possibly, use some singular class to serve all the posting lists.
pl := new(posting.List)
key := store.Key("_xid_", uid) // uid -> "_xid_" -> xid
pl.Init(key, a.pstore, a.mstore)
key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid
pl := posting.Get(key)
if pl.Length() > 0 {
// Something already present here.
......@@ -60,7 +50,7 @@ func (a *Assigner) allocateNew(xid string) (uid uint64, rerr error) {
pl.Get(&p, 0)
var tmp string
posting.ParseValue(&tmp, p)
posting.ParseValue(&tmp, p.ValueBytes())
log.Debug("Found existing xid: [%q]. Continuing...", tmp)
continue
}
......@@ -84,7 +74,7 @@ func (a *Assigner) allocateNew(xid string) (uid uint64, rerr error) {
" Wake the stupid developer up.")
}
func Key(xid string) []byte {
func stringKey(xid string) []byte {
buf := new(bytes.Buffer)
buf.WriteString("_uid_")
buf.WriteString("|")
......@@ -92,13 +82,13 @@ func Key(xid string) []byte {
return buf.Bytes()
}
func (a *Assigner) GetOrAssign(xid string) (uid uint64, rerr error) {
key := Key(xid)
pl := new(posting.List)
pl.Init(key, a.pstore, a.mstore)
// TODO: Currently one posting list is modified after another, without
func GetOrAssign(xid string) (uid uint64, rerr error) {
key := stringKey(xid)
pl := posting.Get(key)
if pl.Length() == 0 {
// No current id exists. Create one.
uid, err := a.allocateNew(xid)
uid, err := allocateNew(xid)
if err != nil {
return 0, err
}
......@@ -125,10 +115,9 @@ func (a *Assigner) GetOrAssign(xid string) (uid uint64, rerr error) {
" Wake the stupid developer up.")
}
func (a *Assigner) ExternalId(uid uint64) (xid string, rerr error) {
pl := new(posting.List)
key := store.Key("_xid_", uid) // uid -> "_xid_" -> xid
pl.Init(key, a.pstore, a.mstore)
func ExternalId(uid uint64) (xid string, rerr error) {
key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid
pl := posting.Get(key)
if pl.Length() == 0 {
return "", errors.New("NO external id")
}
......@@ -147,6 +136,6 @@ func (a *Assigner) ExternalId(uid uint64) (xid string, rerr error) {
if p.Uid() != math.MaxUint64 {
log.WithField("uid", uid).Fatal("Value uid must be MaxUint64.")
}
rerr = posting.ParseValue(&xid, p)
rerr = posting.ParseValue(&xid, p.ValueBytes())
return xid, rerr
}
......@@ -22,6 +22,7 @@ import (
"testing"
"github.com/Sirupsen/logrus"
"github.com/manishrjain/dgraph/posting"
"github.com/manishrjain/dgraph/store"
)
......@@ -47,13 +48,11 @@ func TestGetOrAssign(t *testing.T) {
defer os.RemoveAll(mdir)
ms := new(store.Store)
ms.Init(mdir)
var a Assigner
a.Init(ps, ms)
posting.Init(ps, ms)
var u1, u2 uint64
{
uid, err := a.GetOrAssign("externalid0")
uid, err := GetOrAssign("externalid0")
if err != nil {
t.Error(err)
}
......@@ -62,7 +61,7 @@ func TestGetOrAssign(t *testing.T) {
}
{
uid, err := a.GetOrAssign("externalid1")
uid, err := GetOrAssign("externalid1")
if err != nil {
t.Error(err)
}
......@@ -76,7 +75,7 @@ func TestGetOrAssign(t *testing.T) {
// return
{
uid, err := a.GetOrAssign("externalid0")
uid, err := GetOrAssign("externalid0")
if err != nil {
t.Error(err)
}
......@@ -88,7 +87,7 @@ func TestGetOrAssign(t *testing.T) {
// return
{
xid, err := a.ExternalId(u1)
xid, err := ExternalId(u1)
if err != nil {
t.Error(err)
}
......@@ -98,7 +97,7 @@ func TestGetOrAssign(t *testing.T) {
}
return
{
xid, err := a.ExternalId(u2)
xid, err := ExternalId(u2)
if err != nil {
t.Error(err)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment