From 4659b14715ba87e02a3598b62aa6e0b03c1a0783 Mon Sep 17 00:00:00 2001 From: Ashwin <ashwin2007ray@gmail.com> Date: Mon, 15 Feb 2016 16:26:17 +1100 Subject: [PATCH] Renamed some variables for more clarity --- loader/loader.go | 19 ++++++++++--------- server/loader/main.go | 16 ++++++++-------- uid/assigner.go | 13 +++++++------ worker/worker.go | 6 +++--- 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/loader/loader.go b/loader/loader.go index 718ae5fd..f8af5781 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -35,7 +35,7 @@ import ( ) var glog = x.Log("loader") -var rStore, rwStore *store.Store +var uidStore, dataStore *store.Store type counters struct { read uint64 @@ -53,8 +53,8 @@ type state struct { } func Init(rs, rws *store.Store) { - rStore = rs - rwStore = rws + uidStore = rs + dataStore = rws } func (s *state) printCounters(ticker *time.Ticker) { @@ -129,7 +129,7 @@ func (s *state) parseStream(done chan error) { func (s *state) handleNQuads(wg *sync.WaitGroup) { for nq := range s.cnq { - edge, err := nq.ToEdge(s.instanceIdx, s.numInstances, rStore) + edge, err := nq.ToEdge(s.instanceIdx, s.numInstances, uidStore) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -140,13 +140,14 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { Error("While converting to edge") return } - edge, err = nq.ToEdge(s.instanceIdx, s.numInstances, rStore) + edge, err = nq.ToEdge(s.instanceIdx, s.numInstances, uidStore) } // Only handle this edge if the attribute satisfies the modulo rule - if farm.Fingerprint64([]byte(edge.Attribute))%s.numInstances == s.instanceIdx { + if farm.Fingerprint64([]byte(edge.Attribute))%s.numInstances == + s.instanceIdx { key := posting.Key(edge.Entity, edge.Attribute) - plist := posting.GetOrCreate(key, rwStore) + plist := posting.GetOrCreate(key, dataStore) plist.AddMutation(edge, posting.Set) atomic.AddUint64(&s.ctr.processed, 1) } else { @@ -158,7 +159,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { } func (s *state) getUidForString(str string) { - _, err := rdf.GetUid(str, s.instanceIdx, s.numInstances, rwStore) + _, err := rdf.GetUid(str, s.instanceIdx, s.numInstances, dataStore) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -170,7 +171,7 @@ func (s *state) getUidForString(str string) { Error("While getting UID") return } - _, err = rdf.GetUid(str, s.instanceIdx, s.numInstances, rwStore) + _, err = rdf.GetUid(str, s.instanceIdx, s.numInstances, dataStore) } } diff --git a/server/loader/main.go b/server/loader/main.go index 2edbe972..e47d7571 100644 --- a/server/loader/main.go +++ b/server/loader/main.go @@ -67,17 +67,17 @@ func main() { if len(*rdfGzips) == 0 { glog.Fatal("No RDF GZIP files specified") } - ps := new(store.Store) - ps.Init(*postingDir) - defer ps.Close() + dataStore := new(store.Store) + dataStore.Init(*postingDir) + defer dataStore.Close() - uidS := new(store.Store) - uidS.Init(*uidDir) - defer uidS.Close() + uidStore := new(store.Store) + uidStore.Init(*uidDir) + defer uidStore.Close() posting.Init(nil) - uid.Init(uidS) - loader.Init(uidS, ps) + uid.Init(uidStore) + loader.Init(uidStore, dataStore) files := strings.Split(*rdfGzips, ",") for _, path := range files { diff --git a/uid/assigner.go b/uid/assigner.go index e4ce2737..fad1d574 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -32,7 +32,7 @@ import ( var glog = x.Log("uid") var lmgr *lockManager -var rStore *store.Store +var uidStore *store.Store type entry struct { sync.Mutex @@ -95,7 +95,7 @@ func init() { } func Init(ps *store.Store) { - rStore = ps + uidStore = ps } func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { @@ -117,7 +117,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid // Check if this uid has already been allocated. key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid - pl := posting.GetOrCreate(key, rStore) + pl := posting.GetOrCreate(key, uidStore) if pl.Length() > 0 { // Something already present here. @@ -146,7 +146,8 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid " Wake the stupid developer up.") } -func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInstances uint64) (uint64, error) { +func assignNew(pl *posting.List, xid string, instanceIdx uint64, + numInstances uint64) (uint64, error) { entry := lmgr.newOrExisting(xid) entry.Lock() entry.ts = time.Now() @@ -188,7 +189,7 @@ func stringKey(xid string) []byte { func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { key := stringKey(xid) - pl := posting.GetOrCreate(key, rStore) + pl := posting.GetOrCreate(key, uidStore) if pl.Length() == 0 { return assignNew(pl, xid, instanceIdx, numInstances) @@ -209,7 +210,7 @@ func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint6 func ExternalId(uid uint64) (xid string, rerr error) { key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid - pl := posting.GetOrCreate(key, rStore) + pl := posting.GetOrCreate(key, uidStore) if pl.Length() == 0 { return "", errors.New("NO external id") } diff --git a/worker/worker.go b/worker/worker.go index e654b7c8..5538e0fa 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -8,10 +8,10 @@ import ( "github.com/google/flatbuffers/go" ) -var pstore *store.Store +var dataStore *store.Store func Init(ps *store.Store) { - pstore = ps + dataStore = ps } func ProcessTask(query []byte) (result []byte, rerr error) { @@ -27,7 +27,7 @@ func ProcessTask(query []byte) (result []byte, rerr error) { for i := 0; i < q.UidsLength(); i++ { uid := q.Uids(i) key := posting.Key(uid, attr) - pl := posting.GetOrCreate(key, pstore) + pl := posting.GetOrCreate(key, dataStore) var valoffset flatbuffers.UOffsetT if val, err := pl.Value(); err != nil { -- GitLab