diff --git a/loader/loader.go b/loader/loader.go index bbf3309be25073d765f64919d0b7023558b778fd..a3953e117f18bbd86f2258d263f0c1def688442d 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -43,10 +43,11 @@ type counters struct { } type state struct { - input chan string - cnq chan rdf.NQuad - ctr *counters - mod uint64 + input chan string + cnq chan rdf.NQuad + ctr *counters + instanceIdx uint64 + numInstances uint64 } func (s *state) printCounters(ticker *time.Ticker) { @@ -121,13 +122,7 @@ func (s *state) parseStream(done chan error) { func (s *state) handleNQuads(wg *sync.WaitGroup) { for nq := range s.cnq { - if farm.Fingerprint64([]byte(nq.Subject))%s.mod != 0 { - // Ignore due to mod sampling. - atomic.AddUint64(&s.ctr.ignored, 1) - continue - } - - edge, err := nq.ToEdge() + edge, err := nq.ToEdge(s.instanceIdx, s.numInstances) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -138,7 +133,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { Error("While converting to edge") return } - edge, err = nq.ToEdge() + edge, err = nq.ToEdge(s.instanceIdx, s.numInstances) } key := posting.Key(edge.Entity, edge.Attribute) @@ -149,41 +144,53 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { wg.Done() } +func (s *state) getUidForString(str string) { + _, err := rdf.GetUid(str, s.instanceIdx, s.numInstances) + for err != nil { + // Just put in a retry loop to tackle temporary errors. + if err == posting.E_TMP_ERROR { + time.Sleep(time.Microsecond) + glog.WithError(err).WithField("nq.Subject", str). + Error("Temporary error") + } else { + glog.WithError(err).WithField("nq.Subject", str). + Error("While getting UID") + return + } + _, err = rdf.GetUid(str, s.instanceIdx, s.numInstances) + } +} + func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { for nq := range s.cnq { - if farm.Fingerprint64([]byte(nq.Subject))%s.mod != 0 { - // Ignore due to mod sampling. + if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances != s.instanceIdx { + // This instance shouldnt assign UID to this string atomic.AddUint64(&s.ctr.ignored, 1) - continue + } else { + s.getUidForString(nq.Subject) } - edge, err := nq.ToEdge() - for err != nil { - // Just put in a retry loop to tackle temporary errors. - if err == posting.E_TMP_ERROR { - time.Sleep(time.Microsecond) - - } else { - glog.WithError(err).WithField("nq", nq). - Error("While converting to edge") - return - } - edge, err = nq.ToEdge() + if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances != s.instanceIdx { + // This instance shouldnt or cant assign UID to this string + atomic.AddUint64(&s.ctr.ignored, 1) + } else { + s.getUidForString(nq.ObjectId) } - glog.Info(edge); } + wg.Done() } // Blocking function. -func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) { +func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) { s := new(state) s.ctr = new(counters) ticker := time.NewTicker(time.Second) go s.printCounters(ticker) // Producer: Start buffering input to channel. - s.mod = mod + s.instanceIdx = instanceIdx + s.numInstances = numInstances s.input = make(chan string, 10000) go s.readLines(reader) @@ -217,14 +224,15 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) { } // Blocking function. -func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64) (uint64, error) { +func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) { s := new(state) s.ctr = new(counters) ticker := time.NewTicker(time.Second) go s.printCounters(ticker) // Producer: Start buffering input to channel. - s.mod = mod + s.instanceIdx = instanceIdx + s.numInstances = numInstances s.input = make(chan string, 10000) go s.readLines(reader) @@ -256,4 +264,3 @@ func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64) (uint64, error) { ticker.Stop() return atomic.LoadUint64(&s.ctr.processed), nil } - diff --git a/query/query.go b/query/query.go index 2d1ccb2094e6e89c72c2d6fde8e00cf2e5839e32..4d2e28fe5d8e352c6fca59296918fc833b84fd58 100644 --- a/query/query.go +++ b/query/query.go @@ -255,7 +255,7 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) { // This would set the Result field in SubGraph, // and populate the children for attributes. if len(exid) > 0 { - u, err := uid.GetOrAssign(exid) + u, err := uid.GetOrAssign(exid, 0, 1) // instanceIdx = 0, numInstances = 1 by default if err != nil { x.Err(glog, err).WithField("xid", exid).Error( "While GetOrAssign uid from external id") diff --git a/rdf/parse.go b/rdf/parse.go index af27e4aa555440582a99d7805fef4dcaafce66b6..d56471ac5de95cc82fb460be0237fbb80152a855 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -36,21 +36,21 @@ type NQuad struct { Language string } -func getUid(s string) (uint64, error) { +func GetUid(s string, instanceIdx uint64, numInstances uint64) (uint64, error) { if strings.HasPrefix(s, "_uid_:") { return strconv.ParseUint(s[6:], 0, 64) } - return uid.GetOrAssign(s) + return uid.GetOrAssign(s, instanceIdx, numInstances) } -func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) { - sid, err := getUid(nq.Subject) +func (nq NQuad) ToEdge(instanceIdx, numInstances uint64) (result x.DirectedEdge, rerr error) { + sid, err := GetUid(nq.Subject, instanceIdx, numInstances) if err != nil { return result, err } result.Entity = sid if len(nq.ObjectId) > 0 { - oid, err := getUid(nq.ObjectId) + oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances) if err != nil { return result, err } diff --git a/server/loader/main.go b/server/loader/main.go index bb28ea70aceeb80faec07b3c2ca0aec8ff28a88a..3871936f01cec1d86e0d9ab4d49169563d1b4607 100644 --- a/server/loader/main.go +++ b/server/loader/main.go @@ -35,7 +35,8 @@ var glog = x.Log("loader_main") var rdfGzips = flag.String("rdfgzips", "", "Comma separated gzip files containing RDF data") -var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.") +var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.") +var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared") var postingDir = flag.String("postings", "", "Directory to store posting lists") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") @@ -86,7 +87,7 @@ func main() { glog.WithError(err).Fatal("Unable to create gzip reader.") } - count, err := loader.HandleRdfReader(r, *mod) + count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstances) if err != nil { glog.WithError(err).Fatal("While handling rdf reader.") } diff --git a/server/main_test.go b/server/main_test.go index 268427dce6657957162f960a09cfd532d7c637dc..369700266c50773bf611f3c4e2b0746b37d17d20 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -66,7 +66,7 @@ func prepare() (dir1, dir2 string, clog *commit.Logger, rerr error) { return dir1, dir2, clog, err } defer f.Close() - _, err = loader.HandleRdfReader(f, 1) + _, err = loader.HandleRdfReader(f, 0, 1) if err != nil { return dir1, dir2, clog, err } diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go index c9babee4daae968ab8415aef04c06301459b5dab..3d307b99892b064fb8ec3d8c14fcf161cb424ebf 100644 --- a/server/uidassigner/main.go +++ b/server/uidassigner/main.go @@ -1,43 +1,43 @@ package main import ( - "compress/gzip" - "flag" - "os" - "runtime" - "runtime/pprof" - "strings" + "compress/gzip" + "flag" + "os" + "runtime" + "runtime/pprof" + "strings" - "github.com/Sirupsen/logrus" - "github.com/dgraph-io/dgraph/loader" - "github.com/dgraph-io/dgraph/posting" - "github.com/dgraph-io/dgraph/store" - "github.com/dgraph-io/dgraph/x" + "github.com/Sirupsen/logrus" + "github.com/dgraph-io/dgraph/loader" + "github.com/dgraph-io/dgraph/posting" + "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/x" ) var glog = x.Log("uidassigner_main") var rdfGzips = flag.String("rdfgzips", "", - "Comma separated gzip files containing RDF data") -var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.") + "Comma separated gzip files containing RDF data") +var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.") +var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared") var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") func main() { - flag.Parse() - if !flag.Parsed() { - glog.Fatal("Unable to parse flags") - } - if len(*cpuprofile) > 0 { - f, err := os.Create(*cpuprofile) - if err != nil { - glog.Fatal(err) - } - pprof.StartCPUProfile(f) - defer pprof.StopCPUProfile() - } - + flag.Parse() + if !flag.Parsed() { + glog.Fatal("Unable to parse flags") + } + if len(*cpuprofile) > 0 { + f, err := os.Create(*cpuprofile) + if err != nil { + glog.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } logrus.SetLevel(logrus.InfoLevel) numCpus := *numcpu @@ -46,6 +46,10 @@ func main() { WithField("prev_maxprocs", prevProcs). Info("Set max procs to num cpus") + glog.WithField("instanceIdx", *instanceIdx). + WithField("numInstances", *numInstances). + Info("Only those XIDs which satisfy FP(xid) % numInstance == instanceIdx will be given UID") + if len(*rdfGzips) == 0 { glog.Fatal("No RDF GZIP files specified") } @@ -72,7 +76,7 @@ func main() { glog.WithError(err).Fatal("Unable to create gzip reader.") } - count, err := loader.HandleRdfReaderWhileAssign(r, *mod) + count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstances) if err != nil { glog.WithError(err).Fatal("While handling rdf reader.") } @@ -83,4 +87,3 @@ func main() { glog.Info("Calling merge lists") posting.MergeLists(100 * numCpus) // 100 per core. } - diff --git a/server/uidassigner/uidassigner b/server/uidassigner/uidassigner new file mode 100755 index 0000000000000000000000000000000000000000..1628e2e51e153a71e5eff9b79b7c2ebf26654e0b Binary files /dev/null and b/server/uidassigner/uidassigner differ diff --git a/uid/assigner.go b/uid/assigner.go index 85ad39500040d0946f048c404130d9849269b85c..8fb105b07cea8383738513426f40b0838c239f30 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -92,10 +92,17 @@ func init() { // go lmgr.clean() } -func allocateUniqueUid(xid string) (uid uint64, rerr error) { +func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { + + mod := math.MaxUint64 / numInstances + minIdx := instanceIdx * mod + for sp := ""; ; sp += " " { txid := xid + sp - uid = farm.Fingerprint64([]byte(txid)) // Generate from hash. + + uid1 := farm.Fingerprint64([]byte(txid)) // Generate from hash. + uid = (uid1 % mod) + minIdx + glog.WithField("txid", txid).WithField("uid", uid).Debug("Generated") if uid == math.MaxUint64 { glog.Debug("Hit uint64max while generating fingerprint. Ignoring...") @@ -133,7 +140,7 @@ func allocateUniqueUid(xid string) (uid uint64, rerr error) { " Wake the stupid developer up.") } -func assignNew(pl *posting.List, xid string) (uint64, error) { +func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInstances uint64) (uint64, error) { entry := lmgr.newOrExisting(xid) entry.Lock() entry.ts = time.Now() @@ -151,7 +158,7 @@ func assignNew(pl *posting.List, xid string) (uint64, error) { } // No current id exists. Create one. - uid, err := allocateUniqueUid(xid) + uid, err := allocateUniqueUid(xid, instanceIdx, numInstances) if err != nil { return 0, err } @@ -173,11 +180,11 @@ func stringKey(xid string) []byte { return buf.Bytes() } -func GetOrAssign(xid string) (uid uint64, rerr error) { +func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { key := stringKey(xid) pl := posting.GetOrCreate(key) if pl.Length() == 0 { - return assignNew(pl, xid) + return assignNew(pl, xid, instanceIdx, numInstances) } else if pl.Length() > 1 { glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) diff --git a/uid/assigner_test.go b/uid/assigner_test.go index bf6d733168366bb928bef821140c7dae30c8a3d9..826c88581381a67e47204ff29aa80a0ae654c77c 100644 --- a/uid/assigner_test.go +++ b/uid/assigner_test.go @@ -46,7 +46,7 @@ func TestGetOrAssign(t *testing.T) { var u1, u2 uint64 { - uid, err := GetOrAssign("externalid0") + uid, err := GetOrAssign("externalid0", 0, 1) if err != nil { t.Error(err) } @@ -55,7 +55,7 @@ func TestGetOrAssign(t *testing.T) { } { - uid, err := GetOrAssign("externalid1") + uid, err := GetOrAssign("externalid1", 0, 1) if err != nil { t.Error(err) } @@ -69,7 +69,7 @@ func TestGetOrAssign(t *testing.T) { // return { - uid, err := GetOrAssign("externalid0") + uid, err := GetOrAssign("externalid0", 0, 1) if err != nil { t.Error(err) }