diff --git a/loader/loader.go b/loader/loader.go
index 305ce6e54743dd9e25e5c34ed66ba6e277295f79..48b0776a7733d546ab2b62cb0c36d8d808814c26 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -29,11 +29,13 @@ import (
 	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/rdf"
+	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/dgryski/go-farm"
 )
 
 var glog = x.Log("loader")
+var uidStore, dataStore *store.Store
 
 type counters struct {
 	read      uint64
@@ -50,6 +52,11 @@ type state struct {
 	numInstances uint64
 }
 
+func Init(uidstore, datastore *store.Store) {
+	uidStore = uidstore
+	dataStore = datastore
+}
+
 func (s *state) printCounters(ticker *time.Ticker) {
 	var prev uint64
 	for _ = range ticker.C {
@@ -122,7 +129,7 @@ func (s *state) parseStream(done chan error) {
 func (s *state) handleNQuads(wg *sync.WaitGroup) {
 	for nq := range s.cnq {
-		edge, err := nq.ToEdge(s.instanceIdx, s.numInstances)
+		edge, err := nq.ToEdge(s.instanceIdx, s.numInstances, uidStore)
 		for err != nil {
 			// Just put in a retry loop to tackle temporary errors.
 			if err == posting.E_TMP_ERROR {
@@ -133,19 +140,26 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
 				Error("While converting to edge")
 				return
 			}
-			edge, err = nq.ToEdge(s.instanceIdx, s.numInstances)
+			edge, err = nq.ToEdge(s.instanceIdx, s.numInstances, uidStore)
+		}
+
+		// Only handle this edge if the attribute satisfies the modulo rule
+		if farm.Fingerprint64([]byte(edge.Attribute))%s.numInstances ==
+			s.instanceIdx {
+			key := posting.Key(edge.Entity, edge.Attribute)
+			plist := posting.GetOrCreate(key, dataStore)
+			plist.AddMutation(edge, posting.Set)
+			atomic.AddUint64(&s.ctr.processed, 1)
+		} else {
+			atomic.AddUint64(&s.ctr.ignored, 1)
 		}
-		key := posting.Key(edge.Entity, edge.Attribute)
-		plist := posting.GetOrCreate(key)
-		plist.AddMutation(edge, posting.Set)
-		atomic.AddUint64(&s.ctr.processed, 1)
 	}
 	wg.Done()
 }
 
 func (s *state) getUidForString(str string) {
-	_, err := rdf.GetUid(str, s.instanceIdx, s.numInstances)
+	_, err := rdf.GetUid(str, s.instanceIdx, s.numInstances, dataStore)
 	for err != nil {
 		// Just put in a retry loop to tackle temporary errors.
 		if err == posting.E_TMP_ERROR {
@@ -157,7 +171,7 @@ func (s *state) getUidForString(str string) {
 			Error("While getting UID")
 			return
 		}
-		_, err = rdf.GetUid(str, s.instanceIdx, s.numInstances)
+		_, err = rdf.GetUid(str, s.instanceIdx, s.numInstances, dataStore)
 	}
 }
 
@@ -183,9 +197,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
 }
 
 // Blocking function.
-func HandleRdfReader(reader io.Reader,
-	instanceIdx uint64, numInstances uint64) (uint64, error) {
-
+func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) {
 	s := new(state)
 	s.ctr = new(counters)
 	ticker := time.NewTicker(time.Second)
@@ -227,9 +239,8 @@ func HandleRdfReader(reader io.Reader,
 }
 
 // Blocking function.
-func HandleRdfReaderWhileAssign(reader io.Reader,
-	instanceIdx uint64, numInstances uint64) (uint64, error) {
-
+func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64,
+	numInstances uint64) (uint64, error) {
 	s := new(state)
 	s.ctr = new(counters)
 	ticker := time.NewTicker(time.Second)
diff --git a/posting/lists.go b/posting/lists.go
index 2c24bcb325eeaee36c00ea6c9a66e832415de9ab..8f3d7744a121de2ade683d74063c620469009716 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -155,18 +155,16 @@ func checkMemoryUsage() {
 
 var stopTheWorld sync.RWMutex
 var lhmap *gotomic.Hash
 var dirtymap *gotomic.Hash
-var pstore *store.Store
 var clog *commit.Logger
 
-func Init(posting *store.Store, log *commit.Logger) {
+func Init(log *commit.Logger) {
 	lhmap = gotomic.NewHash()
 	dirtymap = gotomic.NewHash()
-	pstore = posting
 	clog = log
 	go checkMemoryUsage()
 }
 
-func GetOrCreate(key []byte) *List {
+func GetOrCreate(key []byte, pstore *store.Store) *List {
 	stopTheWorld.RLock()
 	defer stopTheWorld.RUnlock()
diff --git a/query/query.go b/query/query.go
index 6d076f03ecdfe2c4c8c46b2ff52e66ca2c7dfff9..a2a2d981744dc24c93922691c972fa0dba309ee6 100644
--- a/query/query.go
+++ b/query/query.go
@@ -26,8 +26,10 @@ import (
 	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/gql"
 	"github.com/dgraph-io/dgraph/posting"
+	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/task"
 	"github.com/dgraph-io/dgraph/uid"
+	"github.com/dgraph-io/dgraph/worker"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/google/flatbuffers/go"
 )
@@ -261,8 +263,8 @@ func treeCopy(gq *gql.GraphQuery, sg *SubGraph) {
 	}
 }
 
-func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) {
-	sg, err := newGraph(gq.UID, gq.XID)
+func ToSubGraph(gq *gql.GraphQuery, pstore *store.Store) (*SubGraph, error) {
+	sg, err := newGraph(gq.UID, gq.XID, pstore)
 	if err != nil {
 		return nil, err
 	}
@@ -270,7 +272,7 @@ func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) {
 	return sg, nil
 }
 
-func newGraph(euid uint64, exid string) (*SubGraph, error) {
+func newGraph(euid uint64, exid string, pstore *store.Store) (*SubGraph, error) {
 	// This would set the Result field in SubGraph,
 	// and populate the children for attributes.
 	if len(exid) > 0 {
@@ -400,11 +402,11 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) {
 	return sorted, nil
 }
 
-func ProcessGraph(sg *SubGraph, rch chan error) {
+func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) {
 	var err error
 	if len(sg.query) > 0 && sg.Attr != "_root_" {
 		// This task execution would go over the wire in later versions.
-		sg.result, err = posting.ProcessTask(sg.query)
+		sg.result, err = worker.ProcessTask(sg.query)
 		if err != nil {
 			x.Err(glog, err).Error("While processing task.")
 			rch <- err
@@ -443,7 +445,7 @@ func ProcessGraph(sg *SubGraph, rch chan error) {
 	for i := 0; i < len(sg.Children); i++ {
 		child := sg.Children[i]
 		child.query = createTaskQuery(child.Attr, sorted)
-		go ProcessGraph(child, childchan)
+		go ProcessGraph(child, childchan, pstore)
 	}
 
 	// Now get all the results back.
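Not part of the patch, but a minimal sketch of how a caller wires things up once posting no longer owns a package-level store: GetOrCreate, ToSubGraph, and ProcessGraph all take an explicit *store.Store, and task execution now lives in the new worker package. The temp directory, placeholder query string, and error handling below are illustrative assumptions, not taken from the diff.

```go
package main

import (
	"io/ioutil"
	"log"

	"github.com/dgraph-io/dgraph/commit"
	"github.com/dgraph-io/dgraph/gql"
	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/query"
	"github.com/dgraph-io/dgraph/store"
	"github.com/dgraph-io/dgraph/worker"
)

func main() {
	// Illustrative setup: one store backs both worker task execution and
	// the posting lists the query layer reads.
	dir, err := ioutil.TempDir("", "example_")
	if err != nil {
		log.Fatal(err)
	}
	ps := new(store.Store)
	ps.Init(dir)

	clog := commit.NewLogger(dir, "mutations", 50<<20)
	clog.Init()

	// posting.Init no longer receives a store; stores are passed per call.
	posting.Init(clog)
	worker.Init(ps)

	// Every posting-list lookup now names the store it reads from.
	pl := posting.GetOrCreate(posting.Key(1, "friend"), ps)
	_ = pl

	// Query construction and execution take the store explicitly as well.
	gq, err := gql.Parse(`{ me(_uid_:0x01) { name } }`) // placeholder query
	if err != nil {
		log.Fatal(err)
	}
	sg, err := query.ToSubGraph(gq, ps)
	if err != nil {
		log.Fatal(err)
	}
	rch := make(chan error)
	go query.ProcessGraph(sg, rch, ps)
	if err := <-rch; err != nil {
		log.Fatal(err)
	}
}
```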
diff --git a/query/query_test.go b/query/query_test.go
index 44bbbd07c93bf4403f9ee393bdb913b7f6dcf999..136f171b9a2659f9c799d1155202b84411f1a62e 100644
--- a/query/query_test.go
+++ b/query/query_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/task"
+	"github.com/dgraph-io/dgraph/worker"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/google/flatbuffers/go"
 )
@@ -88,11 +89,22 @@ func checkSingleValue(t *testing.T, child *SubGraph,
 func TestNewGraph(t *testing.T) {
 	var ex uint64
 	ex = 101
-	sg, err := newGraph(ex, "")
+
+	dir, err := ioutil.TempDir("", "storetest_")
 	if err != nil {
 		t.Error(err)
+		return
 	}
+	ps := new(store.Store)
+	ps.Init(dir)
+	sg, err := newGraph(ex, "", ps)
+	if err != nil {
+		t.Error(err)
+	}
+
+	worker.Init(ps)
+
 	uo := flatbuffers.GetUOffsetT(sg.result)
 	r := new(task.Result)
 	r.Init(sg.result, uo)
@@ -111,20 +123,22 @@ func TestNewGraph(t *testing.T) {
 	}
 }
 
-func populateGraph(t *testing.T) string {
+func populateGraph(t *testing.T) (string, *store.Store) {
 	// logrus.SetLevel(logrus.DebugLevel)
 	dir, err := ioutil.TempDir("", "storetest_")
 	if err != nil {
 		t.Error(err)
-		return ""
+		return "", nil
 	}
 
 	ps := new(store.Store)
 	ps.Init(dir)
+	worker.Init(ps)
+
 	clog := commit.NewLogger(dir, "mutations", 50<<20)
 	clog.Init()
-	posting.Init(ps, clog)
+	posting.Init(clog)
 
 	// So, user we're interested in has uid: 1.
 	// She has 4 friends: 23, 24, 25, 31, and 101
@@ -133,48 +147,48 @@ func populateGraph(t *testing.T) string {
 		Source:    "testing",
 		Timestamp: time.Now(),
 	}
-	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
 	edge.ValueId = 24
-	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
 	edge.ValueId = 25
-	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
 	edge.ValueId = 31
-	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
 	edge.ValueId = 101
-	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
 
 	// Now let's add a few properties for the main user.
 	edge.Value = "Michonne"
-	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "name")))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "name"), ps))
 	edge.Value = "female"
-	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "gender")))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "gender"), ps))
 	edge.Value = "alive"
-	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "status")))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "status"), ps))
 
 	// Now let's add a name for each of the friends, except 101.
edge.Value = "Rick Grimes" - addEdge(t, edge, posting.GetOrCreate(posting.Key(23, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(23, "name"), ps)) edge.Value = "Glenn Rhee" - addEdge(t, edge, posting.GetOrCreate(posting.Key(24, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(24, "name"), ps)) edge.Value = "Daryl Dixon" - addEdge(t, edge, posting.GetOrCreate(posting.Key(25, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(25, "name"), ps)) edge.Value = "Andrea" - addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name"), ps)) - return dir + return dir, ps } func TestProcessGraph(t *testing.T) { - dir := populateGraph(t) + dir, ps := populateGraph(t) defer os.RemoveAll(dir) // Alright. Now we have everything set up. Let's create the query. @@ -194,13 +208,13 @@ func TestProcessGraph(t *testing.T) { if err != nil { t.Error(err) } - sg, err := ToSubGraph(gq) + sg, err := ToSubGraph(gq, ps) if err != nil { t.Error(err) } ch := make(chan error) - go ProcessGraph(sg, ch) + go ProcessGraph(sg, ch, ps) err = <-ch if err != nil { t.Error(err) @@ -265,7 +279,7 @@ func TestProcessGraph(t *testing.T) { } func TestToJson(t *testing.T) { - dir := populateGraph(t) + dir, ps := populateGraph(t) defer os.RemoveAll(dir) // Alright. Now we have everything set up. Let's create the query. @@ -286,13 +300,13 @@ func TestToJson(t *testing.T) { if err != nil { t.Error(err) } - sg, err := ToSubGraph(gq) + sg, err := ToSubGraph(gq, ps) if err != nil { t.Error(err) } ch := make(chan error) - go ProcessGraph(sg, ch) + go ProcessGraph(sg, ch, ps) err = <-ch if err != nil { t.Error(err) diff --git a/rdf/parse.go b/rdf/parse.go index d56471ac5de95cc82fb460be0237fbb80152a855..8d59bb3447e294548ff84f5de050961cd7d81651 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -23,6 +23,7 @@ import ( "time" "github.com/dgraph-io/dgraph/lex" + "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/x" ) @@ -36,21 +37,23 @@ type NQuad struct { Language string } -func GetUid(s string, instanceIdx uint64, numInstances uint64) (uint64, error) { +func GetUid(s string, instanceIdx uint64, numInstances uint64, + rStore *store.Store) (uint64, error) { if strings.HasPrefix(s, "_uid_:") { return strconv.ParseUint(s[6:], 0, 64) } return uid.GetOrAssign(s, instanceIdx, numInstances) } -func (nq NQuad) ToEdge(instanceIdx, numInstances uint64) (result x.DirectedEdge, rerr error) { - sid, err := GetUid(nq.Subject, instanceIdx, numInstances) +func (nq NQuad) ToEdge(instanceIdx, numInstances uint64, + rStore *store.Store) (result x.DirectedEdge, rerr error) { + sid, err := GetUid(nq.Subject, instanceIdx, numInstances, rStore) if err != nil { return result, err } result.Entity = sid if len(nq.ObjectId) > 0 { - oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances) + oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances, rStore) if err != nil { return result, err } @@ -103,7 +106,8 @@ func Parse(line string) (rnq NQuad, rerr error) { // TODO: Strictly parse common types like integers, floats etc. if len(oval) == 0 { glog.Fatalf( - "itemObject should be emitted before itemObjectType. Input: %q", line) + "itemObject should be emitted before itemObjectType. 
Input: %q", + line) } oval += "@@" + stripBracketsIfPresent(item.Val) } diff --git a/server/loader/main.go b/server/loader/main.go index 3871936f01cec1d86e0d9ab4d49169563d1b4607..8f6bcf7cc4ae3a4e5b54754c92f39a3a267b5a5b 100644 --- a/server/loader/main.go +++ b/server/loader/main.go @@ -28,6 +28,7 @@ import ( "github.com/dgraph-io/dgraph/loader" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/x" ) @@ -35,9 +36,12 @@ var glog = x.Log("loader_main") var rdfGzips = flag.String("rdfgzips", "", "Comma separated gzip files containing RDF data") -var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.") -var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared") +var instanceIdx = flag.Uint64("instanceIdx", 0, + "Only pick entities, where Fingerprint % numInstance == instanceIdx.") +var numInstances = flag.Uint64("numInstances", 1, + "Total number of instances among which uid assigning is shared") var postingDir = flag.String("postings", "", "Directory to store posting lists") +var uidDir = flag.String("uidDir", "", "Directory to read UID posting lists") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") @@ -65,11 +69,17 @@ func main() { if len(*rdfGzips) == 0 { glog.Fatal("No RDF GZIP files specified") } - ps := new(store.Store) - ps.Init(*postingDir) - defer ps.Close() + dataStore := new(store.Store) + dataStore.Init(*postingDir) + defer dataStore.Close() - posting.Init(ps, nil) + uidStore := new(store.Store) + uidStore.Init(*uidDir) + defer uidStore.Close() + + posting.Init(nil) + uid.Init(uidStore) + loader.Init(uidStore, dataStore) files := strings.Split(*rdfGzips, ",") for _, path := range files { diff --git a/server/main.go b/server/main.go index 11c240aa6ed1d08e2132f0e86ab63faac5e92bd4..0e62178e48cc0444c42453254847c02eadc87bd1 100644 --- a/server/main.go +++ b/server/main.go @@ -30,6 +30,8 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/query" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/uid" + "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" ) @@ -51,67 +53,69 @@ func addCorsHeaders(w http.ResponseWriter) { w.Header().Set("Connection", "close") } -func queryHandler(w http.ResponseWriter, r *http.Request) { - addCorsHeaders(w) - if r.Method == "OPTIONS" { - return +func queryHandler(ps *store.Store) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + addCorsHeaders(w) + if r.Method == "OPTIONS" { + return + } + if r.Method != "POST" { + x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method") + return + } + + var l query.Latency + l.Start = time.Now() + defer r.Body.Close() + q, err := ioutil.ReadAll(r.Body) + if err != nil || len(q) == 0 { + x.Err(glog, err).Error("While reading query") + x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.") + return + } + + glog.WithField("q", string(q)).Debug("Query received.") + gq, err := gql.Parse(string(q)) + if err != nil { + x.Err(glog, err).Error("While parsing query") + x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) + return + } + sg, err := query.ToSubGraph(gq, ps) + if err != nil { + x.Err(glog, err).Error("While conversion to internal format") + x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) + return + } + 
+		l.Parsing = time.Since(l.Start)
+		glog.WithField("q", string(q)).Debug("Query parsed.")
+
+		rch := make(chan error)
+		go query.ProcessGraph(sg, rch, ps)
+		err = <-rch
+		if err != nil {
+			x.Err(glog, err).Error("While executing query")
+			x.SetStatus(w, x.E_ERROR, err.Error())
+			return
+		}
+		l.Processing = time.Since(l.Start) - l.Parsing
+		glog.WithField("q", string(q)).Debug("Graph processed.")
+		js, err := sg.ToJson(&l)
+		if err != nil {
+			x.Err(glog, err).Error("While converting to Json.")
+			x.SetStatus(w, x.E_ERROR, err.Error())
+			return
+		}
+		glog.WithFields(logrus.Fields{
+			"total":   time.Since(l.Start),
+			"parsing": l.Parsing,
+			"process": l.Processing,
+			"json":    l.Json,
+		}).Info("Query Latencies")
+
+		w.Header().Set("Content-Type", "application/json")
+		fmt.Fprint(w, string(js))
 	}
-	if r.Method != "POST" {
-		x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
-		return
-	}
-
-	var l query.Latency
-	l.Start = time.Now()
-	defer r.Body.Close()
-	q, err := ioutil.ReadAll(r.Body)
-	if err != nil || len(q) == 0 {
-		x.Err(glog, err).Error("While reading query")
-		x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
-		return
-	}
-
-	glog.WithField("q", string(q)).Debug("Query received.")
-	gq, _, err := gql.Parse(string(q))
-	if err != nil {
-		x.Err(glog, err).Error("While parsing query")
-		x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
-		return
-	}
-	sg, err := query.ToSubGraph(gq)
-	if err != nil {
-		x.Err(glog, err).Error("While conversion to internal format")
-		x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
-		return
-	}
-	l.Parsing = time.Since(l.Start)
-	glog.WithField("q", string(q)).Debug("Query parsed.")
-
-	rch := make(chan error)
-	go query.ProcessGraph(sg, rch)
-	err = <-rch
-	if err != nil {
-		x.Err(glog, err).Error("While executing query")
-		x.SetStatus(w, x.E_ERROR, err.Error())
-		return
-	}
-	l.Processing = time.Since(l.Start) - l.Parsing
-	glog.WithField("q", string(q)).Debug("Graph processed.")
-	js, err := sg.ToJson(&l)
-	if err != nil {
-		x.Err(glog, err).Error("While converting to Json.")
-		x.SetStatus(w, x.E_ERROR, err.Error())
-		return
-	}
-	glog.WithFields(logrus.Fields{
-		"total":   time.Since(l.Start),
-		"parsing": l.Parsing,
-		"process": l.Processing,
-		"json":    l.Json,
-	}).Info("Query Latencies")
-
-	w.Header().Set("Content-Type", "application/json")
-	fmt.Fprint(w, string(js))
 }
 
 func main() {
@@ -135,9 +139,11 @@ func main() {
 	clog.Init()
 	defer clog.Close()
 
-	posting.Init(ps, clog)
+	posting.Init(clog)
+	worker.Init(ps)
+	uid.Init(ps)
 
-	http.HandleFunc("/query", queryHandler)
+	http.HandleFunc("/query", queryHandler(ps))
 	glog.WithField("port", *port).Info("Listening for requests...")
 	if err := http.ListenAndServe(":"+*port, nil); err != nil {
 		x.Err(glog, err).Fatal("ListenAndServe")
diff --git a/server/main_test.go b/server/main_test.go
index 010195130e0932fbebc2c57546c2476276643fe9..2d13e5515fb16f5a02b3c3677e83d16d7f104241 100644
--- a/server/main_test.go
+++ b/server/main_test.go
@@ -28,6 +28,8 @@ import (
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/query"
 	"github.com/dgraph-io/dgraph/store"
+	"github.com/dgraph-io/dgraph/uid"
+	"github.com/dgraph-io/dgraph/worker"
 )
 
 var q0 = `
@@ -43,34 +45,37 @@ var q0 = `
 }
 `
 
-func prepare() (dir1, dir2 string, clog *commit.Logger, rerr error) {
+func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr error) {
 	var err error
 	dir1, err = ioutil.TempDir("", "storetest_")
 	if err != nil {
-		return "", "", nil, err
+		return "", "", nil, nil, err
 	}
-	ps := new(store.Store)
+	ps = new(store.Store)
 	ps.Init(dir1)
 
 	dir2, err = ioutil.TempDir("", "storemuts_")
 	if err != nil {
-		return dir1, "", nil, err
+		return dir1, "", nil, nil, err
 	}
 	clog = commit.NewLogger(dir2, "mutations", 50<<20)
 	clog.Init()
 
-	posting.Init(ps, clog)
+	posting.Init(clog)
+	worker.Init(ps)
+	uid.Init(ps)
+	loader.Init(ps, ps)
 
 	f, err := os.Open("testdata.nq")
 	if err != nil {
-		return dir1, dir2, clog, err
+		return dir1, dir2, nil, clog, err
 	}
 	defer f.Close()
 
 	_, err = loader.HandleRdfReader(f, 0, 1)
 	if err != nil {
-		return dir1, dir2, clog, err
+		return dir1, dir2, nil, clog, err
 	}
-	return dir1, dir2, clog, nil
+	return dir1, dir2, ps, clog, nil
 }
 
 func closeAll(dir1, dir2 string, clog *commit.Logger) {
@@ -80,7 +85,7 @@ func closeAll(dir1, dir2 string, clog *commit.Logger) {
 }
 
 func TestQuery(t *testing.T) {
-	dir1, dir2, clog, err := prepare()
+	dir1, dir2, ps, clog, err := prepare()
 	if err != nil {
 		t.Error(err)
 		return
@@ -93,7 +98,7 @@ func TestQuery(t *testing.T) {
 		t.Error(err)
 		return
 	}
-	g, err := query.ToSubGraph(gq)
+	g, err := query.ToSubGraph(gq, ps)
 	if err != nil {
 		t.Error(err)
 		return
@@ -137,7 +142,7 @@ func TestQuery(t *testing.T) {
 	}
 
 	ch := make(chan error)
-	go query.ProcessGraph(g, ch)
+	go query.ProcessGraph(g, ch, ps)
 	if err := <-ch; err != nil {
 		t.Error(err)
 		return
@@ -175,7 +180,7 @@ var q1 = `
 `
 
 func BenchmarkQuery(b *testing.B) {
-	dir1, dir2, clog, err := prepare()
+	dir1, dir2, ps, clog, err := prepare()
 	if err != nil {
 		b.Error(err)
 		return
@@ -189,14 +194,14 @@ func BenchmarkQuery(b *testing.B) {
 		b.Error(err)
 		return
 	}
-	g, err := query.ToSubGraph(gq)
+	g, err := query.ToSubGraph(gq, ps)
 	if err != nil {
 		b.Error(err)
 		return
 	}
 
 	ch := make(chan error)
-	go query.ProcessGraph(g, ch)
+	go query.ProcessGraph(g, ch, ps)
 	if err := <-ch; err != nil {
 		b.Error(err)
 		return
diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go
index 3d307b99892b064fb8ec3d8c14fcf161cb424ebf..cdf0d29d73ad42ef82a3029e1d7b564f524c7d6b 100644
--- a/server/uidassigner/main.go
+++ b/server/uidassigner/main.go
@@ -12,6 +12,7 @@ import (
 	"github.com/dgraph-io/dgraph/loader"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/store"
+	"github.com/dgraph-io/dgraph/uid"
 	"github.com/dgraph-io/dgraph/x"
 )
 
@@ -19,8 +20,10 @@ var glog = x.Log("uidassigner_main")
 
 var rdfGzips = flag.String("rdfgzips", "",
 	"Comma separated gzip files containing RDF data")
-var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.")
-var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared")
+var instanceIdx = flag.Uint64("instanceIdx", 0,
+	"Only pick entities, where Fingerprint % numInstance == instanceIdx.")
+var numInstances = flag.Uint64("numInstances", 1,
+	"Total number of instances among which uid assigning is shared")
 var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists")
 var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
 var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process")
@@ -48,7 +51,7 @@ func main() {
 
 	glog.WithField("instanceIdx", *instanceIdx).
 		WithField("numInstances", *numInstances).
- Info("Only those XIDs which satisfy FP(xid) % numInstance == instanceIdx will be given UID") + Info("Only those XIDs with FP(xid) % numInstance == instanceIdx will be given UID") if len(*rdfGzips) == 0 { glog.Fatal("No RDF GZIP files specified") @@ -58,7 +61,9 @@ func main() { ps.Init(*uidDir) defer ps.Close() - posting.Init(ps, nil) + posting.Init(nil) + uid.Init(ps) + loader.Init(nil, ps) files := strings.Split(*rdfGzips, ",") for _, path := range files { diff --git a/server/uidassigner/main_test.go b/server/uidassigner/main_test.go index 7c43abaa53cb843e8dc0e9ded0a9045d1310582a..269b9a839845955eb9114b67f08f1a2bcf31fd7f 100644 --- a/server/uidassigner/main_test.go +++ b/server/uidassigner/main_test.go @@ -11,6 +11,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/rdf" "github.com/dgraph-io/dgraph/store" + "github.com/dgraph-io/dgraph/uid" "github.com/dgryski/go-farm" ) @@ -33,19 +34,21 @@ func TestQuery(t *testing.T) { clog := commit.NewLogger(dir, "mutations", 50<<20) clog.Init() defer clog.Close() - posting.Init(ps, clog) + posting.Init(clog) + + uid.Init(ps) list := []string{"alice", "bob", "mallory", "ash", "man", "dgraph"} for _, str := range list { if farm.Fingerprint64([]byte(str))%numInstances == 0 { - uid, err := rdf.GetUid(str, 0, numInstances) + uid, err := rdf.GetUid(str, 0, numInstances, ps) if uid < minIdx0 || uid > minIdx0+mod-1 { t.Error("Not the correct UID", err) } t.Logf("Instance-0 Correct UID", str, uid) } else { - uid, err := rdf.GetUid(str, 1, numInstances) + uid, err := rdf.GetUid(str, 1, numInstances, ps) if uid < minIdx1 || uid > minIdx1+mod-1 { t.Error("Not the correct UID", err) } diff --git a/uid/assigner.go b/uid/assigner.go index 8fb105b07cea8383738513426f40b0838c239f30..fad1d5740423da27e6040d1088f0c27ad34eb1e9 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -25,12 +25,14 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/posting/types" + "github.com/dgraph-io/dgraph/store" "github.com/dgraph-io/dgraph/x" "github.com/dgryski/go-farm" ) var glog = x.Log("uid") var lmgr *lockManager +var uidStore *store.Store type entry struct { sync.Mutex @@ -92,6 +94,10 @@ func init() { // go lmgr.clean() } +func Init(ps *store.Store) { + uidStore = ps +} + func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) { mod := math.MaxUint64 / numInstances @@ -111,7 +117,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid // Check if this uid has already been allocated. key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid - pl := posting.GetOrCreate(key) + pl := posting.GetOrCreate(key, uidStore) if pl.Length() > 0 { // Something already present here. 
@@ -140,7 +146,8 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid
 		" Wake the stupid developer up.")
 }
 
-func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInstances uint64) (uint64, error) {
+func assignNew(pl *posting.List, xid string, instanceIdx uint64,
+	numInstances uint64) (uint64, error) {
 	entry := lmgr.newOrExisting(xid)
 	entry.Lock()
 	entry.ts = time.Now()
@@ -182,7 +189,7 @@ func stringKey(xid string) []byte {
 
 func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) {
 	key := stringKey(xid)
-	pl := posting.GetOrCreate(key)
+	pl := posting.GetOrCreate(key, uidStore)
 
 	if pl.Length() == 0 {
 		return assignNew(pl, xid, instanceIdx, numInstances)
@@ -203,7 +210,7 @@ func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint6
 
 func ExternalId(uid uint64) (xid string, rerr error) {
 	key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid
-	pl := posting.GetOrCreate(key)
+	pl := posting.GetOrCreate(key, uidStore)
 	if pl.Length() == 0 {
 		return "", errors.New("NO external id")
 	}
diff --git a/uid/assigner_test.go b/uid/assigner_test.go
index 826c88581381a67e47204ff29aa80a0ae654c77c..b08eed2052db06aebbaecbba3ac17ffc3d3ae41b 100644
--- a/uid/assigner_test.go
+++ b/uid/assigner_test.go
@@ -42,7 +42,8 @@ func TestGetOrAssign(t *testing.T) {
 	clog.Init()
 	defer clog.Close()
 
-	posting.Init(ps, clog)
+	posting.Init(clog)
+	Init(ps)
 
 	var u1, u2 uint64
 	{
diff --git a/posting/worker.go b/worker/worker.go
similarity index 87%
rename from posting/worker.go
rename to worker/worker.go
index bbaa558c475f3c1ffc65e9ac1ae50cb1a4252b86..5538e0fa41dd35188fdaa0e8546ca8d885530407 100644
--- a/posting/worker.go
+++ b/worker/worker.go
@@ -1,11 +1,19 @@
-package posting
+package worker
 
 import (
+	"github.com/dgraph-io/dgraph/posting"
+	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/task"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/google/flatbuffers/go"
 )
 
+var dataStore *store.Store
+
+func Init(ps *store.Store) {
+	dataStore = ps
+}
+
 func ProcessTask(query []byte) (result []byte, rerr error) {
 	uo := flatbuffers.GetUOffsetT(query)
 	q := new(task.Query)
@@ -18,8 +26,8 @@ func ProcessTask(query []byte) (result []byte, rerr error) {
 	attr := string(q.Attr())
 	for i := 0; i < q.UidsLength(); i++ {
 		uid := q.Uids(i)
-		key := Key(uid, attr)
-		pl := GetOrCreate(key)
+		key := posting.Key(uid, attr)
+		pl := posting.GetOrCreate(key, dataStore)
 
 		var valoffset flatbuffers.UOffsetT
 		if val, err := pl.Value(); err != nil {
diff --git a/posting/worker_test.go b/worker/worker_test.go
similarity index 74%
rename from posting/worker_test.go
rename to worker/worker_test.go
index 2aabe52f05dd08c8c0b3900aae1d4e5517a4250d..06f4be87f02727da500464a9b077a47e34b3c8bf 100644
--- a/posting/worker_test.go
+++ b/worker/worker_test.go
@@ -1,4 +1,4 @@
-package posting
+package worker
 
 import (
 	"fmt"
@@ -8,14 +8,15 @@ import (
 	"time"
 
 	"github.com/dgraph-io/dgraph/commit"
+	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/task"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/google/flatbuffers/go"
 )
 
-func addEdge(t *testing.T, edge x.DirectedEdge, l *List) {
-	if err := l.AddMutation(edge, Set); err != nil {
+func addEdge(t *testing.T, edge x.DirectedEdge, l *posting.List) {
+	if err := l.AddMutation(edge, posting.Set); err != nil {
 		t.Error(err)
 	}
 }
@@ -56,29 +57,30 @@ func TestProcessTask(t *testing.T) {
 	clog.Init()
 	defer clog.Close()
 
-	Init(ps, clog)
+	posting.Init(clog)
+	Init(ps)
 
 	edge := x.DirectedEdge{
 		ValueId:   23,
 		Source:    "author0",
 		Timestamp: time.Now(),
 	}
-	addEdge(t, edge, GetOrCreate(Key(10, "friend")))
-	addEdge(t, edge, GetOrCreate(Key(11, "friend")))
-	addEdge(t, edge, GetOrCreate(Key(12, "friend")))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(10, "friend"), ps))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(11, "friend"), ps))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps))
 
 	edge.ValueId = 25
-	addEdge(t, edge, GetOrCreate(Key(12, "friend")))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps))
 
 	edge.ValueId = 26
-	addEdge(t, edge, GetOrCreate(Key(12, "friend")))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps))
 
 	edge.ValueId = 31
-	addEdge(t, edge, GetOrCreate(Key(10, "friend")))
-	addEdge(t, edge, GetOrCreate(Key(12, "friend")))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(10, "friend"), ps))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps))
 
 	edge.Value = "photon"
-	addEdge(t, edge, GetOrCreate(Key(12, "friend")))
+	addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps))
 
 	query := NewQuery("friend", []uint64{10, 11, 12})
 	result, err := ProcessTask(query)
@@ -126,7 +128,7 @@ func TestProcessTask(t *testing.T) {
 		t.Errorf("Unable to retrieve value")
 	}
 	var iout interface{}
-	if err := ParseValue(&iout, tval.ValBytes()); err != nil {
+	if err := posting.ParseValue(&iout, tval.ValBytes()); err != nil {
 		t.Error(err)
 	}
 	v := iout.(string)
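Not part of the patch: taken together, these changes replace posting's single pstore global with per-package store references that each binary wires explicitly. Below is a condensed sketch of the loader's two-store wiring after the patch; the directory paths are placeholders for values the real binary reads from its -postings and -uidDir flags.

```go
package main

import (
	"github.com/dgraph-io/dgraph/loader"
	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/store"
	"github.com/dgraph-io/dgraph/uid"
)

func main() {
	// Edges are written to dataStore; xid -> uid posting lists live in uidStore.
	dataStore := new(store.Store)
	dataStore.Init("/tmp/postings") // placeholder for the -postings flag
	defer dataStore.Close()

	uidStore := new(store.Store)
	uidStore.Init("/tmp/uids") // placeholder for the -uidDir flag
	defer uidStore.Close()

	// Initialization order mirrors server/loader/main.go after this patch:
	// posting no longer owns a store, uid resolves XIDs against uidStore,
	// and loader fans edges out to dataStore.
	posting.Init(nil)
	uid.Init(uidStore)
	loader.Init(uidStore, dataStore)
}
```

The uidassigner and server binaries follow the same pattern, as the hunks above show: the former calls posting.Init(nil), uid.Init(ps), and loader.Init(nil, ps) against a single UID store, while the server calls posting.Init(clog), worker.Init(ps), and uid.Init(ps) over one shared store and passes it to queryHandler.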