diff --git a/loader/loader.go b/loader/loader.go index fe3869739cdf5f2efd98f0b50455cb96adfafd35..84968e599b42192150ab1077ef20ee799bc3b5cf 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -47,6 +47,7 @@ type state struct { cnq chan rdf.NQuad ctr *counters mod uint64 + numInstance uint64 } func (s *state) printCounters(ticker *time.Ticker) { @@ -127,7 +128,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { continue } - edge, err := nq.ToEdge() + edge, err := nq.ToEdge(s.mod, s.numInstance) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -138,7 +139,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { Error("While converting to edge") return } - edge, err = nq.ToEdge() + edge, err = nq.ToEdge(s.mod, s.numInstance) } key := posting.Key(edge.Entity, edge.Attribute) @@ -151,12 +152,11 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { for nq := range s.cnq { - if farm.Fingerprint64([]byte(nq.Subject))%s.mod != 0 { + if farm.Fingerprint64([]byte(nq.Subject))%s.numInstance != s.mod { // This instance shouldnt assign UID to this string atomic.AddUint64(&s.ctr.ignored, 1) - continue } else { - _, err := rdf.GetUid(nq.Subject) + _, err := rdf.GetUid(nq.Subject, s.mod, s.numInstance) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -166,16 +166,15 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { Error("While getting UID") return } - _, err = rdf.GetUid(nq.Subject) + _, err = rdf.GetUid(nq.Subject, s.mod, s.numInstance) } } - if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.mod != 0 { + if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstance != s.mod { // This instance shouldnt or cant assign UID to this string atomic.AddUint64(&s.ctr.ignored, 1) - continue } else { - _, err := rdf.GetUid(nq.ObjectId) + _, err := rdf.GetUid(nq.ObjectId, s.mod, s.numInstance) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -185,7 +184,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { Error("While getting UID") return } - _, err = rdf.GetUid(nq.ObjectId) + _, err = rdf.GetUid(nq.ObjectId, s.mod, s.numInstance) } } } @@ -234,7 +233,7 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) { } // Blocking function. -func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64) (uint64, error) { +func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64, numInstance uint64) (uint64, error) { s := new(state) s.ctr = new(counters) ticker := time.NewTicker(time.Second) @@ -242,6 +241,7 @@ func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64) (uint64, error) { // Producer: Start buffering input to channel. s.mod = mod + s.numInstance = numInstance s.input = make(chan string, 10000) go s.readLines(reader) diff --git a/query/query.go b/query/query.go index 2d1ccb2094e6e89c72c2d6fde8e00cf2e5839e32..3ee69a2adc97c2a498620dc099bbe7d0abaa4712 100644 --- a/query/query.go +++ b/query/query.go @@ -255,7 +255,7 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) { // This would set the Result field in SubGraph, // and populate the children for attributes. if len(exid) > 0 { - u, err := uid.GetOrAssign(exid) + u, err := uid.GetOrAssign(exid, 0, 1) // mod = 0, numInstances = 1 by default if err != nil { x.Err(glog, err).WithField("xid", exid).Error( "While GetOrAssign uid from external id") diff --git a/rdf/parse.go b/rdf/parse.go index 809ef63ef17709a1c2d224fb1cb6ab685b4d572a..58f464d9b84883934bda11b654247e54f0d0c8d5 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -36,21 +36,21 @@ type NQuad struct { Language string } -func GetUid(s string) (uint64, error) { +func GetUid(s string, mod uint64, numInst uint64) (uint64, error) { if strings.HasPrefix(s, "_uid_:") { return strconv.ParseUint(s[6:], 0, 64) } - return uid.GetOrAssign(s) + return uid.GetOrAssign(s, mod, numInst) } -func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) { - sid, err := GetUid(nq.Subject) +func (nq NQuad) ToEdge(mod, numInst uint64) (result x.DirectedEdge, rerr error) { + sid, err := GetUid(nq.Subject, mod, numInst) if err != nil { return result, err } result.Entity = sid if len(nq.ObjectId) > 0 { - oid, err := GetUid(nq.ObjectId) + oid, err := GetUid(nq.ObjectId, mod, numInst) if err != nil { return result, err } diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go index c9babee4daae968ab8415aef04c06301459b5dab..422c76424a59679949e3dc896554d8f2f382f5f9 100644 --- a/server/uidassigner/main.go +++ b/server/uidassigner/main.go @@ -19,7 +19,8 @@ var glog = x.Log("uidassigner_main") var rdfGzips = flag.String("rdfgzips", "", "Comma separated gzip files containing RDF data") -var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.") +var mod = flag.Uint64("machineid", 1, "Only pick entities, where uid % mod == 0.") +var numInstance = flag.Uint64("numInstance", 1, "Total number of instances among which uid assigning is shared") var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") @@ -72,7 +73,7 @@ func main() { glog.WithError(err).Fatal("Unable to create gzip reader.") } - count, err := loader.HandleRdfReaderWhileAssign(r, *mod) + count, err := loader.HandleRdfReaderWhileAssign(r, *mod, *numInstance) if err != nil { glog.WithError(err).Fatal("While handling rdf reader.") } diff --git a/uid/assigner.go b/uid/assigner.go index 85ad39500040d0946f048c404130d9849269b85c..8e486e2b3e3004fdf2cb53115469a7d1638fa161 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -92,7 +92,7 @@ func init() { // go lmgr.clean() } -func allocateUniqueUid(xid string) (uid uint64, rerr error) { +func allocateUniqueUid(xid string, mod uint64, numInst uint64) (uid uint64, rerr error) { for sp := ""; ; sp += " " { txid := xid + sp uid = farm.Fingerprint64([]byte(txid)) // Generate from hash. @@ -133,7 +133,7 @@ func allocateUniqueUid(xid string) (uid uint64, rerr error) { " Wake the stupid developer up.") } -func assignNew(pl *posting.List, xid string) (uint64, error) { +func assignNew(pl *posting.List, xid string, mod uint64, numInst uint64) (uint64, error) { entry := lmgr.newOrExisting(xid) entry.Lock() entry.ts = time.Now() @@ -151,7 +151,7 @@ func assignNew(pl *posting.List, xid string) (uint64, error) { } // No current id exists. Create one. - uid, err := allocateUniqueUid(xid) + uid, err := allocateUniqueUid(xid, mod, numInst) if err != nil { return 0, err } @@ -173,11 +173,11 @@ func stringKey(xid string) []byte { return buf.Bytes() } -func GetOrAssign(xid string) (uid uint64, rerr error) { +func GetOrAssign(xid string, mod uint64, numInst uint64) (uid uint64, rerr error) { key := stringKey(xid) pl := posting.GetOrCreate(key) if pl.Length() == 0 { - return assignNew(pl, xid) + return assignNew(pl, xid, mod, numInst) } else if pl.Length() > 1 { glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) diff --git a/uid/assigner_test.go b/uid/assigner_test.go index bf6d733168366bb928bef821140c7dae30c8a3d9..826c88581381a67e47204ff29aa80a0ae654c77c 100644 --- a/uid/assigner_test.go +++ b/uid/assigner_test.go @@ -46,7 +46,7 @@ func TestGetOrAssign(t *testing.T) { var u1, u2 uint64 { - uid, err := GetOrAssign("externalid0") + uid, err := GetOrAssign("externalid0", 0, 1) if err != nil { t.Error(err) } @@ -55,7 +55,7 @@ func TestGetOrAssign(t *testing.T) { } { - uid, err := GetOrAssign("externalid1") + uid, err := GetOrAssign("externalid1", 0, 1) if err != nil { t.Error(err) } @@ -69,7 +69,7 @@ func TestGetOrAssign(t *testing.T) { // return { - uid, err := GetOrAssign("externalid0") + uid, err := GetOrAssign("externalid0", 0, 1) if err != nil { t.Error(err) }