diff --git a/gql/parser.go b/gql/parser.go index ae511c2c1726c83ab092faaa757867498c1471a7..2ba5654f32f31986dc184d7c39768e6f86cf7958 100644 --- a/gql/parser.go +++ b/gql/parser.go @@ -23,6 +23,7 @@ import ( "github.com/dgraph-io/dgraph/lex" "github.com/dgraph-io/dgraph/query" + "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/x" ) diff --git a/loader/loader.go b/loader/loader.go index a3953e117f18bbd86f2258d263f0c1def688442d..7af54bb8ef606847b5e4dedd0fff5f993ed004d3 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -29,6 +29,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/rdf" + "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/x" "github.com/dgryski/go-farm" ) @@ -120,9 +121,9 @@ func (s *state) parseStream(done chan error) { done <- nil } -func (s *state) handleNQuads(wg *sync.WaitGroup) { +func (s *state) handleNQuads(wg *sync.WaitGroup, rwStore, rStore *store.Store) { for nq := range s.cnq { - edge, err := nq.ToEdge(s.instanceIdx, s.numInstances) + edge, err := nq.ToEdge(s.instanceIdx, s.numInstances, rStore) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -133,19 +134,19 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { Error("While converting to edge") return } - edge, err = nq.ToEdge(s.instanceIdx, s.numInstances) + edge, err = nq.ToEdge(s.instanceIdx, s.numInstances, rStore) } key := posting.Key(edge.Entity, edge.Attribute) - plist := posting.GetOrCreate(key) + plist := posting.GetOrCreate(key, rwStore) plist.AddMutation(edge, posting.Set) atomic.AddUint64(&s.ctr.processed, 1) } wg.Done() } -func (s *state) getUidForString(str string) { - _, err := rdf.GetUid(str, s.instanceIdx, s.numInstances) +func (s *state) getUidForString(str string, rwStore *store.Store) { + _, err := rdf.GetUid(str, s.instanceIdx, s.numInstances, rwStore) for err != nil { // Just put in a retry loop to tackle temporary errors. if err == posting.E_TMP_ERROR { @@ -157,24 +158,24 @@ func (s *state) getUidForString(str string) { Error("While getting UID") return } - _, err = rdf.GetUid(str, s.instanceIdx, s.numInstances) + _, err = rdf.GetUid(str, s.instanceIdx, s.numInstances, rwStore) } } -func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { +func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup, rwStore *store.Store) { for nq := range s.cnq { if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances != s.instanceIdx { // This instance shouldnt assign UID to this string atomic.AddUint64(&s.ctr.ignored, 1) } else { - s.getUidForString(nq.Subject) + s.getUidForString(nq.Subject, rwStore) } if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances != s.instanceIdx { // This instance shouldnt or cant assign UID to this string atomic.AddUint64(&s.ctr.ignored, 1) } else { - s.getUidForString(nq.ObjectId) + s.getUidForString(nq.ObjectId, rwStore) } } @@ -182,7 +183,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { } // Blocking function. -func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) { +func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64, rwStore, rStore *store.Store) (uint64, error) { s := new(state) s.ctr = new(counters) ticker := time.NewTicker(time.Second) @@ -204,7 +205,7 @@ func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64) wg := new(sync.WaitGroup) for i := 0; i < 3000; i++ { wg.Add(1) - go s.handleNQuads(wg) // NQuads --> Posting list [slow]. + go s.handleNQuads(wg, rwStore, rStore) // NQuads --> Posting list [slow]. } // Block until all parseStream goroutines are finished. @@ -224,7 +225,7 @@ func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64) } // Blocking function. -func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) { +func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstances uint64, rwStore *store.Store) (uint64, error) { s := new(state) s.ctr = new(counters) ticker := time.NewTicker(time.Second) @@ -246,7 +247,7 @@ func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64, numInstanc wg := new(sync.WaitGroup) for i := 0; i < 3000; i++ { wg.Add(1) - go s.handleNQuadsWhileAssign(wg) //Different compared to HandleRdfReader + go s.handleNQuadsWhileAssign(wg, rwStore) //Different compared to HandleRdfReader } // Block until all parseStream goroutines are finished. diff --git a/posting/lists.go b/posting/lists.go index 2c24bcb325eeaee36c00ea6c9a66e832415de9ab..8f3d7744a121de2ade683d74063c620469009716 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -155,18 +155,16 @@ func checkMemoryUsage() { var stopTheWorld sync.RWMutex var lhmap *gotomic.Hash var dirtymap *gotomic.Hash -var pstore *store.Store var clog *commit.Logger -func Init(posting *store.Store, log *commit.Logger) { +func Init(log *commit.Logger) { lhmap = gotomic.NewHash() dirtymap = gotomic.NewHash() - pstore = posting clog = log go checkMemoryUsage() } -func GetOrCreate(key []byte) *List { +func GetOrCreate(key []byte, pstore *store.Store) *List { stopTheWorld.RLock() defer stopTheWorld.RUnlock() diff --git a/posting/worker.go b/posting/worker.go index bbaa558c475f3c1ffc65e9ac1ae50cb1a4252b86..154028e37b7e43d6aa36facfe200f9d4d7b65b4a 100644 --- a/posting/worker.go +++ b/posting/worker.go @@ -1,12 +1,13 @@ package posting import ( + "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/x" "github.com/google/flatbuffers/go" ) -func ProcessTask(query []byte) (result []byte, rerr error) { +func ProcessTask(query []byte, pstore *store.Store) (result []byte, rerr error) { uo := flatbuffers.GetUOffsetT(query) q := new(task.Query) q.Init(query, uo) @@ -19,7 +20,7 @@ func ProcessTask(query []byte) (result []byte, rerr error) { for i := 0; i < q.UidsLength(); i++ { uid := q.Uids(i) key := Key(uid, attr) - pl := GetOrCreate(key) + pl := GetOrCreate(key, pstore) var valoffset flatbuffers.UOffsetT if val, err := pl.Value(); err != nil { diff --git a/posting/worker_test.go b/posting/worker_test.go index 2aabe52f05dd08c8c0b3900aae1d4e5517a4250d..65baa20982603c3fe28092f9554fbba7df84533c 100644 --- a/posting/worker_test.go +++ b/posting/worker_test.go @@ -56,32 +56,32 @@ func TestProcessTask(t *testing.T) { clog.Init() defer clog.Close() - Init(ps, clog) + Init(clog) edge := x.DirectedEdge{ ValueId: 23, Source: "author0", Timestamp: time.Now(), } - addEdge(t, edge, GetOrCreate(Key(10, "friend"))) - addEdge(t, edge, GetOrCreate(Key(11, "friend"))) - addEdge(t, edge, GetOrCreate(Key(12, "friend"))) + addEdge(t, edge, GetOrCreate(Key(10, "friend"), ps)) + addEdge(t, edge, GetOrCreate(Key(11, "friend"), ps)) + addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) edge.ValueId = 25 - addEdge(t, edge, GetOrCreate(Key(12, "friend"))) + addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) edge.ValueId = 26 - addEdge(t, edge, GetOrCreate(Key(12, "friend"))) + addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) edge.ValueId = 31 - addEdge(t, edge, GetOrCreate(Key(10, "friend"))) - addEdge(t, edge, GetOrCreate(Key(12, "friend"))) + addEdge(t, edge, GetOrCreate(Key(10, "friend"), ps)) + addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) edge.Value = "photon" - addEdge(t, edge, GetOrCreate(Key(12, "friend"))) + addEdge(t, edge, GetOrCreate(Key(12, "friend"), ps)) query := NewQuery("friend", []uint64{10, 11, 12}) - result, err := ProcessTask(query) + result, err := ProcessTask(query, ps) if err != nil { t.Error(err) } diff --git a/query/query.go b/query/query.go index 4d2e28fe5d8e352c6fca59296918fc833b84fd58..aa4fd4c4cf1341ca773b5ed5e4452a3fa868f945 100644 --- a/query/query.go +++ b/query/query.go @@ -25,6 +25,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/posting" + "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/x" @@ -251,11 +252,11 @@ func (g *SubGraph) ToJson(l *Latency) (js []byte, rerr error) { return json.Marshal(r) } -func NewGraph(euid uint64, exid string) (*SubGraph, error) { +func NewGraph(euid uint64, exid string, pstore *store.Store) (*SubGraph, error) { // This would set the Result field in SubGraph, // and populate the children for attributes. if len(exid) > 0 { - u, err := uid.GetOrAssign(exid, 0, 1) // instanceIdx = 0, numInstances = 1 by default + u, err := uid.GetOrAssign(exid, 0, 1, pstore) // instanceIdx = 0, numInstances = 1 by default if err != nil { x.Err(glog, err).WithField("xid", exid).Error( "While GetOrAssign uid from external id") @@ -381,11 +382,11 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) { return sorted, nil } -func ProcessGraph(sg *SubGraph, rch chan error) { +func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) { var err error if len(sg.query) > 0 && sg.Attr != "_root_" { // This task execution would go over the wire in later versions. - sg.result, err = posting.ProcessTask(sg.query) + sg.result, err = posting.ProcessTask(sg.query, pstore) if err != nil { x.Err(glog, err).Error("While processing task.") rch <- err @@ -424,7 +425,7 @@ func ProcessGraph(sg *SubGraph, rch chan error) { for i := 0; i < len(sg.Children); i++ { child := sg.Children[i] child.query = createTaskQuery(child.Attr, sorted) - go ProcessGraph(child, childchan) + go ProcessGraph(child, childchan, pstore) } // Now get all the results back. diff --git a/rdf/parse.go b/rdf/parse.go index d56471ac5de95cc82fb460be0237fbb80152a855..7788553de64b8186575c0417d95336e0111904d6 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -23,6 +23,7 @@ import ( "time" "github.com/dgraph-io/dgraph/lex" + "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/x" ) @@ -36,21 +37,21 @@ type NQuad struct { Language string } -func GetUid(s string, instanceIdx uint64, numInstances uint64) (uint64, error) { +func GetUid(s string, instanceIdx uint64, numInstances uint64, rStore *store.Store) (uint64, error) { if strings.HasPrefix(s, "_uid_:") { return strconv.ParseUint(s[6:], 0, 64) } - return uid.GetOrAssign(s, instanceIdx, numInstances) + return uid.GetOrAssign(s, instanceIdx, numInstances, rStore) } -func (nq NQuad) ToEdge(instanceIdx, numInstances uint64) (result x.DirectedEdge, rerr error) { - sid, err := GetUid(nq.Subject, instanceIdx, numInstances) +func (nq NQuad) ToEdge(instanceIdx, numInstances uint64, rStore *store.Store) (result x.DirectedEdge, rerr error) { + sid, err := GetUid(nq.Subject, instanceIdx, numInstances, rStore) if err != nil { return result, err } result.Entity = sid if len(nq.ObjectId) > 0 { - oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances) + oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances, rStore) if err != nil { return result, err } diff --git a/server/loader/main.go b/server/loader/main.go index 3871936f01cec1d86e0d9ab4d49169563d1b4607..9077416d867123be87203fb571c703191dd66680 100644 --- a/server/loader/main.go +++ b/server/loader/main.go @@ -38,6 +38,7 @@ var rdfGzips = flag.String("rdfgzips", "", var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.") var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared") var postingDir = flag.String("postings", "", "Directory to store posting lists") +var uidDir = flag.String("uidDir", "", "Directory to read UID posting lists") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") @@ -69,7 +70,11 @@ func main() { ps.Init(*postingDir) defer ps.Close() - posting.Init(ps, nil) + uidS := new(store.Store) + uidS.Init(*uidDir) + defer uidS.Close() + + posting.Init(nil) files := strings.Split(*rdfGzips, ",") for _, path := range files { @@ -87,7 +92,7 @@ func main() { glog.WithError(err).Fatal("Unable to create gzip reader.") } - count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstances) + count, err := loader.HandleRdfReader(r, *instanceIdx, *numInstances, ps, uidS) if err != nil { glog.WithError(err).Fatal("While handling rdf reader.") } diff --git a/server/main.go b/server/main.go index f9c692ee4f41dbafe505945e956f08331bca1f64..2089271cc17ab9ac6a6c57792010a7edd48303f3 100644 --- a/server/main.go +++ b/server/main.go @@ -110,7 +110,7 @@ func main() { glog.Fatal("Unable to parse flags") } logrus.SetLevel(logrus.InfoLevel) - numCpus := *numcpu + numCpus := *numcpu prev := runtime.GOMAXPROCS(numCpus) glog.WithField("num_cpu", numCpus). WithField("prev_maxprocs", prev). @@ -125,7 +125,7 @@ func main() { clog.Init() defer clog.Close() - posting.Init(ps, clog) + posting.Init(clog) http.HandleFunc("/query", queryHandler) glog.WithField("port", *port).Info("Listening for requests...") diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go index 3d307b99892b064fb8ec3d8c14fcf161cb424ebf..b7d16f9149cfa7d918b0355454f6e07510c7f670 100644 --- a/server/uidassigner/main.go +++ b/server/uidassigner/main.go @@ -58,7 +58,7 @@ func main() { ps.Init(*uidDir) defer ps.Close() - posting.Init(ps, nil) + posting.Init(nil) files := strings.Split(*rdfGzips, ",") for _, path := range files { @@ -76,7 +76,7 @@ func main() { glog.WithError(err).Fatal("Unable to create gzip reader.") } - count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstances) + count, err := loader.HandleRdfReaderWhileAssign(r, *instanceIdx, *numInstances, ps) if err != nil { glog.WithError(err).Fatal("While handling rdf reader.") } diff --git a/server/uidassigner/main_test.go b/server/uidassigner/main_test.go index 7c43abaa53cb843e8dc0e9ded0a9045d1310582a..d6ca54de607e04533adb771b0762a4002b951219 100644 --- a/server/uidassigner/main_test.go +++ b/server/uidassigner/main_test.go @@ -33,19 +33,19 @@ func TestQuery(t *testing.T) { clog := commit.NewLogger(dir, "mutations", 50<<20) clog.Init() defer clog.Close() - posting.Init(ps, clog) + posting.Init(clog) list := []string{"alice", "bob", "mallory", "ash", "man", "dgraph"} for _, str := range list { if farm.Fingerprint64([]byte(str))%numInstances == 0 { - uid, err := rdf.GetUid(str, 0, numInstances) + uid, err := rdf.GetUid(str, 0, numInstances, ps) if uid < minIdx0 || uid > minIdx0+mod-1 { t.Error("Not the correct UID", err) } t.Logf("Instance-0 Correct UID", str, uid) } else { - uid, err := rdf.GetUid(str, 1, numInstances) + uid, err := rdf.GetUid(str, 1, numInstances, ps) if uid < minIdx1 || uid > minIdx1+mod-1 { t.Error("Not the correct UID", err) } diff --git a/uid/assigner.go b/uid/assigner.go index 8fb105b07cea8383738513426f40b0838c239f30..81a5db389ced52db308d54ed44db0857b8b2c05e 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -25,6 +25,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/posting/types" + "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/x" "github.com/dgryski/go-farm" ) @@ -92,7 +93,7 @@ func init() { // go lmgr.clean() } -func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { +func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64, rStore *store.Store) (uid uint64, rerr error) { mod := math.MaxUint64 / numInstances minIdx := instanceIdx * mod @@ -111,7 +112,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid // Check if this uid has already been allocated. key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid - pl := posting.GetOrCreate(key) + pl := posting.GetOrCreate(key, rStore) if pl.Length() > 0 { // Something already present here. @@ -140,7 +141,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid " Wake the stupid developer up.") } -func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInstances uint64) (uint64, error) { +func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInstances uint64, rStore *store.Store) (uint64, error) { entry := lmgr.newOrExisting(xid) entry.Lock() entry.ts = time.Now() @@ -158,7 +159,7 @@ func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInstances ui } // No current id exists. Create one. - uid, err := allocateUniqueUid(xid, instanceIdx, numInstances) + uid, err := allocateUniqueUid(xid, instanceIdx, numInstances, rStore) if err != nil { return 0, err } @@ -180,11 +181,11 @@ func stringKey(xid string) []byte { return buf.Bytes() } -func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { +func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64, rStore *store.Store) (uid uint64, rerr error) { key := stringKey(xid) - pl := posting.GetOrCreate(key) + pl := posting.GetOrCreate(key, rStore) if pl.Length() == 0 { - return assignNew(pl, xid, instanceIdx, numInstances) + return assignNew(pl, xid, instanceIdx, numInstances, rStore) } else if pl.Length() > 1 { glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) @@ -201,9 +202,9 @@ func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint6 " Wake the stupid developer up.") } -func ExternalId(uid uint64) (xid string, rerr error) { +func ExternalId(uid uint64, rStore *store.Store) (xid string, rerr error) { key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid - pl := posting.GetOrCreate(key) + pl := posting.GetOrCreate(key, rStore) if pl.Length() == 0 { return "", errors.New("NO external id") } diff --git a/uid/assigner_test.go b/uid/assigner_test.go index 826c88581381a67e47204ff29aa80a0ae654c77c..c083cb6ed99a403f71e8bed606cb293948a225d4 100644 --- a/uid/assigner_test.go +++ b/uid/assigner_test.go @@ -42,11 +42,11 @@ func TestGetOrAssign(t *testing.T) { clog.Init() defer clog.Close() - posting.Init(ps, clog) + posting.Init(clog) var u1, u2 uint64 { - uid, err := GetOrAssign("externalid0", 0, 1) + uid, err := GetOrAssign("externalid0", 0, 1, ps) if err != nil { t.Error(err) } @@ -55,7 +55,7 @@ func TestGetOrAssign(t *testing.T) { } { - uid, err := GetOrAssign("externalid1", 0, 1) + uid, err := GetOrAssign("externalid1", 0, 1, ps) if err != nil { t.Error(err) } @@ -69,7 +69,7 @@ func TestGetOrAssign(t *testing.T) { // return { - uid, err := GetOrAssign("externalid0", 0, 1) + uid, err := GetOrAssign("externalid0", 0, 1, ps) if err != nil { t.Error(err) } @@ -81,7 +81,7 @@ func TestGetOrAssign(t *testing.T) { // return { - xid, err := ExternalId(u1) + xid, err := ExternalId(u1, ps) if err != nil { t.Error(err) } @@ -91,7 +91,7 @@ func TestGetOrAssign(t *testing.T) { } return { - xid, err := ExternalId(u2) + xid, err := ExternalId(u2, ps) if err != nil { t.Error(err) }