diff --git a/loader/loader.go b/loader/loader.go index 8ac24252f2bd59ab758ab24d011d237c438e49b7..a3ccf36cb8a5d71e3c570e26473f21c28bac4ecd 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -43,10 +43,10 @@ type counters struct { } type state struct { - input chan string - cnq chan rdf.NQuad - ctr *counters - instanceIdx uint64 + input chan string + cnq chan rdf.NQuad + ctr *counters + instanceIdx uint64 numInstance uint64 } @@ -156,7 +156,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { if err == posting.E_TMP_ERROR { time.Sleep(time.Microsecond) glog.WithError(err).WithField("nq.Subject", nq.Subject). - Error("Temporary error") + Error("Temporary error") } else { glog.WithError(err).WithField("nq.Subject", nq.Subject). Error("While getting UID") @@ -167,30 +167,30 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { } if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstance != s.instanceIdx { - // This instance shouldnt or cant assign UID to this string - atomic.AddUint64(&s.ctr.ignored, 1) - } else { - _, err := rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstance) - for err != nil { - // Just put in a retry loop to tackle temporary errors. - if err == posting.E_TMP_ERROR { - time.Sleep(time.Microsecond) + // This instance shouldnt or cant assign UID to this string + atomic.AddUint64(&s.ctr.ignored, 1) + } else { + _, err := rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstance) + for err != nil { + // Just put in a retry loop to tackle temporary errors. + if err == posting.E_TMP_ERROR { + time.Sleep(time.Microsecond) glog.WithError(err).WithField("nq.Subject", nq.Subject). - Error("Temporary error") + Error("Temporary error") } else { - glog.WithError(err).WithField("nq.ObjectId", nq.ObjectId). - Error("While getting UID") - return - } - _, err = rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstance) - } - } + glog.WithError(err).WithField("nq.ObjectId", nq.ObjectId). + Error("While getting UID") + return + } + _, err = rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstance) + } + } } wg.Done() } // Blocking function. -func HandleRdfReader(reader io.Reader, instanceIdx uint64) (uint64, error) { +func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstance uint64) (uint64, error) { s := new(state) s.ctr = new(counters) ticker := time.NewTicker(time.Second) @@ -198,6 +198,7 @@ func HandleRdfReader(reader io.Reader, instanceIdx uint64) (uint64, error) { // Producer: Start buffering input to channel. s.instanceIdx = instanceIdx + s.numInstance = numInstance s.input = make(chan string, 10000) go s.readLines(reader) @@ -271,4 +272,3 @@ func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstanc ticker.Stop() return atomic.LoadUint64(&s.ctr.processed), nil } - diff --git a/server/loader/main.go b/server/loader/main.go index bb28ea70aceeb80faec07b3c2ca0aec8ff28a88a..361c9c9021fcde830e0a6df47aa54453e8abcccc 100644 --- a/server/loader/main.go +++ b/server/loader/main.go @@ -35,7 +35,8 @@ var glog = x.Log("loader_main") var rdfGzips = flag.String("rdfgzips", "", "Comma separated gzip files containing RDF data") -var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.") +var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.") +var numInstance = flag.Uint64("numInstance", 1, "Total number of instances among which uid assigning is shared") var postingDir = flag.String("postings", "", "Directory to store posting lists") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") @@ -86,7 +87,7 @@ func main() { glog.WithError(err).Fatal("Unable to create gzip reader.") } - count, err := loader.HandleRdfReader(r, *mod) + count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstance) if err != nil { glog.WithError(err).Fatal("While handling rdf reader.") } diff --git a/server/main_test.go b/server/main_test.go index 268427dce6657957162f960a09cfd532d7c637dc..369700266c50773bf611f3c4e2b0746b37d17d20 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -66,7 +66,7 @@ func prepare() (dir1, dir2 string, clog *commit.Logger, rerr error) { return dir1, dir2, clog, err } defer f.Close() - _, err = loader.HandleRdfReader(f, 1) + _, err = loader.HandleRdfReader(f, 0, 1) if err != nil { return dir1, dir2, clog, err } diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go index d57299fd07d989b044183c4603c7f324ecc5a764..4df49cae359b9cfa658bdeada896334f6baf1e63 100644 --- a/server/uidassigner/main.go +++ b/server/uidassigner/main.go @@ -1,44 +1,43 @@ package main import ( - "compress/gzip" - "flag" - "os" - "runtime" - "runtime/pprof" - "strings" + "compress/gzip" + "flag" + "os" + "runtime" + "runtime/pprof" + "strings" - "github.com/Sirupsen/logrus" - "github.com/dgraph-io/dgraph/loader" - "github.com/dgraph-io/dgraph/posting" - "github.com/dgraph-io/dgraph/store" - "github.com/dgraph-io/dgraph/x" + "github.com/Sirupsen/logrus" + "github.com/dgraph-io/dgraph/loader" + "github.com/dgraph-io/dgraph/posting" + "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/x" ) var glog = x.Log("uidassigner_main") var rdfGzips = flag.String("rdfgzips", "", - "Comma separated gzip files containing RDF data") -var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.") + "Comma separated gzip files containing RDF data") +var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.") var numInstance = flag.Uint64("numInstance", 1, "Total number of instances among which uid assigning is shared") var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") func main() { - flag.Parse() - if !flag.Parsed() { - glog.Fatal("Unable to parse flags") - } - if len(*cpuprofile) > 0 { - f, err := os.Create(*cpuprofile) - if err != nil { - glog.Fatal(err) - } - pprof.StartCPUProfile(f) - defer pprof.StopCPUProfile() - } - + flag.Parse() + if !flag.Parsed() { + glog.Fatal("Unable to parse flags") + } + if len(*cpuprofile) > 0 { + f, err := os.Create(*cpuprofile) + if err != nil { + glog.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } logrus.SetLevel(logrus.InfoLevel) numCpus := *numcpu @@ -84,4 +83,3 @@ func main() { glog.Info("Calling merge lists") posting.MergeLists(100 * numCpus) // 100 per core. } - diff --git a/uid/assigner.go b/uid/assigner.go index 3d522f6426adba7024af0298116292df9cd2eebd..bbb889fa707012c7e5a8fe504e96513df4996178 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -93,9 +93,16 @@ func init() { } func allocateUniqueUid(xid string, instanceIdx uint64, numInst uint64) (uid uint64, rerr error) { + + minIdx := instanceIdx * math.MaxUint64 / numInst + mod := math.MaxUint64 / numInst + for sp := ""; ; sp += " " { txid := xid + sp - uid = farm.Fingerprint64([]byte(txid)) // Generate from hash. + + uid1 := farm.Fingerprint64([]byte(txid)) // Generate from hash. + uid = (uid1 % mod) + minIdx + glog.WithField("txid", txid).WithField("uid", uid).Debug("Generated") if uid == math.MaxUint64 { glog.Debug("Hit uint64max while generating fingerprint. Ignoring...")