Skip to content
Snippets Groups Projects
Commit 9b9d270c authored by Ashwin Ramesh's avatar Ashwin Ramesh
Browse files

Merge pull request #16 from ashwin95r/uidassigner

Make getUid global
parents 90c93200 5501bdd4
No related branches found
No related tags found
No related merge requests found
......@@ -43,10 +43,11 @@ type counters struct {
}
type state struct {
input chan string
cnq chan rdf.NQuad
ctr *counters
mod uint64
input chan string
cnq chan rdf.NQuad
ctr *counters
instanceIdx uint64
numInstances uint64
}
func (s *state) printCounters(ticker *time.Ticker) {
......@@ -121,13 +122,7 @@ func (s *state) parseStream(done chan error) {
func (s *state) handleNQuads(wg *sync.WaitGroup) {
for nq := range s.cnq {
if farm.Fingerprint64([]byte(nq.Subject))%s.mod != 0 {
// Ignore due to mod sampling.
atomic.AddUint64(&s.ctr.ignored, 1)
continue
}
edge, err := nq.ToEdge()
edge, err := nq.ToEdge(s.instanceIdx, s.numInstances)
for err != nil {
// Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR {
......@@ -138,7 +133,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
Error("While converting to edge")
return
}
edge, err = nq.ToEdge()
edge, err = nq.ToEdge(s.instanceIdx, s.numInstances)
}
key := posting.Key(edge.Entity, edge.Attribute)
......@@ -149,41 +144,53 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
wg.Done()
}
func (s *state) getUidForString(str string) {
_, err := rdf.GetUid(str, s.instanceIdx, s.numInstances)
for err != nil {
// Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR {
time.Sleep(time.Microsecond)
glog.WithError(err).WithField("nq.Subject", str).
Error("Temporary error")
} else {
glog.WithError(err).WithField("nq.Subject", str).
Error("While getting UID")
return
}
_, err = rdf.GetUid(str, s.instanceIdx, s.numInstances)
}
}
func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
for nq := range s.cnq {
if farm.Fingerprint64([]byte(nq.Subject))%s.mod != 0 {
// Ignore due to mod sampling.
if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances != s.instanceIdx {
// This instance shouldnt assign UID to this string
atomic.AddUint64(&s.ctr.ignored, 1)
continue
} else {
s.getUidForString(nq.Subject)
}
edge, err := nq.ToEdge()
for err != nil {
// Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR {
time.Sleep(time.Microsecond)
} else {
glog.WithError(err).WithField("nq", nq).
Error("While converting to edge")
return
}
edge, err = nq.ToEdge()
if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances != s.instanceIdx {
// This instance shouldnt or cant assign UID to this string
atomic.AddUint64(&s.ctr.ignored, 1)
} else {
s.getUidForString(nq.ObjectId)
}
glog.Info(edge);
}
wg.Done()
}
// Blocking function.
func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) {
func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) {
s := new(state)
s.ctr = new(counters)
ticker := time.NewTicker(time.Second)
go s.printCounters(ticker)
// Producer: Start buffering input to channel.
s.mod = mod
s.instanceIdx = instanceIdx
s.numInstances = numInstances
s.input = make(chan string, 10000)
go s.readLines(reader)
......@@ -217,14 +224,15 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) {
}
// Blocking function.
func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64) (uint64, error) {
func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) {
s := new(state)
s.ctr = new(counters)
ticker := time.NewTicker(time.Second)
go s.printCounters(ticker)
// Producer: Start buffering input to channel.
s.mod = mod
s.instanceIdx = instanceIdx
s.numInstances = numInstances
s.input = make(chan string, 10000)
go s.readLines(reader)
......@@ -256,4 +264,3 @@ func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64) (uint64, error) {
ticker.Stop()
return atomic.LoadUint64(&s.ctr.processed), nil
}
......@@ -255,7 +255,7 @@ func NewGraph(euid uint64, exid string) (*SubGraph, error) {
// This would set the Result field in SubGraph,
// and populate the children for attributes.
if len(exid) > 0 {
u, err := uid.GetOrAssign(exid)
u, err := uid.GetOrAssign(exid, 0, 1) // instanceIdx = 0, numInstances = 1 by default
if err != nil {
x.Err(glog, err).WithField("xid", exid).Error(
"While GetOrAssign uid from external id")
......
......@@ -36,21 +36,21 @@ type NQuad struct {
Language string
}
func getUid(s string) (uint64, error) {
func GetUid(s string, instanceIdx uint64, numInstances uint64) (uint64, error) {
if strings.HasPrefix(s, "_uid_:") {
return strconv.ParseUint(s[6:], 0, 64)
}
return uid.GetOrAssign(s)
return uid.GetOrAssign(s, instanceIdx, numInstances)
}
func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) {
sid, err := getUid(nq.Subject)
func (nq NQuad) ToEdge(instanceIdx, numInstances uint64) (result x.DirectedEdge, rerr error) {
sid, err := GetUid(nq.Subject, instanceIdx, numInstances)
if err != nil {
return result, err
}
result.Entity = sid
if len(nq.ObjectId) > 0 {
oid, err := getUid(nq.ObjectId)
oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances)
if err != nil {
return result, err
}
......
......@@ -35,7 +35,8 @@ var glog = x.Log("loader_main")
var rdfGzips = flag.String("rdfgzips", "",
"Comma separated gzip files containing RDF data")
var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.")
var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.")
var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared")
var postingDir = flag.String("postings", "", "Directory to store posting lists")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process")
......@@ -86,7 +87,7 @@ func main() {
glog.WithError(err).Fatal("Unable to create gzip reader.")
}
count, err := loader.HandleRdfReader(r, *mod)
count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstances)
if err != nil {
glog.WithError(err).Fatal("While handling rdf reader.")
}
......
......@@ -66,7 +66,7 @@ func prepare() (dir1, dir2 string, clog *commit.Logger, rerr error) {
return dir1, dir2, clog, err
}
defer f.Close()
_, err = loader.HandleRdfReader(f, 1)
_, err = loader.HandleRdfReader(f, 0, 1)
if err != nil {
return dir1, dir2, clog, err
}
......
package main
import (
"compress/gzip"
"flag"
"os"
"runtime"
"runtime/pprof"
"strings"
"compress/gzip"
"flag"
"os"
"runtime"
"runtime/pprof"
"strings"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/loader"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/x"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/loader"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/x"
)
var glog = x.Log("uidassigner_main")
var rdfGzips = flag.String("rdfgzips", "",
"Comma separated gzip files containing RDF data")
var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.")
"Comma separated gzip files containing RDF data")
var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.")
var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared")
var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process")
func main() {
flag.Parse()
if !flag.Parsed() {
glog.Fatal("Unable to parse flags")
}
if len(*cpuprofile) > 0 {
f, err := os.Create(*cpuprofile)
if err != nil {
glog.Fatal(err)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
flag.Parse()
if !flag.Parsed() {
glog.Fatal("Unable to parse flags")
}
if len(*cpuprofile) > 0 {
f, err := os.Create(*cpuprofile)
if err != nil {
glog.Fatal(err)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
logrus.SetLevel(logrus.InfoLevel)
numCpus := *numcpu
......@@ -46,6 +46,10 @@ func main() {
WithField("prev_maxprocs", prevProcs).
Info("Set max procs to num cpus")
glog.WithField("instanceIdx", *instanceIdx).
WithField("numInstances", *numInstances).
Info("Only those XIDs which satisfy FP(xid) % numInstance == instanceIdx will be given UID")
if len(*rdfGzips) == 0 {
glog.Fatal("No RDF GZIP files specified")
}
......@@ -72,7 +76,7 @@ func main() {
glog.WithError(err).Fatal("Unable to create gzip reader.")
}
count, err := loader.HandleRdfReaderWhileAssign(r, *mod)
count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstances)
if err != nil {
glog.WithError(err).Fatal("While handling rdf reader.")
}
......@@ -83,4 +87,3 @@ func main() {
glog.Info("Calling merge lists")
posting.MergeLists(100 * numCpus) // 100 per core.
}
File added
......@@ -92,10 +92,17 @@ func init() {
// go lmgr.clean()
}
func allocateUniqueUid(xid string) (uid uint64, rerr error) {
func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) {
mod := math.MaxUint64 / numInstances
minIdx := instanceIdx * mod
for sp := ""; ; sp += " " {
txid := xid + sp
uid = farm.Fingerprint64([]byte(txid)) // Generate from hash.
uid1 := farm.Fingerprint64([]byte(txid)) // Generate from hash.
uid = (uid1 % mod) + minIdx
glog.WithField("txid", txid).WithField("uid", uid).Debug("Generated")
if uid == math.MaxUint64 {
glog.Debug("Hit uint64max while generating fingerprint. Ignoring...")
......@@ -133,7 +140,7 @@ func allocateUniqueUid(xid string) (uid uint64, rerr error) {
" Wake the stupid developer up.")
}
func assignNew(pl *posting.List, xid string) (uint64, error) {
func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInstances uint64) (uint64, error) {
entry := lmgr.newOrExisting(xid)
entry.Lock()
entry.ts = time.Now()
......@@ -151,7 +158,7 @@ func assignNew(pl *posting.List, xid string) (uint64, error) {
}
// No current id exists. Create one.
uid, err := allocateUniqueUid(xid)
uid, err := allocateUniqueUid(xid, instanceIdx, numInstances)
if err != nil {
return 0, err
}
......@@ -173,11 +180,11 @@ func stringKey(xid string) []byte {
return buf.Bytes()
}
func GetOrAssign(xid string) (uid uint64, rerr error) {
func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) {
key := stringKey(xid)
pl := posting.GetOrCreate(key)
if pl.Length() == 0 {
return assignNew(pl, xid)
return assignNew(pl, xid, instanceIdx, numInstances)
} else if pl.Length() > 1 {
glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid)
......
......@@ -46,7 +46,7 @@ func TestGetOrAssign(t *testing.T) {
var u1, u2 uint64
{
uid, err := GetOrAssign("externalid0")
uid, err := GetOrAssign("externalid0", 0, 1)
if err != nil {
t.Error(err)
}
......@@ -55,7 +55,7 @@ func TestGetOrAssign(t *testing.T) {
}
{
uid, err := GetOrAssign("externalid1")
uid, err := GetOrAssign("externalid1", 0, 1)
if err != nil {
t.Error(err)
}
......@@ -69,7 +69,7 @@ func TestGetOrAssign(t *testing.T) {
// return
{
uid, err := GetOrAssign("externalid0")
uid, err := GetOrAssign("externalid0", 0, 1)
if err != nil {
t.Error(err)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment