diff --git a/loader/loader.go b/loader/loader.go index a3ccf36cb8a5d71e3c570e26473f21c28bac4ecd..5fa1e7999b5649a56602333a066c5fddb86e4190 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -43,11 +43,11 @@ type counters struct { } type state struct { - input chan string - cnq chan rdf.NQuad - ctr *counters - instanceIdx uint64 - numInstance uint64 + input chan string + cnq chan rdf.NQuad + ctr *counters + instanceIdx uint64 + numInstances uint64 } func (s *state) printCounters(ticker *time.Ticker) { @@ -122,7 +122,7 @@ func (s *state) parseStream(done chan error) { func (s *state) handleNQuads(wg *sync.WaitGroup) { for nq := range s.cnq { - edge, err := nq.ToEdge(s.instanceIdx, s.numInstance) + edge, err := nq.ToEdge(s.instanceIdx, s.numInstances) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -133,7 +133,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { Error("While converting to edge") return } - edge, err = nq.ToEdge(s.instanceIdx, s.numInstance) + edge, err = nq.ToEdge(s.instanceIdx, s.numInstances) } key := posting.Key(edge.Entity, edge.Attribute) @@ -146,11 +146,11 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { for nq := range s.cnq { - if farm.Fingerprint64([]byte(nq.Subject))%s.numInstance != s.instanceIdx { + if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances != s.instanceIdx { // This instance shouldnt assign UID to this string atomic.AddUint64(&s.ctr.ignored, 1) } else { - _, err := rdf.GetUid(nq.Subject, s.instanceIdx, s.numInstance) + _, err := rdf.GetUid(nq.Subject, s.instanceIdx, s.numInstances) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -162,15 +162,15 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { Error("While getting UID") return } - _, err = rdf.GetUid(nq.Subject, s.instanceIdx, s.numInstance) + _, err = rdf.GetUid(nq.Subject, s.instanceIdx, s.numInstances) } } - if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstance != s.instanceIdx { + if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances != s.instanceIdx { // This instance shouldnt or cant assign UID to this string atomic.AddUint64(&s.ctr.ignored, 1) } else { - _, err := rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstance) + _, err := rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstances) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -182,7 +182,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { Error("While getting UID") return } - _, err = rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstance) + _, err = rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstances) } } } @@ -190,7 +190,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { } // Blocking function. -func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstance uint64) (uint64, error) { +func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) { s := new(state) s.ctr = new(counters) ticker := time.NewTicker(time.Second) @@ -198,7 +198,7 @@ func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstance uint64) ( // Producer: Start buffering input to channel. s.instanceIdx = instanceIdx - s.numInstance = numInstance + s.numInstances = numInstances s.input = make(chan string, 10000) go s.readLines(reader) @@ -232,7 +232,7 @@ func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstance uint64) ( } // Blocking function. -func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstance uint64) (uint64, error) { +func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) { s := new(state) s.ctr = new(counters) ticker := time.NewTicker(time.Second) @@ -240,7 +240,7 @@ func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstanc // Producer: Start buffering input to channel. s.instanceIdx = instanceIdx - s.numInstance = numInstance + s.numInstances = numInstances s.input = make(chan string, 10000) go s.readLines(reader) diff --git a/rdf/parse.go b/rdf/parse.go index 632b3b47d846ecfef6073b2002f8088d0ff0e28e..d56471ac5de95cc82fb460be0237fbb80152a855 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -36,21 +36,21 @@ type NQuad struct { Language string } -func GetUid(s string, instanceIdx uint64, numInst uint64) (uint64, error) { +func GetUid(s string, instanceIdx uint64, numInstances uint64) (uint64, error) { if strings.HasPrefix(s, "_uid_:") { return strconv.ParseUint(s[6:], 0, 64) } - return uid.GetOrAssign(s, instanceIdx, numInst) + return uid.GetOrAssign(s, instanceIdx, numInstances) } -func (nq NQuad) ToEdge(instanceIdx, numInst uint64) (result x.DirectedEdge, rerr error) { - sid, err := GetUid(nq.Subject, instanceIdx, numInst) +func (nq NQuad) ToEdge(instanceIdx, numInstances uint64) (result x.DirectedEdge, rerr error) { + sid, err := GetUid(nq.Subject, instanceIdx, numInstances) if err != nil { return result, err } result.Entity = sid if len(nq.ObjectId) > 0 { - oid, err := GetUid(nq.ObjectId, instanceIdx, numInst) + oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances) if err != nil { return result, err } diff --git a/server/loader/main.go b/server/loader/main.go index 361c9c9021fcde830e0a6df47aa54453e8abcccc..3871936f01cec1d86e0d9ab4d49169563d1b4607 100644 --- a/server/loader/main.go +++ b/server/loader/main.go @@ -36,7 +36,7 @@ var glog = x.Log("loader_main") var rdfGzips = flag.String("rdfgzips", "", "Comma separated gzip files containing RDF data") var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.") -var numInstance = flag.Uint64("numInstance", 1, "Total number of instances among which uid assigning is shared") +var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared") var postingDir = flag.String("postings", "", "Directory to store posting lists") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") @@ -87,7 +87,7 @@ func main() { glog.WithError(err).Fatal("Unable to create gzip reader.") } - count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstance) + count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstances) if err != nil { glog.WithError(err).Fatal("While handling rdf reader.") } diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go index 4df49cae359b9cfa658bdeada896334f6baf1e63..c9ab1d6004cbaa93ddc54c549930e82383c6789f 100644 --- a/server/uidassigner/main.go +++ b/server/uidassigner/main.go @@ -20,7 +20,7 @@ var glog = x.Log("uidassigner_main") var rdfGzips = flag.String("rdfgzips", "", "Comma separated gzip files containing RDF data") var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.") -var numInstance = flag.Uint64("numInstance", 1, "Total number of instances among which uid assigning is shared") +var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared") var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") @@ -72,7 +72,7 @@ func main() { glog.WithError(err).Fatal("Unable to create gzip reader.") } - count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstance) + count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstances) if err != nil { glog.WithError(err).Fatal("While handling rdf reader.") } diff --git a/uid/assigner.go b/uid/assigner.go index bbb889fa707012c7e5a8fe504e96513df4996178..8f8c5f3c67024923995c8cf0a5b16f0b20aa6186 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -92,10 +92,10 @@ func init() { // go lmgr.clean() } -func allocateUniqueUid(xid string, instanceIdx uint64, numInst uint64) (uid uint64, rerr error) { +func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { - minIdx := instanceIdx * math.MaxUint64 / numInst - mod := math.MaxUint64 / numInst + minIdx := instanceIdx * math.MaxUint64 / numInstances + mod := math.MaxUint64 / numInstances for sp := ""; ; sp += " " { txid := xid + sp @@ -140,7 +140,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInst uint64) (uid uint " Wake the stupid developer up.") } -func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInst uint64) (uint64, error) { +func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInstances uint64) (uint64, error) { entry := lmgr.newOrExisting(xid) entry.Lock() entry.ts = time.Now() @@ -158,7 +158,7 @@ func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInst uint64) } // No current id exists. Create one. - uid, err := allocateUniqueUid(xid, instanceIdx, numInst) + uid, err := allocateUniqueUid(xid, instanceIdx, numInstances) if err != nil { return 0, err } @@ -180,11 +180,11 @@ func stringKey(xid string) []byte { return buf.Bytes() } -func GetOrAssign(xid string, instanceIdx uint64, numInst uint64) (uid uint64, rerr error) { +func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { key := stringKey(xid) pl := posting.GetOrCreate(key) if pl.Length() == 0 { - return assignNew(pl, xid, instanceIdx, numInst) + return assignNew(pl, xid, instanceIdx, numInstances) } else if pl.Length() > 1 { glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid)