diff --git a/loader/loader.go b/loader/loader.go index 99f7e1ca1e46fa98fed7addadde361a026ca7a90..8ac24252f2bd59ab758ab24d011d237c438e49b7 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -46,7 +46,7 @@ type state struct { input chan string cnq chan rdf.NQuad ctr *counters - mod uint64 + instanceIdx uint64 numInstance uint64 } @@ -122,7 +122,7 @@ func (s *state) parseStream(done chan error) { func (s *state) handleNQuads(wg *sync.WaitGroup) { for nq := range s.cnq { - edge, err := nq.ToEdge(s.mod, s.numInstance) + edge, err := nq.ToEdge(s.instanceIdx, s.numInstance) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -133,7 +133,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { Error("While converting to edge") return } - edge, err = nq.ToEdge(s.mod, s.numInstance) + edge, err = nq.ToEdge(s.instanceIdx, s.numInstance) } key := posting.Key(edge.Entity, edge.Attribute) @@ -146,11 +146,11 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { for nq := range s.cnq { - if farm.Fingerprint64([]byte(nq.Subject))%s.numInstance != s.mod { + if farm.Fingerprint64([]byte(nq.Subject))%s.numInstance != s.instanceIdx { // This instance shouldnt assign UID to this string atomic.AddUint64(&s.ctr.ignored, 1) } else { - _, err := rdf.GetUid(nq.Subject, s.mod, s.numInstance) + _, err := rdf.GetUid(nq.Subject, s.instanceIdx, s.numInstance) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -162,27 +162,27 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { Error("While getting UID") return } - _, err = rdf.GetUid(nq.Subject, s.mod, s.numInstance) + _, err = rdf.GetUid(nq.Subject, s.instanceIdx, s.numInstance) } } - if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstance != s.mod { + if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstance != s.instanceIdx { // This instance shouldnt or cant assign UID to this string atomic.AddUint64(&s.ctr.ignored, 1) } else { - _, err := rdf.GetUid(nq.ObjectId, s.mod, s.numInstance) + _, err := rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstance) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { time.Sleep(time.Microsecond) - glog.WithError(err).WithField("nq.Subject", nq.Subject). + glog.WithError(err).WithField("nq.Subject", nq.Subject). Error("Temporary error") } else { glog.WithError(err).WithField("nq.ObjectId", nq.ObjectId). Error("While getting UID") return } - _, err = rdf.GetUid(nq.ObjectId, s.mod, s.numInstance) + _, err = rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstance) } } } @@ -190,14 +190,14 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { } // Blocking function. -func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) { +func HandleRdfReader(reader io.Reader, instanceIdx uint64) (uint64, error) { s := new(state) s.ctr = new(counters) ticker := time.NewTicker(time.Second) go s.printCounters(ticker) // Producer: Start buffering input to channel. - s.mod = mod + s.instanceIdx = instanceIdx s.input = make(chan string, 10000) go s.readLines(reader) @@ -231,14 +231,14 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) { } // Blocking function. -func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64, numInstance uint64) (uint64, error) { +func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstance uint64) (uint64, error) { s := new(state) s.ctr = new(counters) ticker := time.NewTicker(time.Second) go s.printCounters(ticker) // Producer: Start buffering input to channel. - s.mod = mod + s.instanceIdx = instanceIdx s.numInstance = numInstance s.input = make(chan string, 10000) go s.readLines(reader) diff --git a/rdf/parse.go b/rdf/parse.go index 58f464d9b84883934bda11b654247e54f0d0c8d5..632b3b47d846ecfef6073b2002f8088d0ff0e28e 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -36,21 +36,21 @@ type NQuad struct { Language string } -func GetUid(s string, mod uint64, numInst uint64) (uint64, error) { +func GetUid(s string, instanceIdx uint64, numInst uint64) (uint64, error) { if strings.HasPrefix(s, "_uid_:") { return strconv.ParseUint(s[6:], 0, 64) } - return uid.GetOrAssign(s, mod, numInst) + return uid.GetOrAssign(s, instanceIdx, numInst) } -func (nq NQuad) ToEdge(mod, numInst uint64) (result x.DirectedEdge, rerr error) { - sid, err := GetUid(nq.Subject, mod, numInst) +func (nq NQuad) ToEdge(instanceIdx, numInst uint64) (result x.DirectedEdge, rerr error) { + sid, err := GetUid(nq.Subject, instanceIdx, numInst) if err != nil { return result, err } result.Entity = sid if len(nq.ObjectId) > 0 { - oid, err := GetUid(nq.ObjectId, mod, numInst) + oid, err := GetUid(nq.ObjectId, instanceIdx, numInst) if err != nil { return result, err } diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go index 422c76424a59679949e3dc896554d8f2f382f5f9..d57299fd07d989b044183c4603c7f324ecc5a764 100644 --- a/server/uidassigner/main.go +++ b/server/uidassigner/main.go @@ -19,7 +19,7 @@ var glog = x.Log("uidassigner_main") var rdfGzips = flag.String("rdfgzips", "", "Comma separated gzip files containing RDF data") -var mod = flag.Uint64("machineid", 1, "Only pick entities, where uid % mod == 0.") +var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.") var numInstance = flag.Uint64("numInstance", 1, "Total number of instances among which uid assigning is shared") var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") @@ -73,7 +73,7 @@ func main() { glog.WithError(err).Fatal("Unable to create gzip reader.") } - count, err := loader.HandleRdfReaderWhileAssign(r, *mod, *numInstance) + count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstance) if err != nil { glog.WithError(err).Fatal("While handling rdf reader.") } diff --git a/uid/assigner.go b/uid/assigner.go index 8e486e2b3e3004fdf2cb53115469a7d1638fa161..3d522f6426adba7024af0298116292df9cd2eebd 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -92,7 +92,7 @@ func init() { // go lmgr.clean() } -func allocateUniqueUid(xid string, mod uint64, numInst uint64) (uid uint64, rerr error) { +func allocateUniqueUid(xid string, instanceIdx uint64, numInst uint64) (uid uint64, rerr error) { for sp := ""; ; sp += " " { txid := xid + sp uid = farm.Fingerprint64([]byte(txid)) // Generate from hash. @@ -133,7 +133,7 @@ func allocateUniqueUid(xid string, mod uint64, numInst uint64) (uid uint64, rerr " Wake the stupid developer up.") } -func assignNew(pl *posting.List, xid string, mod uint64, numInst uint64) (uint64, error) { +func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInst uint64) (uint64, error) { entry := lmgr.newOrExisting(xid) entry.Lock() entry.ts = time.Now() @@ -151,7 +151,7 @@ func assignNew(pl *posting.List, xid string, mod uint64, numInst uint64) (uint64 } // No current id exists. Create one. - uid, err := allocateUniqueUid(xid, mod, numInst) + uid, err := allocateUniqueUid(xid, instanceIdx, numInst) if err != nil { return 0, err } @@ -173,11 +173,11 @@ func stringKey(xid string) []byte { return buf.Bytes() } -func GetOrAssign(xid string, mod uint64, numInst uint64) (uid uint64, rerr error) { +func GetOrAssign(xid string, instanceIdx uint64, numInst uint64) (uid uint64, rerr error) { key := stringKey(xid) pl := posting.GetOrCreate(key) if pl.Length() == 0 { - return assignNew(pl, xid, mod, numInst) + return assignNew(pl, xid, instanceIdx, numInst) } else if pl.Length() > 1 { glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid)