Skip to content
Snippets Groups Projects
Commit a44f7047 authored by Ashwin
Browse files

Rename numInstance

parent 7dae8675
Branches
Tags
No related merge requests found
...@@ -43,11 +43,11 @@ type counters struct { ...@@ -43,11 +43,11 @@ type counters struct {
} }
type state struct { type state struct {
input chan string input chan string
cnq chan rdf.NQuad cnq chan rdf.NQuad
ctr *counters ctr *counters
instanceIdx uint64 instanceIdx uint64
numInstance uint64 numInstances uint64
} }
func (s *state) printCounters(ticker *time.Ticker) { func (s *state) printCounters(ticker *time.Ticker) {
...@@ -122,7 +122,7 @@ func (s *state) parseStream(done chan error) { ...@@ -122,7 +122,7 @@ func (s *state) parseStream(done chan error) {
func (s *state) handleNQuads(wg *sync.WaitGroup) { func (s *state) handleNQuads(wg *sync.WaitGroup) {
for nq := range s.cnq { for nq := range s.cnq {
edge, err := nq.ToEdge(s.instanceIdx, s.numInstance) edge, err := nq.ToEdge(s.instanceIdx, s.numInstances)
for err != nil { for err != nil {
// Just put in a retry loop to tackle temporary errors. // Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR { if err == posting.E_TMP_ERROR {
...@@ -133,7 +133,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { ...@@ -133,7 +133,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
Error("While converting to edge") Error("While converting to edge")
return return
} }
edge, err = nq.ToEdge(s.instanceIdx, s.numInstance) edge, err = nq.ToEdge(s.instanceIdx, s.numInstances)
} }
key := posting.Key(edge.Entity, edge.Attribute) key := posting.Key(edge.Entity, edge.Attribute)
...@@ -146,11 +146,11 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { ...@@ -146,11 +146,11 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
for nq := range s.cnq { for nq := range s.cnq {
if farm.Fingerprint64([]byte(nq.Subject))%s.numInstance != s.instanceIdx { if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances != s.instanceIdx {
// This instance shouldnt assign UID to this string // This instance shouldnt assign UID to this string
atomic.AddUint64(&s.ctr.ignored, 1) atomic.AddUint64(&s.ctr.ignored, 1)
} else { } else {
_, err := rdf.GetUid(nq.Subject, s.instanceIdx, s.numInstance) _, err := rdf.GetUid(nq.Subject, s.instanceIdx, s.numInstances)
for err != nil { for err != nil {
// Just put in a retry loop to tackle temporary errors. // Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR { if err == posting.E_TMP_ERROR {
...@@ -162,15 +162,15 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { ...@@ -162,15 +162,15 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
Error("While getting UID") Error("While getting UID")
return return
} }
_, err = rdf.GetUid(nq.Subject, s.instanceIdx, s.numInstance) _, err = rdf.GetUid(nq.Subject, s.instanceIdx, s.numInstances)
} }
} }
if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstance != s.instanceIdx { if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances != s.instanceIdx {
// This instance shouldnt or cant assign UID to this string // This instance shouldnt or cant assign UID to this string
atomic.AddUint64(&s.ctr.ignored, 1) atomic.AddUint64(&s.ctr.ignored, 1)
} else { } else {
_, err := rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstance) _, err := rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstances)
for err != nil { for err != nil {
// Just put in a retry loop to tackle temporary errors. // Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR { if err == posting.E_TMP_ERROR {
...@@ -182,7 +182,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { ...@@ -182,7 +182,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
Error("While getting UID") Error("While getting UID")
return return
} }
_, err = rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstance) _, err = rdf.GetUid(nq.ObjectId, s.instanceIdx, s.numInstances)
} }
} }
} }
...@@ -190,7 +190,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { ...@@ -190,7 +190,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
} }
// Blocking function. // Blocking function.
func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstance uint64) (uint64, error) { func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) {
s := new(state) s := new(state)
s.ctr = new(counters) s.ctr = new(counters)
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
...@@ -198,7 +198,7 @@ func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstance uint64) ( ...@@ -198,7 +198,7 @@ func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstance uint64) (
// Producer: Start buffering input to channel. // Producer: Start buffering input to channel.
s.instanceIdx = instanceIdx s.instanceIdx = instanceIdx
s.numInstance = numInstance s.numInstances = numInstances
s.input = make(chan string, 10000) s.input = make(chan string, 10000)
go s.readLines(reader) go s.readLines(reader)
...@@ -232,7 +232,7 @@ func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstance uint64) ( ...@@ -232,7 +232,7 @@ func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstance uint64) (
} }
// Blocking function. // Blocking function.
func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstance uint64) (uint64, error) { func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) {
s := new(state) s := new(state)
s.ctr = new(counters) s.ctr = new(counters)
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
...@@ -240,7 +240,7 @@ func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstanc ...@@ -240,7 +240,7 @@ func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstanc
// Producer: Start buffering input to channel. // Producer: Start buffering input to channel.
s.instanceIdx = instanceIdx s.instanceIdx = instanceIdx
s.numInstance = numInstance s.numInstances = numInstances
s.input = make(chan string, 10000) s.input = make(chan string, 10000)
go s.readLines(reader) go s.readLines(reader)
......
...@@ -36,21 +36,21 @@ type NQuad struct { ...@@ -36,21 +36,21 @@ type NQuad struct {
Language string Language string
} }
func GetUid(s string, instanceIdx uint64, numInst uint64) (uint64, error) { func GetUid(s string, instanceIdx uint64, numInstances uint64) (uint64, error) {
if strings.HasPrefix(s, "_uid_:") { if strings.HasPrefix(s, "_uid_:") {
return strconv.ParseUint(s[6:], 0, 64) return strconv.ParseUint(s[6:], 0, 64)
} }
return uid.GetOrAssign(s, instanceIdx, numInst) return uid.GetOrAssign(s, instanceIdx, numInstances)
} }
func (nq NQuad) ToEdge(instanceIdx, numInst uint64) (result x.DirectedEdge, rerr error) { func (nq NQuad) ToEdge(instanceIdx, numInstances uint64) (result x.DirectedEdge, rerr error) {
sid, err := GetUid(nq.Subject, instanceIdx, numInst) sid, err := GetUid(nq.Subject, instanceIdx, numInstances)
if err != nil { if err != nil {
return result, err return result, err
} }
result.Entity = sid result.Entity = sid
if len(nq.ObjectId) > 0 { if len(nq.ObjectId) > 0 {
oid, err := GetUid(nq.ObjectId, instanceIdx, numInst) oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances)
if err != nil { if err != nil {
return result, err return result, err
} }
......
...@@ -36,7 +36,7 @@ var glog = x.Log("loader_main") ...@@ -36,7 +36,7 @@ var glog = x.Log("loader_main")
var rdfGzips = flag.String("rdfgzips", "", var rdfGzips = flag.String("rdfgzips", "",
"Comma separated gzip files containing RDF data") "Comma separated gzip files containing RDF data")
var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.") var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.")
var numInstance = flag.Uint64("numInstance", 1, "Total number of instances among which uid assigning is shared") var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared")
var postingDir = flag.String("postings", "", "Directory to store posting lists") var postingDir = flag.String("postings", "", "Directory to store posting lists")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process")
...@@ -87,7 +87,7 @@ func main() { ...@@ -87,7 +87,7 @@ func main() {
glog.WithError(err).Fatal("Unable to create gzip reader.") glog.WithError(err).Fatal("Unable to create gzip reader.")
} }
count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstance) count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstances)
if err != nil { if err != nil {
glog.WithError(err).Fatal("While handling rdf reader.") glog.WithError(err).Fatal("While handling rdf reader.")
} }
......
...@@ -20,7 +20,7 @@ var glog = x.Log("uidassigner_main") ...@@ -20,7 +20,7 @@ var glog = x.Log("uidassigner_main")
var rdfGzips = flag.String("rdfgzips", "", var rdfGzips = flag.String("rdfgzips", "",
"Comma separated gzip files containing RDF data") "Comma separated gzip files containing RDF data")
var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.") var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.")
var numInstance = flag.Uint64("numInstance", 1, "Total number of instances among which uid assigning is shared") var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared")
var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists") var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process")
...@@ -72,7 +72,7 @@ func main() { ...@@ -72,7 +72,7 @@ func main() {
glog.WithError(err).Fatal("Unable to create gzip reader.") glog.WithError(err).Fatal("Unable to create gzip reader.")
} }
count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstance) count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstances)
if err != nil { if err != nil {
glog.WithError(err).Fatal("While handling rdf reader.") glog.WithError(err).Fatal("While handling rdf reader.")
} }
......
...@@ -92,10 +92,10 @@ func init() { ...@@ -92,10 +92,10 @@ func init() {
// go lmgr.clean() // go lmgr.clean()
} }
func allocateUniqueUid(xid string, instanceIdx uint64, numInst uint64) (uid uint64, rerr error) { func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) {
minIdx := instanceIdx * math.MaxUint64 / numInst minIdx := instanceIdx * math.MaxUint64 / numInstances
mod := math.MaxUint64 / numInst mod := math.MaxUint64 / numInstances
for sp := ""; ; sp += " " { for sp := ""; ; sp += " " {
txid := xid + sp txid := xid + sp
...@@ -140,7 +140,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInst uint64) (uid uint ...@@ -140,7 +140,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInst uint64) (uid uint
" Wake the stupid developer up.") " Wake the stupid developer up.")
} }
func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInst uint64) (uint64, error) { func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInstances uint64) (uint64, error) {
entry := lmgr.newOrExisting(xid) entry := lmgr.newOrExisting(xid)
entry.Lock() entry.Lock()
entry.ts = time.Now() entry.ts = time.Now()
...@@ -158,7 +158,7 @@ func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInst uint64) ...@@ -158,7 +158,7 @@ func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInst uint64)
} }
// No current id exists. Create one. // No current id exists. Create one.
uid, err := allocateUniqueUid(xid, instanceIdx, numInst) uid, err := allocateUniqueUid(xid, instanceIdx, numInstances)
if err != nil { if err != nil {
return 0, err return 0, err
} }
...@@ -180,11 +180,11 @@ func stringKey(xid string) []byte { ...@@ -180,11 +180,11 @@ func stringKey(xid string) []byte {
return buf.Bytes() return buf.Bytes()
} }
func GetOrAssign(xid string, instanceIdx uint64, numInst uint64) (uid uint64, rerr error) { func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) {
key := stringKey(xid) key := stringKey(xid)
pl := posting.GetOrCreate(key) pl := posting.GetOrCreate(key)
if pl.Length() == 0 { if pl.Length() == 0 {
return assignNew(pl, xid, instanceIdx, numInst) return assignNew(pl, xid, instanceIdx, numInstances)
} else if pl.Length() > 1 { } else if pl.Length() > 1 {
glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment