Skip to content
Snippets Groups Projects
Commit 8f4cb553 authored by Ashwin
Browse files

Rename mod to instanceIdx

parent 4e7b8434
No related branches found
No related tags found
No related merge requests found
...@@ -46,7 +46,7 @@ type state struct { ...@@ -46,7 +46,7 @@ type state struct {
input chan string input chan string
cnq chan rdf.NQuad cnq chan rdf.NQuad
ctr *counters ctr *counters
mod uint64 instanceIdx uint64
numInstance uint64 numInstance uint64
} }
...@@ -122,7 +122,7 @@ func (s *state) parseStream(done chan error) { ...@@ -122,7 +122,7 @@ func (s *state) parseStream(done chan error) {
func (s *state) handleNQuads(wg *sync.WaitGroup) { func (s *state) handleNQuads(wg *sync.WaitGroup) {
for nq := range s.cnq { for nq := range s.cnq {
edge, err := nq.ToEdge(s.mod, s.numInstance) edge, err := nq.ToEdge(s.instanceIdx, s.numInstance)
for err != nil { for err != nil {
// Just put in a retry loop to tackle temporary errors. // Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR { if err == posting.E_TMP_ERROR {
...@@ -133,7 +133,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { ...@@ -133,7 +133,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
Error("While converting to edge") Error("While converting to edge")
return return
} }
edge, err = nq.ToEdge(s.mod, s.numInstance) edge, err = nq.ToEdge(s.instanceIdx, s.numInstance)
} }
key := posting.Key(edge.Entity, edge.Attribute) key := posting.Key(edge.Entity, edge.Attribute)
...@@ -146,11 +146,11 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { ...@@ -146,11 +146,11 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
for nq := range s.cnq { for nq := range s.cnq {
if farm.Fingerprint64([]byte(nq.Subject))%s.numInstance != s.mod { if farm.Fingerprint64([]byte(nq.Subject))%s.numInstance != s.instanceIdx {
// This instance shouldnt assign UID to this string // This instance shouldnt assign UID to this string
atomic.AddUint64(&s.ctr.ignored, 1) atomic.AddUint64(&s.ctr.ignored, 1)
} else { } else {
_, err := rdf.GetUid(nq.Subject, s.mod, s.numInstance) _, err := rdf.GetUid(nq.Subject, s.instanceIdx, s.numInstance)
for err != nil { for err != nil {
// Just put in a retry loop to tackle temporary errors. // Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR { if err == posting.E_TMP_ERROR {
...@@ -162,27 +162,27 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { ...@@ -162,27 +162,27 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
Error("While getting UID") Error("While getting UID")
return return
} }
_, err = rdf.GetUid(nq.Subject, s.mod, s.numInstance) _, err = rdf.GetUid(nq.Subject, s.instanceIdx, s.numInstance)
} }
} }
if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstance != s.mod { if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstance != s.instanceIdx {
// This instance shouldnt or cant assign UID to this string // This instance shouldnt or cant assign UID to this string
atomic.AddUint64(&s.ctr.ignored, 1) atomic.AddUint64(&s.ctr.ignored, 1)
} else { } else {
_, err := rdf.GetUid(nq.ObjectId, s.mod, s.numInstance) _, err := rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstance)
for err != nil { for err != nil {
// Just put in a retry loop to tackle temporary errors. // Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR { if err == posting.E_TMP_ERROR {
time.Sleep(time.Microsecond) time.Sleep(time.Microsecond)
glog.WithError(err).WithField("nq.Subject", nq.Subject). glog.WithError(err).WithField("nq.Subject", nq.Subject).
Error("Temporary error") Error("Temporary error")
} else { } else {
glog.WithError(err).WithField("nq.ObjectId", nq.ObjectId). glog.WithError(err).WithField("nq.ObjectId", nq.ObjectId).
Error("While getting UID") Error("While getting UID")
return return
} }
_, err = rdf.GetUid(nq.ObjectId, s.mod, s.numInstance) _, err = rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstance)
} }
} }
} }
...@@ -190,14 +190,14 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { ...@@ -190,14 +190,14 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
} }
// Blocking function. // Blocking function.
func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) { func HandleRdfReader(reader io.Reader, instanceIdx uint64) (uint64, error) {
s := new(state) s := new(state)
s.ctr = new(counters) s.ctr = new(counters)
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
go s.printCounters(ticker) go s.printCounters(ticker)
// Producer: Start buffering input to channel. // Producer: Start buffering input to channel.
s.mod = mod s.instanceIdx = instanceIdx
s.input = make(chan string, 10000) s.input = make(chan string, 10000)
go s.readLines(reader) go s.readLines(reader)
...@@ -231,14 +231,14 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) { ...@@ -231,14 +231,14 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) {
} }
// Blocking function. // Blocking function.
func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64, numInstance uint64) (uint64, error) { func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstance uint64) (uint64, error) {
s := new(state) s := new(state)
s.ctr = new(counters) s.ctr = new(counters)
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
go s.printCounters(ticker) go s.printCounters(ticker)
// Producer: Start buffering input to channel. // Producer: Start buffering input to channel.
s.mod = mod s.instanceIdx = instanceIdx
s.numInstance = numInstance s.numInstance = numInstance
s.input = make(chan string, 10000) s.input = make(chan string, 10000)
go s.readLines(reader) go s.readLines(reader)
......
...@@ -36,21 +36,21 @@ type NQuad struct { ...@@ -36,21 +36,21 @@ type NQuad struct {
Language string Language string
} }
func GetUid(s string, mod uint64, numInst uint64) (uint64, error) { func GetUid(s string, instanceIdx uint64, numInst uint64) (uint64, error) {
if strings.HasPrefix(s, "_uid_:") { if strings.HasPrefix(s, "_uid_:") {
return strconv.ParseUint(s[6:], 0, 64) return strconv.ParseUint(s[6:], 0, 64)
} }
return uid.GetOrAssign(s, mod, numInst) return uid.GetOrAssign(s, instanceIdx, numInst)
} }
func (nq NQuad) ToEdge(mod, numInst uint64) (result x.DirectedEdge, rerr error) { func (nq NQuad) ToEdge(instanceIdx, numInst uint64) (result x.DirectedEdge, rerr error) {
sid, err := GetUid(nq.Subject, mod, numInst) sid, err := GetUid(nq.Subject, instanceIdx, numInst)
if err != nil { if err != nil {
return result, err return result, err
} }
result.Entity = sid result.Entity = sid
if len(nq.ObjectId) > 0 { if len(nq.ObjectId) > 0 {
oid, err := GetUid(nq.ObjectId, mod, numInst) oid, err := GetUid(nq.ObjectId, instanceIdx, numInst)
if err != nil { if err != nil {
return result, err return result, err
} }
......
...@@ -19,7 +19,7 @@ var glog = x.Log("uidassigner_main") ...@@ -19,7 +19,7 @@ var glog = x.Log("uidassigner_main")
var rdfGzips = flag.String("rdfgzips", "", var rdfGzips = flag.String("rdfgzips", "",
"Comma separated gzip files containing RDF data") "Comma separated gzip files containing RDF data")
var mod = flag.Uint64("machineid", 1, "Only pick entities, where uid % mod == 0.") var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.")
var numInstance = flag.Uint64("numInstance", 1, "Total number of instances among which uid assigning is shared") var numInstance = flag.Uint64("numInstance", 1, "Total number of instances among which uid assigning is shared")
var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists") var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
...@@ -73,7 +73,7 @@ func main() { ...@@ -73,7 +73,7 @@ func main() {
glog.WithError(err).Fatal("Unable to create gzip reader.") glog.WithError(err).Fatal("Unable to create gzip reader.")
} }
count, err := loader.HandleRdfReaderWhileAssign(r, *mod, *numInstance) count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstance)
if err != nil { if err != nil {
glog.WithError(err).Fatal("While handling rdf reader.") glog.WithError(err).Fatal("While handling rdf reader.")
} }
......
...@@ -92,7 +92,7 @@ func init() { ...@@ -92,7 +92,7 @@ func init() {
// go lmgr.clean() // go lmgr.clean()
} }
func allocateUniqueUid(xid string, mod uint64, numInst uint64) (uid uint64, rerr error) { func allocateUniqueUid(xid string, instanceIdx uint64, numInst uint64) (uid uint64, rerr error) {
for sp := ""; ; sp += " " { for sp := ""; ; sp += " " {
txid := xid + sp txid := xid + sp
uid = farm.Fingerprint64([]byte(txid)) // Generate from hash. uid = farm.Fingerprint64([]byte(txid)) // Generate from hash.
...@@ -133,7 +133,7 @@ func allocateUniqueUid(xid string, mod uint64, numInst uint64) (uid uint64, rerr ...@@ -133,7 +133,7 @@ func allocateUniqueUid(xid string, mod uint64, numInst uint64) (uid uint64, rerr
" Wake the stupid developer up.") " Wake the stupid developer up.")
} }
func assignNew(pl *posting.List, xid string, mod uint64, numInst uint64) (uint64, error) { func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInst uint64) (uint64, error) {
entry := lmgr.newOrExisting(xid) entry := lmgr.newOrExisting(xid)
entry.Lock() entry.Lock()
entry.ts = time.Now() entry.ts = time.Now()
...@@ -151,7 +151,7 @@ func assignNew(pl *posting.List, xid string, mod uint64, numInst uint64) (uint64 ...@@ -151,7 +151,7 @@ func assignNew(pl *posting.List, xid string, mod uint64, numInst uint64) (uint64
} }
// No current id exists. Create one. // No current id exists. Create one.
uid, err := allocateUniqueUid(xid, mod, numInst) uid, err := allocateUniqueUid(xid, instanceIdx, numInst)
if err != nil { if err != nil {
return 0, err return 0, err
} }
...@@ -173,11 +173,11 @@ func stringKey(xid string) []byte { ...@@ -173,11 +173,11 @@ func stringKey(xid string) []byte {
return buf.Bytes() return buf.Bytes()
} }
func GetOrAssign(xid string, mod uint64, numInst uint64) (uid uint64, rerr error) { func GetOrAssign(xid string, instanceIdx uint64, numInst uint64) (uid uint64, rerr error) {
key := stringKey(xid) key := stringKey(xid)
pl := posting.GetOrCreate(key) pl := posting.GetOrCreate(key)
if pl.Length() == 0 { if pl.Length() == 0 {
return assignNew(pl, xid, mod, numInst) return assignNew(pl, xid, instanceIdx, numInst)
} else if pl.Length() > 1 { } else if pl.Length() > 1 {
glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment