diff --git a/loader/loader.go b/loader/loader.go index 4d5fe803a31971b7c170a2132b7647c8ab16dbb2..718ae5fde528cdde6b8279f0359131aa9637ac94 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -35,6 +35,7 @@ import ( ) var glog = x.Log("loader") +var rStore, rwStore *store.Store type counters struct { read uint64 @@ -51,6 +52,11 @@ type state struct { numInstances uint64 } +func Init(rs, rws *store.Store) { + rStore = rs + rwStore = rws +} + func (s *state) printCounters(ticker *time.Ticker) { var prev uint64 for _ = range ticker.C { @@ -121,7 +127,7 @@ func (s *state) parseStream(done chan error) { done <- nil } -func (s *state) handleNQuads(wg *sync.WaitGroup, rwStore, rStore *store.Store) { +func (s *state) handleNQuads(wg *sync.WaitGroup) { for nq := range s.cnq { edge, err := nq.ToEdge(s.instanceIdx, s.numInstances, rStore) for err != nil { @@ -151,7 +157,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup, rwStore, rStore *store.Store) { wg.Done() } -func (s *state) getUidForString(str string, rwStore *store.Store) { +func (s *state) getUidForString(str string) { _, err := rdf.GetUid(str, s.instanceIdx, s.numInstances, rwStore) for err != nil { // Just put in a retry loop to tackle temporary errors. @@ -168,13 +174,13 @@ func (s *state) getUidForString(str string, rwStore *store.Store) { } } -func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup, rwStore *store.Store) { +func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { for nq := range s.cnq { if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances != s.instanceIdx { // This instance shouldnt assign UID to this string atomic.AddUint64(&s.ctr.ignored, 1) } else { - s.getUidForString(nq.Subject, rwStore) + s.getUidForString(nq.Subject) } if len(nq.ObjectId) == 0 || @@ -182,7 +188,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup, rwStore *store.Store // This instance shouldnt or cant assign UID to this string atomic.AddUint64(&s.ctr.ignored, 1) } else { - s.getUidForString(nq.ObjectId, rwStore) + s.getUidForString(nq.ObjectId) } } @@ -190,7 +196,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup, rwStore *store.Store } // Blocking function. -func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64, rwStore, rStore *store.Store) (uint64, error) { +func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) { s := new(state) s.ctr = new(counters) ticker := time.NewTicker(time.Second) @@ -212,7 +218,7 @@ func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64, wg := new(sync.WaitGroup) for i := 0; i < 3000; i++ { wg.Add(1) - go s.handleNQuads(wg, rwStore, rStore) // NQuads --> Posting list [slow]. + go s.handleNQuads(wg) // NQuads --> Posting list [slow]. } // Block until all parseStream goroutines are finished. @@ -232,7 +238,7 @@ func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64, } // Blocking function. -func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstances uint64, rwStore *store.Store) (uint64, error) { +func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) { s := new(state) s.ctr = new(counters) ticker := time.NewTicker(time.Second) @@ -254,7 +260,7 @@ func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstanc wg := new(sync.WaitGroup) for i := 0; i < 3000; i++ { wg.Add(1) - go s.handleNQuadsWhileAssign(wg, rwStore) //Different compared to HandleRdfReader + go s.handleNQuadsWhileAssign(wg) //Different compared to HandleRdfReader } // Block until all parseStream goroutines are finished. diff --git a/query/query.go b/query/query.go index e176db9305589ae819f52b7e9343c5401c9a582c..a2a2d981744dc24c93922691c972fa0dba309ee6 100644 --- a/query/query.go +++ b/query/query.go @@ -276,7 +276,7 @@ func newGraph(euid uint64, exid string, pstore *store.Store) (*SubGraph, error) // This would set the Result field in SubGraph, // and populate the children for attributes. if len(exid) > 0 { - u, err := uid.GetOrAssign(exid, 0, 1, pstore) // instanceIdx = 0, numInstances = 1 by default + u, err := uid.GetOrAssign(exid, 0, 1) // instanceIdx = 0, numInstances = 1 by default if err != nil { x.Err(glog, err).WithField("xid", exid).Error( "While GetOrAssign uid from external id") diff --git a/rdf/parse.go b/rdf/parse.go index 7788553de64b8186575c0417d95336e0111904d6..1b678564c92cc2c183f51dab3d04f6643889833a 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -41,7 +41,7 @@ func GetUid(s string, instanceIdx uint64, numInstances uint64, rStore *store.Sto if strings.HasPrefix(s, "_uid_:") { return strconv.ParseUint(s[6:], 0, 64) } - return uid.GetOrAssign(s, instanceIdx, numInstances, rStore) + return uid.GetOrAssign(s, instanceIdx, numInstances) } func (nq NQuad) ToEdge(instanceIdx, numInstances uint64, rStore *store.Store) (result x.DirectedEdge, rerr error) { diff --git a/server/loader/main.go b/server/loader/main.go index 9077416d867123be87203fb571c703191dd66680..2edbe972dfbda037d8c239d0824854b96ce6ce46 100644 --- a/server/loader/main.go +++ b/server/loader/main.go @@ -28,6 +28,7 @@ import ( "github.com/dgraph-io/dgraph/loader" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/x" ) @@ -75,6 +76,8 @@ func main() { defer uidS.Close() posting.Init(nil) + uid.Init(uidS) + loader.Init(uidS, ps) files := strings.Split(*rdfGzips, ",") for _, path := range files { @@ -92,7 +95,7 @@ func main() { glog.WithError(err).Fatal("Unable to create gzip reader.") } - count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstances, ps, uidS) + count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstances) if err != nil { glog.WithError(err).Fatal("While handling rdf reader.") } diff --git a/server/main.go b/server/main.go index 71881b74ab1a4b642bb568ffaae6791b5443e5a0..0e62178e48cc0444c42453254847c02eadc87bd1 100644 --- a/server/main.go +++ b/server/main.go @@ -30,6 +30,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/query" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" ) @@ -140,6 +141,7 @@ func main() { posting.Init(clog) worker.Init(ps) + uid.Init(ps) http.HandleFunc("/query", queryHandler(ps)) glog.WithField("port", *port).Info("Listening for requests...") diff --git a/server/main_test.go b/server/main_test.go index 684159fc3d310ce75b312e820d2d8ae3806540a4..a99f42554ffad6c6201fd9079ed383ddd09c02ef 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -28,6 +28,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/query" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/worker" ) @@ -62,13 +63,15 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er posting.Init(clog) worker.Init(ps) + uid.Init(ps) + loader.Init(ps, ps) f, err := os.Open("testdata.nq") if err != nil { return dir1, dir2, nil, clog, err } defer f.Close() - _, err = loader.HandleRdfReader(f, 0, 1, ps, ps) + _, err = loader.HandleRdfReader(f, 0, 1) if err != nil { return dir1, dir2, nil, clog, err } diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go index b7d16f9149cfa7d918b0355454f6e07510c7f670..b44938921b4429036bd7e1f7418bef57f38645a8 100644 --- a/server/uidassigner/main.go +++ b/server/uidassigner/main.go @@ -12,6 +12,7 @@ import ( "github.com/dgraph-io/dgraph/loader" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/x" ) @@ -59,6 +60,8 @@ func main() { defer ps.Close() posting.Init(nil) + uid.Init(ps) + loader.Init(nil, ps) files := strings.Split(*rdfGzips, ",") for _, path := range files { @@ -76,7 +79,7 @@ func main() { glog.WithError(err).Fatal("Unable to create gzip reader.") } - count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstances, ps) + count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstances) if err != nil { glog.WithError(err).Fatal("While handling rdf reader.") } diff --git a/server/uidassigner/main_test.go b/server/uidassigner/main_test.go index d6ca54de607e04533adb771b0762a4002b951219..269b9a839845955eb9114b67f08f1a2bcf31fd7f 100644 --- a/server/uidassigner/main_test.go +++ b/server/uidassigner/main_test.go @@ -11,6 +11,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/rdf" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/uid" "github.com/dgryski/go-farm" ) @@ -35,6 +36,8 @@ func TestQuery(t *testing.T) { defer clog.Close() posting.Init(clog) + uid.Init(ps) + list := []string{"alice", "bob", "mallory", "ash", "man", "dgraph"} for _, str := range list { if farm.Fingerprint64([]byte(str))%numInstances == 0 { diff --git a/uid/assigner.go b/uid/assigner.go index 81a5db389ced52db308d54ed44db0857b8b2c05e..e4ce273718b7688d4631983a9ad02894c8e29fe0 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -32,6 +32,7 @@ import ( var glog = x.Log("uid") var lmgr *lockManager +var rStore *store.Store type entry struct { sync.Mutex @@ -93,7 +94,11 @@ func init() { // go lmgr.clean() } -func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64, rStore *store.Store) (uid uint64, rerr error) { +func Init(ps *store.Store) { + rStore = ps +} + +func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { mod := math.MaxUint64 / numInstances minIdx := instanceIdx * mod @@ -141,7 +146,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64, rSto " Wake the stupid developer up.") } -func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInstances uint64, rStore *store.Store) (uint64, error) { +func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInstances uint64) (uint64, error) { entry := lmgr.newOrExisting(xid) entry.Lock() entry.ts = time.Now() @@ -159,7 +164,7 @@ func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInstances ui } // No current id exists. Create one. - uid, err := allocateUniqueUid(xid, instanceIdx, numInstances, rStore) + uid, err := allocateUniqueUid(xid, instanceIdx, numInstances) if err != nil { return 0, err } @@ -181,11 +186,11 @@ func stringKey(xid string) []byte { return buf.Bytes() } -func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64, rStore *store.Store) (uid uint64, rerr error) { +func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { key := stringKey(xid) pl := posting.GetOrCreate(key, rStore) if pl.Length() == 0 { - return assignNew(pl, xid, instanceIdx, numInstances, rStore) + return assignNew(pl, xid, instanceIdx, numInstances) } else if pl.Length() > 1 { glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) @@ -202,7 +207,7 @@ func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64, rStore *st " Wake the stupid developer up.") } -func ExternalId(uid uint64, rStore *store.Store) (xid string, rerr error) { +func ExternalId(uid uint64) (xid string, rerr error) { key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid pl := posting.GetOrCreate(key, rStore) if pl.Length() == 0 { diff --git a/uid/assigner_test.go b/uid/assigner_test.go index c083cb6ed99a403f71e8bed606cb293948a225d4..b08eed2052db06aebbaecbba3ac17ffc3d3ae41b 100644 --- a/uid/assigner_test.go +++ b/uid/assigner_test.go @@ -43,10 +43,11 @@ func TestGetOrAssign(t *testing.T) { defer clog.Close() posting.Init(clog) + Init(ps) var u1, u2 uint64 { - uid, err := GetOrAssign("externalid0", 0, 1, ps) + uid, err := GetOrAssign("externalid0", 0, 1) if err != nil { t.Error(err) } @@ -55,7 +56,7 @@ func TestGetOrAssign(t *testing.T) { } { - uid, err := GetOrAssign("externalid1", 0, 1, ps) + uid, err := GetOrAssign("externalid1", 0, 1) if err != nil { t.Error(err) } @@ -69,7 +70,7 @@ func TestGetOrAssign(t *testing.T) { // return { - uid, err := GetOrAssign("externalid0", 0, 1, ps) + uid, err := GetOrAssign("externalid0", 0, 1) if err != nil { t.Error(err) } @@ -81,7 +82,7 @@ func TestGetOrAssign(t *testing.T) { // return { - xid, err := ExternalId(u1, ps) + xid, err := ExternalId(u1) if err != nil { t.Error(err) } @@ -91,7 +92,7 @@ func TestGetOrAssign(t *testing.T) { } return { - xid, err := ExternalId(u2, ps) + xid, err := ExternalId(u2) if err != nil { t.Error(err) }