From 4e6355ffb80667112b4651e0d490ace76f99d423 Mon Sep 17 00:00:00 2001 From: Manish R Jain <manishrjain@gmail.com> Date: Thu, 3 Mar 2016 17:05:47 +1100 Subject: [PATCH] Loader shouldn't cause uid assignment. Add a Get() function in uid, and avoid calling rdf.GetUid. Plus, other changes across the entire code base to accommodate this. --- loader/loader.go | 27 +++++++++------ rdf/parse.go | 19 ++++++----- server/loader/main.go | 2 +- server/loader/main_test.go | 58 ++++++++++++++++++++++++++++----- server/loader/test_input | 15 +++++---- server/main_test.go | 30 +++++++++++++---- server/uidassigner/main_test.go | 13 ++++---- uid/assigner.go | 17 ++++++++++ 8 files changed, 133 insertions(+), 48 deletions(-) diff --git a/loader/loader.go b/loader/loader.go index 64fd1b66..3ea2088f 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -21,6 +21,7 @@ import ( "io" "math/rand" "runtime" + "strconv" "strings" "sync" "sync/atomic" @@ -30,6 +31,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/rdf" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/x" "github.com/dgryski/go-farm" ) @@ -129,7 +131,7 @@ func (s *state) parseStream(done chan error) { func (s *state) handleNQuads(wg *sync.WaitGroup) { for nq := range s.cnq { - edge, err := nq.ToEdge(s.instanceIdx, s.numInstances) + edge, err := nq.ToEdge() for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -140,7 +142,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { Error("While converting to edge") return } - edge, err = nq.ToEdge(s.instanceIdx, s.numInstances) + edge, err = nq.ToEdge() } // Only handle this edge if the attribute satisfies the modulo rule @@ -158,20 +160,25 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { wg.Done() } -func (s *state) getUidForString(str string) error { - _, err := rdf.GetUid(str, s.instanceIdx, s.numInstances) +func (s *state) assignUid(xid string) error { + if strings.HasPrefix(xid, "_uid_:") { + _, err := strconv.ParseUint(xid[6:], 0, 64) + return err + } + + _, err := uid.GetOrAssign(xid, s.instanceIdx, s.numInstances) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { time.Sleep(time.Microsecond) - glog.WithError(err).WithField("nq.Subject", str). + glog.WithError(err).WithField("xid", xid). Debug("Temporary error") } else { - glog.WithError(err).WithField("nq.Subject", str). + glog.WithError(err).WithField("xid", xid). Error("While getting UID") return err } - _, err = rdf.GetUid(str, s.instanceIdx, s.numInstances) + _, err = uid.GetOrAssign(xid, s.instanceIdx, s.numInstances) } return nil } @@ -182,7 +189,7 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) { for nq := range s.cnq { ignored := true if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances == s.instanceIdx { - if err := s.getUidForString(nq.Subject); err != nil { + if err := s.assignUid(nq.Subject); err != nil { glog.WithError(err).Fatal("While assigning Uid to subject.") } ignored = false @@ -190,7 +197,7 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) { if len(nq.ObjectId) > 0 && farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances == s.instanceIdx { - if err := s.getUidForString(nq.ObjectId); err != nil { + if err := s.assignUid(nq.ObjectId); err != nil { glog.WithError(err).Fatal("While assigning Uid to object.") } ignored = false @@ -205,7 +212,7 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) { } // Blocking function. -func HandleRdfReader(reader io.Reader, instanceIdx uint64, +func LoadEdges(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) { s := new(state) diff --git a/rdf/parse.go b/rdf/parse.go index 732e851b..3afe48bc 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -36,23 +36,26 @@ type NQuad struct { Language string } -func GetUid(s string, instanceIdx uint64, numInstances uint64) (uint64, error) { - if strings.HasPrefix(s, "_uid_:") { - return strconv.ParseUint(s[6:], 0, 64) +func getUid(xid string) (uint64, error) { + if strings.HasPrefix(xid, "_uid_:") { + return strconv.ParseUint(xid[6:], 0, 64) } - return uid.GetOrAssign(s, instanceIdx, numInstances) + return uid.Get(xid) } -func (nq NQuad) ToEdge(instanceIdx, - numInstances uint64) (result x.DirectedEdge, rerr error) { +// ToEdge is useful when you want to find the UID corresponding to XID for +// just one edge. ToEdgeUsing(map) is useful when you do this conversion +// in bulk, say over a network call. None of these methods generate a UID +// for an XID. +func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) { - sid, err := GetUid(nq.Subject, instanceIdx, numInstances) + sid, err := getUid(nq.Subject) if err != nil { return result, err } result.Entity = sid if len(nq.ObjectId) > 0 { - oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances) + oid, err := getUid(nq.ObjectId) if err != nil { return result, err } diff --git a/server/loader/main.go b/server/loader/main.go index 63302a10..24d00334 100644 --- a/server/loader/main.go +++ b/server/loader/main.go @@ -99,7 +99,7 @@ func main() { glog.WithError(err).Fatal("Unable to create gzip reader.") } - count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstances) + count, err := loader.LoadEdges(r, *instanceIdx, *numInstances) if err != nil { glog.WithError(err).Fatal("While handling rdf reader.") } diff --git a/server/loader/main_test.go b/server/loader/main_test.go index bbbe8aff..1e73a3a8 100644 --- a/server/loader/main_test.go +++ b/server/loader/main_test.go @@ -39,16 +39,58 @@ func TestQuery(t *testing.T) { uid.Init(ps) loader.Init(ps, ps1) - f, err := os.Open("test_input") - r := bufio.NewReader(f) - count, err := loader.HandleRdfReader(r, 1, 2) - t.Logf("count", count) + var count uint64 + { + f, err := os.Open("test_input") + if err != nil { + t.Error(err) + t.Fail() + } + r := bufio.NewReader(f) + count, err = loader.AssignUids(r, 0, 1) // Assign uids for everything. + t.Logf("count: %v", count) + f.Close() + posting.MergeLists(100) + } + { + f, err := os.Open("test_input") + if err != nil { + t.Error(err) + t.Fail() + } + r := bufio.NewReader(f) + count, err = loader.LoadEdges(r, 1, 2) + t.Logf("count: %v", count) + f.Close() + posting.MergeLists(100) + } - posting.MergeLists(100) + if farm.Fingerprint64([]byte("follows"))%2 != 1 { + t.Error("Expected fp to be 1.") + t.Fail() + } + if count != 4 { + t.Error("loader assignment not as expected") + } - if farm.Fingerprint64([]byte("follows"))%2 == 1 { - if count != 4 { - t.Error("loader assignment not as expected") + { + f, err := os.Open("test_input") + if err != nil { + t.Error(err) + t.Fail() } + r := bufio.NewReader(f) + count, err = loader.LoadEdges(r, 0, 2) + t.Logf("count: %v", count) + f.Close() + posting.MergeLists(100) + } + + if farm.Fingerprint64([]byte("enemy"))%2 != 0 { + t.Error("Expected fp to be 0.") + t.Fail() + } + if count != 4 { + t.Error("loader assignment not as expected") } } diff --git a/server/loader/test_input b/server/loader/test_input index 642eec7f..3ed3dd85 100644 --- a/server/loader/test_input +++ b/server/loader/test_input @@ -1,7 +1,8 @@ -`_:alice1 <follows> _:bob0 .` -`_:alice2 <follows> _:bob1 .` -`_:alice3 <follows> _:bob2 .` -`_:alice4 <follows> _:bob3 .` -`_:alice1 <friend1> _:bob5 .` -`_:alice2 <friend1> _:bob6 .` -`_:alice3 <friend1> _:bob7 .` +<alice> <follows> <bob0> . +<alice> <enemy> <bob1> . +<alice> <follows> <bob2> . +<alice> <enemy> <bob3> . +<alice> <enemy> <bob4> . +<alice> <follows> <bob5> . +<alice> <enemy> <bob6> . +<alice> <follows> <bob7> . diff --git a/server/main_test.go b/server/main_test.go index 962256cd..79064772 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -66,15 +66,31 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er uid.Init(ps) loader.Init(ps, ps) - f, err := os.Open("testdata.nq") - if err != nil { - return dir1, dir2, nil, clog, err + { + // Assign Uids first. + f, err := os.Open("testdata.nq") + if err != nil { + return dir1, dir2, nil, clog, err + } + _, err = loader.AssignUids(f, 0, 1) + f.Close() + if err != nil { + return dir1, dir2, nil, clog, err + } } - defer f.Close() - _, err = loader.HandleRdfReader(f, 0, 1) - if err != nil { - return dir1, dir2, nil, clog, err + { + // Then load data. + f, err := os.Open("testdata.nq") + if err != nil { + return dir1, dir2, nil, clog, err + } + _, err = loader.LoadEdges(f, 0, 1) + f.Close() + if err != nil { + return dir1, dir2, nil, clog, err + } } + return dir1, dir2, ps, clog, nil } diff --git a/server/uidassigner/main_test.go b/server/uidassigner/main_test.go index 85ec4aa6..948a657d 100644 --- a/server/uidassigner/main_test.go +++ b/server/uidassigner/main_test.go @@ -9,7 +9,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/posting" - "github.com/dgraph-io/dgraph/rdf" "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/uid" "github.com/dgryski/go-farm" @@ -41,18 +40,18 @@ func TestQuery(t *testing.T) { list := []string{"alice", "bob", "mallory", "ash", "man", "dgraph"} for _, str := range list { if farm.Fingerprint64([]byte(str))%numInstances == 0 { - uid, err := rdf.GetUid(str, 0, numInstances) - if uid < minIdx0 || uid > minIdx0+mod-1 { + u, err := uid.GetOrAssign(str, 0, numInstances) + if u < minIdx0 || u > minIdx0+mod-1 { t.Error("Not the correct UID", err) } - t.Logf("Instance-0 Correct UID", str, uid) + t.Logf("Instance-0 Correct UID", str, u) } else { - uid, err := rdf.GetUid(str, 1, numInstances) - if uid < minIdx1 || uid > minIdx1+mod-1 { + u, err := uid.GetOrAssign(str, 1, numInstances) + if u < minIdx1 || u > minIdx1+mod-1 { t.Error("Not the correct UID", err) } - t.Logf("Instance-1 Correct UID", str, uid) + t.Logf("Instance-1 Correct UID", str, u) } } } diff --git a/uid/assigner.go b/uid/assigner.go index d5d84163..ff823934 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -18,6 +18,7 @@ package uid import ( "errors" + "fmt" "math" "sync" "time" @@ -190,6 +191,22 @@ func stringKey(xid string) []byte { return []byte("_uid_|" + xid) } +func Get(xid string) (uid uint64, rerr error) { + key := stringKey(xid) + pl := posting.GetOrCreate(key, uidStore) + if pl.Length() == 0 { + return 0, fmt.Errorf("xid: %v doesn't have any uid assigned.", xid) + } + if pl.Length() > 1 { + glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) + } + var p types.Posting + if ok := pl.Get(&p, 0); !ok { + return 0, fmt.Errorf("While retrieving entry from posting list") + } + return p.Uid(), nil +} + func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { -- GitLab