Skip to content
Snippets Groups Projects
Commit aafb012a authored by Ashwin's avatar Ashwin
Browse files

Changes to limit line length to 80 chars

parent d56ec522
No related branches found
No related tags found
No related merge requests found
...@@ -197,7 +197,9 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { ...@@ -197,7 +197,9 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
} }
// Blocking function. // Blocking function.
func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) { func HandleRdfReader(reader io.Reader, instanceIdx uint64,
numInstances uint64) (uint64, error) {
s := new(state) s := new(state)
s.ctr = new(counters) s.ctr = new(counters)
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
......
...@@ -23,7 +23,6 @@ import ( ...@@ -23,7 +23,6 @@ import (
"time" "time"
"github.com/dgraph-io/dgraph/lex" "github.com/dgraph-io/dgraph/lex"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/uid"
"github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/dgraph/x"
) )
...@@ -37,23 +36,23 @@ type NQuad struct { ...@@ -37,23 +36,23 @@ type NQuad struct {
Language string Language string
} }
func GetUid(s string, instanceIdx uint64, numInstances uint64, func GetUid(s string, instanceIdx uint64, numInstances uint64) (uint64, error) {
rStore *store.Store) (uint64, error) {
if strings.HasPrefix(s, "_uid_:") { if strings.HasPrefix(s, "_uid_:") {
return strconv.ParseUint(s[6:], 0, 64) return strconv.ParseUint(s[6:], 0, 64)
} }
return uid.GetOrAssign(s, instanceIdx, numInstances) return uid.GetOrAssign(s, instanceIdx, numInstances)
} }
func (nq NQuad) ToEdge(instanceIdx, numInstances uint64, func (nq NQuad) ToEdge(instanceIdx,
rStore *store.Store) (result x.DirectedEdge, rerr error) { numInstances uint64) (result x.DirectedEdge, rerr error) {
sid, err := GetUid(nq.Subject, instanceIdx, numInstances, rStore)
sid, err := GetUid(nq.Subject, instanceIdx, numInstances)
if err != nil { if err != nil {
return result, err return result, err
} }
result.Entity = sid result.Entity = sid
if len(nq.ObjectId) > 0 { if len(nq.ObjectId) > 0 {
oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances, rStore) oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances)
if err != nil { if err != nil {
return result, err return result, err
} }
......
...@@ -43,7 +43,8 @@ var numInstances = flag.Uint64("numInstances", 1, ...@@ -43,7 +43,8 @@ var numInstances = flag.Uint64("numInstances", 1,
var postingDir = flag.String("postings", "", "Directory to store posting lists") var postingDir = flag.String("postings", "", "Directory to store posting lists")
var uidDir = flag.String("uidDir", "", "Directory to read UID posting lists") var uidDir = flag.String("uidDir", "", "Directory to read UID posting lists")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") var numcpu = flag.Int("numCpu", runtime.NumCPU(),
"Number of cores to be used by the process")
func main() { func main() {
flag.Parse() flag.Parse()
......
...@@ -24,9 +24,11 @@ var instanceIdx = flag.Uint64("instanceIdx", 0, ...@@ -24,9 +24,11 @@ var instanceIdx = flag.Uint64("instanceIdx", 0,
"Only pick entities, where Fingerprint % numInstance == instanceIdx.") "Only pick entities, where Fingerprint % numInstance == instanceIdx.")
var numInstances = flag.Uint64("numInstances", 1, var numInstances = flag.Uint64("numInstances", 1,
"Total number of instances among which uid assigning is shared") "Total number of instances among which uid assigning is shared")
var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists") var uidDir = flag.String("uidpostings", "",
"Directory to store xid to uid posting lists")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") var numcpu = flag.Int("numCpu", runtime.NumCPU(),
"Number of cores to be used by the process")
func main() { func main() {
flag.Parse() flag.Parse()
...@@ -51,7 +53,7 @@ func main() { ...@@ -51,7 +53,7 @@ func main() {
glog.WithField("instanceIdx", *instanceIdx). glog.WithField("instanceIdx", *instanceIdx).
WithField("numInstances", *numInstances). WithField("numInstances", *numInstances).
Info("Only those XIDs with FP(xid) % numInstance == instanceIdx will be given UID") Info("Only XIDs with FP(xid)%numInstance == instanceIdx will be given UID")
if len(*rdfGzips) == 0 { if len(*rdfGzips) == 0 {
glog.Fatal("No RDF GZIP files specified") glog.Fatal("No RDF GZIP files specified")
...@@ -81,7 +83,8 @@ func main() { ...@@ -81,7 +83,8 @@ func main() {
glog.WithError(err).Fatal("Unable to create gzip reader.") glog.WithError(err).Fatal("Unable to create gzip reader.")
} }
count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstances) count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx,
*numInstances)
if err != nil { if err != nil {
glog.WithError(err).Fatal("While handling rdf reader.") glog.WithError(err).Fatal("While handling rdf reader.")
} }
......
...@@ -98,11 +98,11 @@ func Init(ps *store.Store) { ...@@ -98,11 +98,11 @@ func Init(ps *store.Store) {
uidStore = ps uidStore = ps
} }
func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { func allocateUniqueUid(xid string, instanceIdx uint64,
numInstances uint64) (uid uint64, rerr error) {
mod := math.MaxUint64 / numInstances mod := math.MaxUint64 / numInstances
minIdx := instanceIdx * mod minIdx := instanceIdx * mod
for sp := ""; ; sp += " " { for sp := ""; ; sp += " " {
txid := xid + sp txid := xid + sp
...@@ -148,6 +148,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid ...@@ -148,6 +148,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid
func assignNew(pl *posting.List, xid string, instanceIdx uint64, func assignNew(pl *posting.List, xid string, instanceIdx uint64,
numInstances uint64) (uint64, error) { numInstances uint64) (uint64, error) {
entry := lmgr.newOrExisting(xid) entry := lmgr.newOrExisting(xid)
entry.Lock() entry.Lock()
entry.ts = time.Now() entry.ts = time.Now()
...@@ -187,7 +188,9 @@ func stringKey(xid string) []byte { ...@@ -187,7 +188,9 @@ func stringKey(xid string) []byte {
return buf.Bytes() return buf.Bytes()
} }
func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { func GetOrAssign(xid string, instanceIdx uint64,
numInstances uint64) (uid uint64, rerr error) {
key := stringKey(xid) key := stringKey(xid)
pl := posting.GetOrCreate(key, uidStore) pl := posting.GetOrCreate(key, uidStore)
if pl.Length() == 0 { if pl.Length() == 0 {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment