Skip to content
Snippets Groups Projects
Commit 2e5615e6 authored by Ashwin's avatar Ashwin
Browse files

Change arguments of GetUid to take the machine Id and the number of Instances

parent 49c59044
No related branches found
No related tags found
No related merge requests found
...@@ -47,6 +47,7 @@ type state struct { ...@@ -47,6 +47,7 @@ type state struct {
cnq chan rdf.NQuad cnq chan rdf.NQuad
ctr *counters ctr *counters
mod uint64 mod uint64
numInstance uint64
} }
func (s *state) printCounters(ticker *time.Ticker) { func (s *state) printCounters(ticker *time.Ticker) {
...@@ -127,7 +128,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { ...@@ -127,7 +128,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
continue continue
} }
edge, err := nq.ToEdge() edge, err := nq.ToEdge(s.mod, s.numInstance)
for err != nil { for err != nil {
// Just put in a retry loop to tackle temporary errors. // Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR { if err == posting.E_TMP_ERROR {
...@@ -138,7 +139,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { ...@@ -138,7 +139,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
Error("While converting to edge") Error("While converting to edge")
return return
} }
edge, err = nq.ToEdge() edge, err = nq.ToEdge(s.mod, s.numInstance)
} }
key := posting.Key(edge.Entity, edge.Attribute) key := posting.Key(edge.Entity, edge.Attribute)
...@@ -151,12 +152,11 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { ...@@ -151,12 +152,11 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
for nq := range s.cnq { for nq := range s.cnq {
if farm.Fingerprint64([]byte(nq.Subject))%s.mod != 0 { if farm.Fingerprint64([]byte(nq.Subject))%s.numInstance != s.mod {
// This instance shouldnt assign UID to this string // This instance shouldnt assign UID to this string
atomic.AddUint64(&s.ctr.ignored, 1) atomic.AddUint64(&s.ctr.ignored, 1)
continue
} else { } else {
_, err := rdf.GetUid(nq.Subject) _, err := rdf.GetUid(nq.Subject, s.mod, s.numInstance)
for err != nil { for err != nil {
// Just put in a retry loop to tackle temporary errors. // Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR { if err == posting.E_TMP_ERROR {
...@@ -166,16 +166,15 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { ...@@ -166,16 +166,15 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
Error("While getting UID") Error("While getting UID")
return return
} }
_, err = rdf.GetUid(nq.Subject) _, err = rdf.GetUid(nq.Subject, s.mod, s.numInstance)
} }
} }
if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.mod != 0 { if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstance != s.mod {
// This instance shouldnt or cant assign UID to this string // This instance shouldnt or cant assign UID to this string
atomic.AddUint64(&s.ctr.ignored, 1) atomic.AddUint64(&s.ctr.ignored, 1)
continue
} else { } else {
_, err := rdf.GetUid(nq.ObjectId) _, err := rdf.GetUid(nq.ObjectId, s.mod, s.numInstance)
for err != nil { for err != nil {
// Just put in a retry loop to tackle temporary errors. // Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR { if err == posting.E_TMP_ERROR {
...@@ -185,7 +184,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { ...@@ -185,7 +184,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
Error("While getting UID") Error("While getting UID")
return return
} }
_, err = rdf.GetUid(nq.ObjectId) _, err = rdf.GetUid(nq.ObjectId, s.mod, s.numInstance)
} }
} }
} }
...@@ -234,7 +233,7 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) { ...@@ -234,7 +233,7 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) {
} }
// Blocking function. // Blocking function.
func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64) (uint64, error) { func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64, numInstance uint64) (uint64, error) {
s := new(state) s := new(state)
s.ctr = new(counters) s.ctr = new(counters)
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
...@@ -242,6 +241,7 @@ func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64) (uint64, error) { ...@@ -242,6 +241,7 @@ func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64) (uint64, error) {
// Producer: Start buffering input to channel. // Producer: Start buffering input to channel.
s.mod = mod s.mod = mod
s.numInstance = numInstance
s.input = make(chan string, 10000) s.input = make(chan string, 10000)
go s.readLines(reader) go s.readLines(reader)
......
...@@ -255,7 +255,7 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) { ...@@ -255,7 +255,7 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) {
// This would set the Result field in SubGraph, // This would set the Result field in SubGraph,
// and populate the children for attributes. // and populate the children for attributes.
if len(exid) > 0 { if len(exid) > 0 {
u, err := uid.GetOrAssign(exid) u, err := uid.GetOrAssign(exid, 0, 1) // mod = 0, numInstances = 1 by default
if err != nil { if err != nil {
x.Err(glog, err).WithField("xid", exid).Error( x.Err(glog, err).WithField("xid", exid).Error(
"While GetOrAssign uid from external id") "While GetOrAssign uid from external id")
......
...@@ -36,21 +36,21 @@ type NQuad struct { ...@@ -36,21 +36,21 @@ type NQuad struct {
Language string Language string
} }
func GetUid(s string) (uint64, error) { func GetUid(s string, mod uint64, numInst uint64) (uint64, error) {
if strings.HasPrefix(s, "_uid_:") { if strings.HasPrefix(s, "_uid_:") {
return strconv.ParseUint(s[6:], 0, 64) return strconv.ParseUint(s[6:], 0, 64)
} }
return uid.GetOrAssign(s) return uid.GetOrAssign(s, mod, numInst)
} }
func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) { func (nq NQuad) ToEdge(mod, numInst uint64) (result x.DirectedEdge, rerr error) {
sid, err := GetUid(nq.Subject) sid, err := GetUid(nq.Subject, mod, numInst)
if err != nil { if err != nil {
return result, err return result, err
} }
result.Entity = sid result.Entity = sid
if len(nq.ObjectId) > 0 { if len(nq.ObjectId) > 0 {
oid, err := GetUid(nq.ObjectId) oid, err := GetUid(nq.ObjectId, mod, numInst)
if err != nil { if err != nil {
return result, err return result, err
} }
......
...@@ -19,7 +19,8 @@ var glog = x.Log("uidassigner_main") ...@@ -19,7 +19,8 @@ var glog = x.Log("uidassigner_main")
var rdfGzips = flag.String("rdfgzips", "", var rdfGzips = flag.String("rdfgzips", "",
"Comma separated gzip files containing RDF data") "Comma separated gzip files containing RDF data")
var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.") var mod = flag.Uint64("machineid", 1, "Only pick entities, where uid % mod == 0.")
var numInstance = flag.Uint64("numInstance", 1, "Total number of instances among which uid assigning is shared")
var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists") var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process")
...@@ -72,7 +73,7 @@ func main() { ...@@ -72,7 +73,7 @@ func main() {
glog.WithError(err).Fatal("Unable to create gzip reader.") glog.WithError(err).Fatal("Unable to create gzip reader.")
} }
count, err := loader.HandleRdfReaderWhileAssign(r, *mod) count, err := loader.HandleRdfReaderWhileAssign(r, *mod, *numInstance)
if err != nil { if err != nil {
glog.WithError(err).Fatal("While handling rdf reader.") glog.WithError(err).Fatal("While handling rdf reader.")
} }
......
...@@ -92,7 +92,7 @@ func init() { ...@@ -92,7 +92,7 @@ func init() {
// go lmgr.clean() // go lmgr.clean()
} }
func allocateUniqueUid(xid string) (uid uint64, rerr error) { func allocateUniqueUid(xid string, mod uint64, numInst uint64) (uid uint64, rerr error) {
for sp := ""; ; sp += " " { for sp := ""; ; sp += " " {
txid := xid + sp txid := xid + sp
uid = farm.Fingerprint64([]byte(txid)) // Generate from hash. uid = farm.Fingerprint64([]byte(txid)) // Generate from hash.
...@@ -133,7 +133,7 @@ func allocateUniqueUid(xid string) (uid uint64, rerr error) { ...@@ -133,7 +133,7 @@ func allocateUniqueUid(xid string) (uid uint64, rerr error) {
" Wake the stupid developer up.") " Wake the stupid developer up.")
} }
func assignNew(pl *posting.List, xid string) (uint64, error) { func assignNew(pl *posting.List, xid string, mod uint64, numInst uint64) (uint64, error) {
entry := lmgr.newOrExisting(xid) entry := lmgr.newOrExisting(xid)
entry.Lock() entry.Lock()
entry.ts = time.Now() entry.ts = time.Now()
...@@ -151,7 +151,7 @@ func assignNew(pl *posting.List, xid string) (uint64, error) { ...@@ -151,7 +151,7 @@ func assignNew(pl *posting.List, xid string) (uint64, error) {
} }
// No current id exists. Create one. // No current id exists. Create one.
uid, err := allocateUniqueUid(xid) uid, err := allocateUniqueUid(xid, mod, numInst)
if err != nil { if err != nil {
return 0, err return 0, err
} }
...@@ -173,11 +173,11 @@ func stringKey(xid string) []byte { ...@@ -173,11 +173,11 @@ func stringKey(xid string) []byte {
return buf.Bytes() return buf.Bytes()
} }
func GetOrAssign(xid string) (uid uint64, rerr error) { func GetOrAssign(xid string, mod uint64, numInst uint64) (uid uint64, rerr error) {
key := stringKey(xid) key := stringKey(xid)
pl := posting.GetOrCreate(key) pl := posting.GetOrCreate(key)
if pl.Length() == 0 { if pl.Length() == 0 {
return assignNew(pl, xid) return assignNew(pl, xid, mod, numInst)
} else if pl.Length() > 1 { } else if pl.Length() > 1 {
glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid)
......
...@@ -46,7 +46,7 @@ func TestGetOrAssign(t *testing.T) { ...@@ -46,7 +46,7 @@ func TestGetOrAssign(t *testing.T) {
var u1, u2 uint64 var u1, u2 uint64
{ {
uid, err := GetOrAssign("externalid0") uid, err := GetOrAssign("externalid0", 0, 1)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
...@@ -55,7 +55,7 @@ func TestGetOrAssign(t *testing.T) { ...@@ -55,7 +55,7 @@ func TestGetOrAssign(t *testing.T) {
} }
{ {
uid, err := GetOrAssign("externalid1") uid, err := GetOrAssign("externalid1", 0, 1)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
...@@ -69,7 +69,7 @@ func TestGetOrAssign(t *testing.T) { ...@@ -69,7 +69,7 @@ func TestGetOrAssign(t *testing.T) {
// return // return
{ {
uid, err := GetOrAssign("externalid0") uid, err := GetOrAssign("externalid0", 0, 1)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment