Skip to content
Snippets Groups Projects
Commit 235c62c7 authored by Ashwin Ramesh's avatar Ashwin Ramesh
Browse files

Merge pull request #30 from ashwin95r/removeStore

Changes to limit line length to 80 chars
parents 100b8030 677e35d9
No related branches found
No related tags found
No related merge requests found
...@@ -129,7 +129,7 @@ func (s *state) parseStream(done chan error) { ...@@ -129,7 +129,7 @@ func (s *state) parseStream(done chan error) {
func (s *state) handleNQuads(wg *sync.WaitGroup) { func (s *state) handleNQuads(wg *sync.WaitGroup) {
for nq := range s.cnq { for nq := range s.cnq {
edge, err := nq.ToEdge(s.instanceIdx, s.numInstances, uidStore) edge, err := nq.ToEdge(s.instanceIdx, s.numInstances)
for err != nil { for err != nil {
// Just put in a retry loop to tackle temporary errors. // Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR { if err == posting.E_TMP_ERROR {
...@@ -140,7 +140,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { ...@@ -140,7 +140,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
Error("While converting to edge") Error("While converting to edge")
return return
} }
edge, err = nq.ToEdge(s.instanceIdx, s.numInstances, uidStore) edge, err = nq.ToEdge(s.instanceIdx, s.numInstances)
} }
// Only handle this edge if the attribute satisfies the modulo rule // Only handle this edge if the attribute satisfies the modulo rule
...@@ -159,7 +159,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { ...@@ -159,7 +159,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
} }
func (s *state) getUidForString(str string) { func (s *state) getUidForString(str string) {
_, err := rdf.GetUid(str, s.instanceIdx, s.numInstances, dataStore) _, err := rdf.GetUid(str, s.instanceIdx, s.numInstances)
for err != nil { for err != nil {
// Just put in a retry loop to tackle temporary errors. // Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR { if err == posting.E_TMP_ERROR {
...@@ -171,7 +171,7 @@ func (s *state) getUidForString(str string) { ...@@ -171,7 +171,7 @@ func (s *state) getUidForString(str string) {
Error("While getting UID") Error("While getting UID")
return return
} }
_, err = rdf.GetUid(str, s.instanceIdx, s.numInstances, dataStore) _, err = rdf.GetUid(str, s.instanceIdx, s.numInstances)
} }
} }
...@@ -197,7 +197,9 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { ...@@ -197,7 +197,9 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
} }
// Blocking function. // Blocking function.
func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) { func HandleRdfReader(reader io.Reader, instanceIdx uint64,
numInstances uint64) (uint64, error) {
s := new(state) s := new(state)
s.ctr = new(counters) s.ctr = new(counters)
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
......
...@@ -23,7 +23,6 @@ import ( ...@@ -23,7 +23,6 @@ import (
"time" "time"
"github.com/dgraph-io/dgraph/lex" "github.com/dgraph-io/dgraph/lex"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/uid"
"github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/dgraph/x"
) )
...@@ -37,23 +36,23 @@ type NQuad struct { ...@@ -37,23 +36,23 @@ type NQuad struct {
Language string Language string
} }
func GetUid(s string, instanceIdx uint64, numInstances uint64, func GetUid(s string, instanceIdx uint64, numInstances uint64) (uint64, error) {
rStore *store.Store) (uint64, error) {
if strings.HasPrefix(s, "_uid_:") { if strings.HasPrefix(s, "_uid_:") {
return strconv.ParseUint(s[6:], 0, 64) return strconv.ParseUint(s[6:], 0, 64)
} }
return uid.GetOrAssign(s, instanceIdx, numInstances) return uid.GetOrAssign(s, instanceIdx, numInstances)
} }
func (nq NQuad) ToEdge(instanceIdx, numInstances uint64, func (nq NQuad) ToEdge(instanceIdx,
rStore *store.Store) (result x.DirectedEdge, rerr error) { numInstances uint64) (result x.DirectedEdge, rerr error) {
sid, err := GetUid(nq.Subject, instanceIdx, numInstances, rStore)
sid, err := GetUid(nq.Subject, instanceIdx, numInstances)
if err != nil { if err != nil {
return result, err return result, err
} }
result.Entity = sid result.Entity = sid
if len(nq.ObjectId) > 0 { if len(nq.ObjectId) > 0 {
oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances, rStore) oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances)
if err != nil { if err != nil {
return result, err return result, err
} }
......
...@@ -43,7 +43,8 @@ var numInstances = flag.Uint64("numInstances", 1, ...@@ -43,7 +43,8 @@ var numInstances = flag.Uint64("numInstances", 1,
var postingDir = flag.String("postings", "", "Directory to store posting lists") var postingDir = flag.String("postings", "", "Directory to store posting lists")
var uidDir = flag.String("uidDir", "", "Directory to read UID posting lists") var uidDir = flag.String("uidDir", "", "Directory to read UID posting lists")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") var numcpu = flag.Int("numCpu", runtime.NumCPU(),
"Number of cores to be used by the process")
func main() { func main() {
flag.Parse() flag.Parse()
......
...@@ -24,9 +24,11 @@ var instanceIdx = flag.Uint64("instanceIdx", 0, ...@@ -24,9 +24,11 @@ var instanceIdx = flag.Uint64("instanceIdx", 0,
"Only pick entities, where Fingerprint % numInstance == instanceIdx.") "Only pick entities, where Fingerprint % numInstance == instanceIdx.")
var numInstances = flag.Uint64("numInstances", 1, var numInstances = flag.Uint64("numInstances", 1,
"Total number of instances among which uid assigning is shared") "Total number of instances among which uid assigning is shared")
var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists") var uidDir = flag.String("uidpostings", "",
"Directory to store xid to uid posting lists")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") var numcpu = flag.Int("numCpu", runtime.NumCPU(),
"Number of cores to be used by the process")
func main() { func main() {
flag.Parse() flag.Parse()
...@@ -51,7 +53,7 @@ func main() { ...@@ -51,7 +53,7 @@ func main() {
glog.WithField("instanceIdx", *instanceIdx). glog.WithField("instanceIdx", *instanceIdx).
WithField("numInstances", *numInstances). WithField("numInstances", *numInstances).
Info("Only those XIDs with FP(xid) % numInstance == instanceIdx will be given UID") Info("Only XIDs with FP(xid)%numInstance == instanceIdx will be given UID")
if len(*rdfGzips) == 0 { if len(*rdfGzips) == 0 {
glog.Fatal("No RDF GZIP files specified") glog.Fatal("No RDF GZIP files specified")
...@@ -81,7 +83,8 @@ func main() { ...@@ -81,7 +83,8 @@ func main() {
glog.WithError(err).Fatal("Unable to create gzip reader.") glog.WithError(err).Fatal("Unable to create gzip reader.")
} }
count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstances) count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx,
*numInstances)
if err != nil { if err != nil {
glog.WithError(err).Fatal("While handling rdf reader.") glog.WithError(err).Fatal("While handling rdf reader.")
} }
......
...@@ -41,14 +41,14 @@ func TestQuery(t *testing.T) { ...@@ -41,14 +41,14 @@ func TestQuery(t *testing.T) {
list := []string{"alice", "bob", "mallory", "ash", "man", "dgraph"} list := []string{"alice", "bob", "mallory", "ash", "man", "dgraph"}
for _, str := range list { for _, str := range list {
if farm.Fingerprint64([]byte(str))%numInstances == 0 { if farm.Fingerprint64([]byte(str))%numInstances == 0 {
uid, err := rdf.GetUid(str, 0, numInstances, ps) uid, err := rdf.GetUid(str, 0, numInstances)
if uid < minIdx0 || uid > minIdx0+mod-1 { if uid < minIdx0 || uid > minIdx0+mod-1 {
t.Error("Not the correct UID", err) t.Error("Not the correct UID", err)
} }
t.Logf("Instance-0 Correct UID", str, uid) t.Logf("Instance-0 Correct UID", str, uid)
} else { } else {
uid, err := rdf.GetUid(str, 1, numInstances, ps) uid, err := rdf.GetUid(str, 1, numInstances)
if uid < minIdx1 || uid > minIdx1+mod-1 { if uid < minIdx1 || uid > minIdx1+mod-1 {
t.Error("Not the correct UID", err) t.Error("Not the correct UID", err)
} }
......
...@@ -98,11 +98,11 @@ func Init(ps *store.Store) { ...@@ -98,11 +98,11 @@ func Init(ps *store.Store) {
uidStore = ps uidStore = ps
} }
func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { func allocateUniqueUid(xid string, instanceIdx uint64,
numInstances uint64) (uid uint64, rerr error) {
mod := math.MaxUint64 / numInstances mod := math.MaxUint64 / numInstances
minIdx := instanceIdx * mod minIdx := instanceIdx * mod
for sp := ""; ; sp += " " { for sp := ""; ; sp += " " {
txid := xid + sp txid := xid + sp
...@@ -148,6 +148,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid ...@@ -148,6 +148,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid
func assignNew(pl *posting.List, xid string, instanceIdx uint64, func assignNew(pl *posting.List, xid string, instanceIdx uint64,
numInstances uint64) (uint64, error) { numInstances uint64) (uint64, error) {
entry := lmgr.newOrExisting(xid) entry := lmgr.newOrExisting(xid)
entry.Lock() entry.Lock()
entry.ts = time.Now() entry.ts = time.Now()
...@@ -187,7 +188,9 @@ func stringKey(xid string) []byte { ...@@ -187,7 +188,9 @@ func stringKey(xid string) []byte {
return buf.Bytes() return buf.Bytes()
} }
func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { func GetOrAssign(xid string, instanceIdx uint64,
numInstances uint64) (uid uint64, rerr error) {
key := stringKey(xid) key := stringKey(xid)
pl := posting.GetOrCreate(key, uidStore) pl := posting.GetOrCreate(key, uidStore)
if pl.Length() == 0 { if pl.Length() == 0 {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment