Skip to content
Snippets Groups Projects
Commit 4e6355ff authored by Manish R Jain's avatar Manish R Jain
Browse files

Loader shouldn't cause uid assignment. Add a Get() function in uid, and avoid...

Loader shouldn't cause uid assignment. Add a Get() function in uid, and avoid calling rdf.GetUid. Plus, other changes across the entire code base to accommodate this.
parent 9982d13c
No related branches found
No related tags found
No related merge requests found
......@@ -21,6 +21,7 @@ import (
"io"
"math/rand"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
......@@ -30,6 +31,7 @@ import (
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgraph-io/dgraph/x"
"github.com/dgryski/go-farm"
)
......@@ -129,7 +131,7 @@ func (s *state) parseStream(done chan error) {
func (s *state) handleNQuads(wg *sync.WaitGroup) {
for nq := range s.cnq {
edge, err := nq.ToEdge(s.instanceIdx, s.numInstances)
edge, err := nq.ToEdge()
for err != nil {
// Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR {
......@@ -140,7 +142,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
Error("While converting to edge")
return
}
edge, err = nq.ToEdge(s.instanceIdx, s.numInstances)
edge, err = nq.ToEdge()
}
// Only handle this edge if the attribute satisfies the modulo rule
......@@ -158,20 +160,25 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
wg.Done()
}
func (s *state) getUidForString(str string) error {
_, err := rdf.GetUid(str, s.instanceIdx, s.numInstances)
func (s *state) assignUid(xid string) error {
if strings.HasPrefix(xid, "_uid_:") {
_, err := strconv.ParseUint(xid[6:], 0, 64)
return err
}
_, err := uid.GetOrAssign(xid, s.instanceIdx, s.numInstances)
for err != nil {
// Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR {
time.Sleep(time.Microsecond)
glog.WithError(err).WithField("nq.Subject", str).
glog.WithError(err).WithField("xid", xid).
Debug("Temporary error")
} else {
glog.WithError(err).WithField("nq.Subject", str).
glog.WithError(err).WithField("xid", xid).
Error("While getting UID")
return err
}
_, err = rdf.GetUid(str, s.instanceIdx, s.numInstances)
_, err = uid.GetOrAssign(xid, s.instanceIdx, s.numInstances)
}
return nil
}
......@@ -182,7 +189,7 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) {
for nq := range s.cnq {
ignored := true
if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances == s.instanceIdx {
if err := s.getUidForString(nq.Subject); err != nil {
if err := s.assignUid(nq.Subject); err != nil {
glog.WithError(err).Fatal("While assigning Uid to subject.")
}
ignored = false
......@@ -190,7 +197,7 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) {
if len(nq.ObjectId) > 0 &&
farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances == s.instanceIdx {
if err := s.getUidForString(nq.ObjectId); err != nil {
if err := s.assignUid(nq.ObjectId); err != nil {
glog.WithError(err).Fatal("While assigning Uid to object.")
}
ignored = false
......@@ -205,7 +212,7 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) {
}
// Blocking function.
func HandleRdfReader(reader io.Reader, instanceIdx uint64,
func LoadEdges(reader io.Reader, instanceIdx uint64,
numInstances uint64) (uint64, error) {
s := new(state)
......
......@@ -36,23 +36,26 @@ type NQuad struct {
Language string
}
func GetUid(s string, instanceIdx uint64, numInstances uint64) (uint64, error) {
if strings.HasPrefix(s, "_uid_:") {
return strconv.ParseUint(s[6:], 0, 64)
func getUid(xid string) (uint64, error) {
if strings.HasPrefix(xid, "_uid_:") {
return strconv.ParseUint(xid[6:], 0, 64)
}
return uid.GetOrAssign(s, instanceIdx, numInstances)
return uid.Get(xid)
}
func (nq NQuad) ToEdge(instanceIdx,
numInstances uint64) (result x.DirectedEdge, rerr error) {
// ToEdge is useful when you want to find the UID corresponding to XID for
// just one edge. ToEdgeUsing(map) is useful when you do this conversion
// in bulk, say over a network call. None of these methods generate a UID
// for an XID.
func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) {
sid, err := GetUid(nq.Subject, instanceIdx, numInstances)
sid, err := getUid(nq.Subject)
if err != nil {
return result, err
}
result.Entity = sid
if len(nq.ObjectId) > 0 {
oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances)
oid, err := getUid(nq.ObjectId)
if err != nil {
return result, err
}
......
......@@ -99,7 +99,7 @@ func main() {
glog.WithError(err).Fatal("Unable to create gzip reader.")
}
count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstances)
count, err := loader.LoadEdges(r, *instanceIdx, *numInstances)
if err != nil {
glog.WithError(err).Fatal("While handling rdf reader.")
}
......
......@@ -39,16 +39,58 @@ func TestQuery(t *testing.T) {
uid.Init(ps)
loader.Init(ps, ps1)
f, err := os.Open("test_input")
r := bufio.NewReader(f)
count, err := loader.HandleRdfReader(r, 1, 2)
t.Logf("count", count)
var count uint64
{
f, err := os.Open("test_input")
if err != nil {
t.Error(err)
t.Fail()
}
r := bufio.NewReader(f)
count, err = loader.AssignUids(r, 0, 1) // Assign uids for everything.
t.Logf("count: %v", count)
f.Close()
posting.MergeLists(100)
}
{
f, err := os.Open("test_input")
if err != nil {
t.Error(err)
t.Fail()
}
r := bufio.NewReader(f)
count, err = loader.LoadEdges(r, 1, 2)
t.Logf("count: %v", count)
f.Close()
posting.MergeLists(100)
}
posting.MergeLists(100)
if farm.Fingerprint64([]byte("follows"))%2 != 1 {
t.Error("Expected fp to be 1.")
t.Fail()
}
if count != 4 {
t.Error("loader assignment not as expected")
}
if farm.Fingerprint64([]byte("follows"))%2 == 1 {
if count != 4 {
t.Error("loader assignment not as expected")
{
f, err := os.Open("test_input")
if err != nil {
t.Error(err)
t.Fail()
}
r := bufio.NewReader(f)
count, err = loader.LoadEdges(r, 0, 2)
t.Logf("count: %v", count)
f.Close()
posting.MergeLists(100)
}
if farm.Fingerprint64([]byte("enemy"))%2 != 0 {
t.Error("Expected fp to be 0.")
t.Fail()
}
if count != 4 {
t.Error("loader assignment not as expected")
}
}
`_:alice1 <follows> _:bob0 .`
`_:alice2 <follows> _:bob1 .`
`_:alice3 <follows> _:bob2 .`
`_:alice4 <follows> _:bob3 .`
`_:alice1 <friend1> _:bob5 .`
`_:alice2 <friend1> _:bob6 .`
`_:alice3 <friend1> _:bob7 .`
<alice> <follows> <bob0> .
<alice> <enemy> <bob1> .
<alice> <follows> <bob2> .
<alice> <enemy> <bob3> .
<alice> <enemy> <bob4> .
<alice> <follows> <bob5> .
<alice> <enemy> <bob6> .
<alice> <follows> <bob7> .
......@@ -66,15 +66,31 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er
uid.Init(ps)
loader.Init(ps, ps)
f, err := os.Open("testdata.nq")
if err != nil {
return dir1, dir2, nil, clog, err
{
// Assign Uids first.
f, err := os.Open("testdata.nq")
if err != nil {
return dir1, dir2, nil, clog, err
}
_, err = loader.AssignUids(f, 0, 1)
f.Close()
if err != nil {
return dir1, dir2, nil, clog, err
}
}
defer f.Close()
_, err = loader.HandleRdfReader(f, 0, 1)
if err != nil {
return dir1, dir2, nil, clog, err
{
// Then load data.
f, err := os.Open("testdata.nq")
if err != nil {
return dir1, dir2, nil, clog, err
}
_, err = loader.LoadEdges(f, 0, 1)
f.Close()
if err != nil {
return dir1, dir2, nil, clog, err
}
}
return dir1, dir2, ps, clog, nil
}
......
......@@ -9,7 +9,6 @@ import (
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgryski/go-farm"
......@@ -41,18 +40,18 @@ func TestQuery(t *testing.T) {
list := []string{"alice", "bob", "mallory", "ash", "man", "dgraph"}
for _, str := range list {
if farm.Fingerprint64([]byte(str))%numInstances == 0 {
uid, err := rdf.GetUid(str, 0, numInstances)
if uid < minIdx0 || uid > minIdx0+mod-1 {
u, err := uid.GetOrAssign(str, 0, numInstances)
if u < minIdx0 || u > minIdx0+mod-1 {
t.Error("Not the correct UID", err)
}
t.Logf("Instance-0 Correct UID", str, uid)
t.Logf("Instance-0 Correct UID", str, u)
} else {
uid, err := rdf.GetUid(str, 1, numInstances)
if uid < minIdx1 || uid > minIdx1+mod-1 {
u, err := uid.GetOrAssign(str, 1, numInstances)
if u < minIdx1 || u > minIdx1+mod-1 {
t.Error("Not the correct UID", err)
}
t.Logf("Instance-1 Correct UID", str, uid)
t.Logf("Instance-1 Correct UID", str, u)
}
}
}
......@@ -18,6 +18,7 @@ package uid
import (
"errors"
"fmt"
"math"
"sync"
"time"
......@@ -190,6 +191,22 @@ func stringKey(xid string) []byte {
return []byte("_uid_|" + xid)
}
func Get(xid string) (uid uint64, rerr error) {
key := stringKey(xid)
pl := posting.GetOrCreate(key, uidStore)
if pl.Length() == 0 {
return 0, fmt.Errorf("xid: %v doesn't have any uid assigned.", xid)
}
if pl.Length() > 1 {
glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid)
}
var p types.Posting
if ok := pl.Get(&p, 0); !ok {
return 0, fmt.Errorf("While retrieving entry from posting list")
}
return p.Uid(), nil
}
func GetOrAssign(xid string, instanceIdx uint64,
numInstances uint64) (uid uint64, rerr error) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment