Skip to content
Snippets Groups Projects
Commit 2d29951f authored by Ashwin
Browse files

resolve merge conflict in server/main.go

parents 696c43f0 3a70965d
Branches
No related tags found
No related merge requests found
......@@ -29,11 +29,13 @@ import (
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/x"
"github.com/dgryski/go-farm"
)
var glog = x.Log("loader")
var uidStore, dataStore *store.Store
type counters struct {
read uint64
......@@ -50,6 +52,11 @@ type state struct {
numInstances uint64
}
// Init records the two stores used by the loader: uidstore is kept in the
// package-level uidStore and datastore in the package-level dataStore.
func Init(uidstore, datastore *store.Store) {
	uidStore, dataStore = uidstore, datastore
}
func (s *state) printCounters(ticker *time.Ticker) {
var prev uint64
for _ = range ticker.C {
......@@ -122,7 +129,7 @@ func (s *state) parseStream(done chan error) {
func (s *state) handleNQuads(wg *sync.WaitGroup) {
for nq := range s.cnq {
edge, err := nq.ToEdge(s.instanceIdx, s.numInstances)
edge, err := nq.ToEdge(s.instanceIdx, s.numInstances, uidStore)
for err != nil {
// Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR {
......@@ -133,19 +140,26 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
Error("While converting to edge")
return
}
edge, err = nq.ToEdge(s.instanceIdx, s.numInstances)
edge, err = nq.ToEdge(s.instanceIdx, s.numInstances, uidStore)
}
// Only handle this edge if the attribute satisfies the modulo rule
if farm.Fingerprint64([]byte(edge.Attribute))%s.numInstances ==
s.instanceIdx {
key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.GetOrCreate(key, dataStore)
plist.AddMutation(edge, posting.Set)
atomic.AddUint64(&s.ctr.processed, 1)
} else {
atomic.AddUint64(&s.ctr.ignored, 1)
}
key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.GetOrCreate(key)
plist.AddMutation(edge, posting.Set)
atomic.AddUint64(&s.ctr.processed, 1)
}
wg.Done()
}
func (s *state) getUidForString(str string) {
_, err := rdf.GetUid(str, s.instanceIdx, s.numInstances)
_, err := rdf.GetUid(str, s.instanceIdx, s.numInstances, dataStore)
for err != nil {
// Just put in a retry loop to tackle temporary errors.
if err == posting.E_TMP_ERROR {
......@@ -157,7 +171,7 @@ func (s *state) getUidForString(str string) {
Error("While getting UID")
return
}
_, err = rdf.GetUid(str, s.instanceIdx, s.numInstances)
_, err = rdf.GetUid(str, s.instanceIdx, s.numInstances, dataStore)
}
}
......@@ -183,9 +197,7 @@ func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
}
// Blocking function.
func HandleRdfReader(reader io.Reader,
instanceIdx uint64, numInstances uint64) (uint64, error) {
func HandleRdfReader(reader io.Reader, instanceIdx uint64, numInstances uint64) (uint64, error) {
s := new(state)
s.ctr = new(counters)
ticker := time.NewTicker(time.Second)
......@@ -227,9 +239,8 @@ func HandleRdfReader(reader io.Reader,
}
// Blocking function.
func HandleRdfReaderWhileAssign(reader io.Reader,
instanceIdx uint64, numInstances uint64) (uint64, error) {
func HandleRdfReaderWhileAssign(reader io.Reader, instanceIdx uint64,
numInstances uint64) (uint64, error) {
s := new(state)
s.ctr = new(counters)
ticker := time.NewTicker(time.Second)
......
......@@ -155,18 +155,16 @@ func checkMemoryUsage() {
var stopTheWorld sync.RWMutex
var lhmap *gotomic.Hash
var dirtymap *gotomic.Hash
var pstore *store.Store
var clog *commit.Logger
func Init(posting *store.Store, log *commit.Logger) {
func Init(log *commit.Logger) {
lhmap = gotomic.NewHash()
dirtymap = gotomic.NewHash()
pstore = posting
clog = log
go checkMemoryUsage()
}
func GetOrCreate(key []byte) *List {
func GetOrCreate(key []byte, pstore *store.Store) *List {
stopTheWorld.RLock()
defer stopTheWorld.RUnlock()
......
......@@ -26,8 +26,10 @@ import (
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/google/flatbuffers/go"
)
......@@ -261,8 +263,8 @@ func treeCopy(gq *gql.GraphQuery, sg *SubGraph) {
}
}
func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) {
sg, err := newGraph(gq.UID, gq.XID)
func ToSubGraph(gq *gql.GraphQuery, pstore *store.Store) (*SubGraph, error) {
sg, err := newGraph(gq.UID, gq.XID, pstore)
if err != nil {
return nil, err
}
......@@ -270,7 +272,7 @@ func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) {
return sg, nil
}
func newGraph(euid uint64, exid string) (*SubGraph, error) {
func newGraph(euid uint64, exid string, pstore *store.Store) (*SubGraph, error) {
// This would set the Result field in SubGraph,
// and populate the children for attributes.
if len(exid) > 0 {
......@@ -400,11 +402,11 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) {
return sorted, nil
}
func ProcessGraph(sg *SubGraph, rch chan error) {
func ProcessGraph(sg *SubGraph, rch chan error, pstore *store.Store) {
var err error
if len(sg.query) > 0 && sg.Attr != "_root_" {
// This task execution would go over the wire in later versions.
sg.result, err = posting.ProcessTask(sg.query)
sg.result, err = worker.ProcessTask(sg.query)
if err != nil {
x.Err(glog, err).Error("While processing task.")
rch <- err
......@@ -443,7 +445,7 @@ func ProcessGraph(sg *SubGraph, rch chan error) {
for i := 0; i < len(sg.Children); i++ {
child := sg.Children[i]
child.query = createTaskQuery(child.Attr, sorted)
go ProcessGraph(child, childchan)
go ProcessGraph(child, childchan, pstore)
}
// Now get all the results back.
......
......@@ -28,6 +28,7 @@ import (
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/google/flatbuffers/go"
)
......@@ -88,11 +89,22 @@ func checkSingleValue(t *testing.T, child *SubGraph,
func TestNewGraph(t *testing.T) {
var ex uint64
ex = 101
sg, err := newGraph(ex, "")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
return
}
ps := new(store.Store)
ps.Init(dir)
sg, err := newGraph(ex, "", ps)
if err != nil {
t.Error(err)
}
worker.Init(ps)
uo := flatbuffers.GetUOffsetT(sg.result)
r := new(task.Result)
r.Init(sg.result, uo)
......@@ -111,20 +123,22 @@ func TestNewGraph(t *testing.T) {
}
}
func populateGraph(t *testing.T) string {
func populateGraph(t *testing.T) (string, *store.Store) {
// logrus.SetLevel(logrus.DebugLevel)
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
return ""
return "", nil
}
ps := new(store.Store)
ps.Init(dir)
worker.Init(ps)
clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init()
posting.Init(ps, clog)
posting.Init(clog)
// So, user we're interested in has uid: 1.
// She has 5 friends: 23, 24, 25, 31, and 101
......@@ -133,48 +147,48 @@ func populateGraph(t *testing.T) string {
Source: "testing",
Timestamp: time.Now(),
}
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
edge.ValueId = 24
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
edge.ValueId = 25
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
edge.ValueId = 31
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
edge.ValueId = 101
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
// Now let's add a few properties for the main user.
edge.Value = "Michonne"
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "name"), ps))
edge.Value = "female"
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "gender")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "gender"), ps))
edge.Value = "alive"
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "status")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "status"), ps))
// Now let's add a name for each of the friends, except 101.
edge.Value = "Rick Grimes"
addEdge(t, edge, posting.GetOrCreate(posting.Key(23, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(23, "name"), ps))
edge.Value = "Glenn Rhee"
addEdge(t, edge, posting.GetOrCreate(posting.Key(24, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(24, "name"), ps))
edge.Value = "Daryl Dixon"
addEdge(t, edge, posting.GetOrCreate(posting.Key(25, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(25, "name"), ps))
edge.Value = "Andrea"
addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name"), ps))
return dir
return dir, ps
}
func TestProcessGraph(t *testing.T) {
dir := populateGraph(t)
dir, ps := populateGraph(t)
defer os.RemoveAll(dir)
// Alright. Now we have everything set up. Let's create the query.
......@@ -194,13 +208,13 @@ func TestProcessGraph(t *testing.T) {
if err != nil {
t.Error(err)
}
sg, err := ToSubGraph(gq)
sg, err := ToSubGraph(gq, ps)
if err != nil {
t.Error(err)
}
ch := make(chan error)
go ProcessGraph(sg, ch)
go ProcessGraph(sg, ch, ps)
err = <-ch
if err != nil {
t.Error(err)
......@@ -265,7 +279,7 @@ func TestProcessGraph(t *testing.T) {
}
func TestToJson(t *testing.T) {
dir := populateGraph(t)
dir, ps := populateGraph(t)
defer os.RemoveAll(dir)
// Alright. Now we have everything set up. Let's create the query.
......@@ -286,13 +300,13 @@ func TestToJson(t *testing.T) {
if err != nil {
t.Error(err)
}
sg, err := ToSubGraph(gq)
sg, err := ToSubGraph(gq, ps)
if err != nil {
t.Error(err)
}
ch := make(chan error)
go ProcessGraph(sg, ch)
go ProcessGraph(sg, ch, ps)
err = <-ch
if err != nil {
t.Error(err)
......
......@@ -23,6 +23,7 @@ import (
"time"
"github.com/dgraph-io/dgraph/lex"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgraph-io/dgraph/x"
)
......@@ -36,21 +37,23 @@ type NQuad struct {
Language string
}
func GetUid(s string, instanceIdx uint64, numInstances uint64) (uint64, error) {
func GetUid(s string, instanceIdx uint64, numInstances uint64,
rStore *store.Store) (uint64, error) {
if strings.HasPrefix(s, "_uid_:") {
return strconv.ParseUint(s[6:], 0, 64)
}
return uid.GetOrAssign(s, instanceIdx, numInstances)
}
func (nq NQuad) ToEdge(instanceIdx, numInstances uint64) (result x.DirectedEdge, rerr error) {
sid, err := GetUid(nq.Subject, instanceIdx, numInstances)
func (nq NQuad) ToEdge(instanceIdx, numInstances uint64,
rStore *store.Store) (result x.DirectedEdge, rerr error) {
sid, err := GetUid(nq.Subject, instanceIdx, numInstances, rStore)
if err != nil {
return result, err
}
result.Entity = sid
if len(nq.ObjectId) > 0 {
oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances)
oid, err := GetUid(nq.ObjectId, instanceIdx, numInstances, rStore)
if err != nil {
return result, err
}
......@@ -103,7 +106,8 @@ func Parse(line string) (rnq NQuad, rerr error) {
// TODO: Strictly parse common types like integers, floats etc.
if len(oval) == 0 {
glog.Fatalf(
"itemObject should be emitted before itemObjectType. Input: %q", line)
"itemObject should be emitted before itemObjectType. Input: %q",
line)
}
oval += "@@" + stripBracketsIfPresent(item.Val)
}
......
......@@ -28,6 +28,7 @@ import (
"github.com/dgraph-io/dgraph/loader"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgraph-io/dgraph/x"
)
......@@ -35,9 +36,12 @@ var glog = x.Log("loader_main")
var rdfGzips = flag.String("rdfgzips", "",
"Comma separated gzip files containing RDF data")
var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.")
var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared")
var instanceIdx = flag.Uint64("instanceIdx", 0,
"Only pick entities, where Fingerprint % numInstance == instanceIdx.")
var numInstances = flag.Uint64("numInstances", 1,
"Total number of instances among which uid assigning is shared")
var postingDir = flag.String("postings", "", "Directory to store posting lists")
var uidDir = flag.String("uidDir", "", "Directory to read UID posting lists")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process")
......@@ -65,11 +69,17 @@ func main() {
if len(*rdfGzips) == 0 {
glog.Fatal("No RDF GZIP files specified")
}
ps := new(store.Store)
ps.Init(*postingDir)
defer ps.Close()
dataStore := new(store.Store)
dataStore.Init(*postingDir)
defer dataStore.Close()
posting.Init(ps, nil)
uidStore := new(store.Store)
uidStore.Init(*uidDir)
defer uidStore.Close()
posting.Init(nil)
uid.Init(uidStore)
loader.Init(uidStore, dataStore)
files := strings.Split(*rdfGzips, ",")
for _, path := range files {
......
......@@ -30,6 +30,8 @@ import (
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
)
......@@ -51,67 +53,69 @@ func addCorsHeaders(w http.ResponseWriter) {
w.Header().Set("Connection", "close")
}
func queryHandler(w http.ResponseWriter, r *http.Request) {
addCorsHeaders(w)
if r.Method == "OPTIONS" {
return
func queryHandler(ps *store.Store) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
addCorsHeaders(w)
if r.Method == "OPTIONS" {
return
}
if r.Method != "POST" {
x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
return
}
var l query.Latency
l.Start = time.Now()
defer r.Body.Close()
q, err := ioutil.ReadAll(r.Body)
if err != nil || len(q) == 0 {
x.Err(glog, err).Error("While reading query")
x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
return
}
glog.WithField("q", string(q)).Debug("Query received.")
gq, err := gql.Parse(string(q))
if err != nil {
x.Err(glog, err).Error("While parsing query")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
sg, err := query.ToSubGraph(gq, ps)
if err != nil {
x.Err(glog, err).Error("While conversion to internal format")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
l.Parsing = time.Since(l.Start)
glog.WithField("q", string(q)).Debug("Query parsed.")
rch := make(chan error)
go query.ProcessGraph(sg, rch, ps)
err = <-rch
if err != nil {
x.Err(glog, err).Error("While executing query")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
l.Processing = time.Since(l.Start) - l.Parsing
glog.WithField("q", string(q)).Debug("Graph processed.")
js, err := sg.ToJson(&l)
if err != nil {
x.Err(glog, err).Error("While converting to Json.")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
glog.WithFields(logrus.Fields{
"total": time.Since(l.Start),
"parsing": l.Parsing,
"process": l.Processing,
"json": l.Json,
}).Info("Query Latencies")
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, string(js))
}
if r.Method != "POST" {
x.SetStatus(w, x.E_INVALID_METHOD, "Invalid method")
return
}
var l query.Latency
l.Start = time.Now()
defer r.Body.Close()
q, err := ioutil.ReadAll(r.Body)
if err != nil || len(q) == 0 {
x.Err(glog, err).Error("While reading query")
x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
return
}
glog.WithField("q", string(q)).Debug("Query received.")
gq, _, err := gql.Parse(string(q))
if err != nil {
x.Err(glog, err).Error("While parsing query")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
sg, err := query.ToSubGraph(gq)
if err != nil {
x.Err(glog, err).Error("While conversion to internal format")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
l.Parsing = time.Since(l.Start)
glog.WithField("q", string(q)).Debug("Query parsed.")
rch := make(chan error)
go query.ProcessGraph(sg, rch)
err = <-rch
if err != nil {
x.Err(glog, err).Error("While executing query")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
l.Processing = time.Since(l.Start) - l.Parsing
glog.WithField("q", string(q)).Debug("Graph processed.")
js, err := sg.ToJson(&l)
if err != nil {
x.Err(glog, err).Error("While converting to Json.")
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
glog.WithFields(logrus.Fields{
"total": time.Since(l.Start),
"parsing": l.Parsing,
"process": l.Processing,
"json": l.Json,
}).Info("Query Latencies")
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, string(js))
}
func main() {
......@@ -135,9 +139,11 @@ func main() {
clog.Init()
defer clog.Close()
posting.Init(ps, clog)
posting.Init(clog)
worker.Init(ps)
uid.Init(ps)
http.HandleFunc("/query", queryHandler)
http.HandleFunc("/query", queryHandler(ps))
glog.WithField("port", *port).Info("Listening for requests...")
if err := http.ListenAndServe(":"+*port, nil); err != nil {
x.Err(glog, err).Fatal("ListenAndServe")
......
......@@ -28,6 +28,8 @@ import (
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgraph-io/dgraph/worker"
)
var q0 = `
......@@ -43,34 +45,37 @@ var q0 = `
}
`
func prepare() (dir1, dir2 string, clog *commit.Logger, rerr error) {
func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr error) {
var err error
dir1, err = ioutil.TempDir("", "storetest_")
if err != nil {
return "", "", nil, err
return "", "", nil, nil, err
}
ps := new(store.Store)
ps = new(store.Store)
ps.Init(dir1)
dir2, err = ioutil.TempDir("", "storemuts_")
if err != nil {
return dir1, "", nil, err
return dir1, "", nil, nil, err
}
clog = commit.NewLogger(dir2, "mutations", 50<<20)
clog.Init()
posting.Init(ps, clog)
posting.Init(clog)
worker.Init(ps)
uid.Init(ps)
loader.Init(ps, ps)
f, err := os.Open("testdata.nq")
if err != nil {
return dir1, dir2, clog, err
return dir1, dir2, nil, clog, err
}
defer f.Close()
_, err = loader.HandleRdfReader(f, 0, 1)
if err != nil {
return dir1, dir2, clog, err
return dir1, dir2, nil, clog, err
}
return dir1, dir2, clog, nil
return dir1, dir2, ps, clog, nil
}
func closeAll(dir1, dir2 string, clog *commit.Logger) {
......@@ -80,7 +85,7 @@ func closeAll(dir1, dir2 string, clog *commit.Logger) {
}
func TestQuery(t *testing.T) {
dir1, dir2, clog, err := prepare()
dir1, dir2, ps, clog, err := prepare()
if err != nil {
t.Error(err)
return
......@@ -93,7 +98,7 @@ func TestQuery(t *testing.T) {
t.Error(err)
return
}
g, err := query.ToSubGraph(gq)
g, err := query.ToSubGraph(gq, ps)
if err != nil {
t.Error(err)
return
......@@ -137,7 +142,7 @@ func TestQuery(t *testing.T) {
}
ch := make(chan error)
go query.ProcessGraph(g, ch)
go query.ProcessGraph(g, ch, ps)
if err := <-ch; err != nil {
t.Error(err)
return
......@@ -175,7 +180,7 @@ var q1 = `
`
func BenchmarkQuery(b *testing.B) {
dir1, dir2, clog, err := prepare()
dir1, dir2, ps, clog, err := prepare()
if err != nil {
b.Error(err)
return
......@@ -189,14 +194,14 @@ func BenchmarkQuery(b *testing.B) {
b.Error(err)
return
}
g, err := query.ToSubGraph(gq)
g, err := query.ToSubGraph(gq, ps)
if err != nil {
b.Error(err)
return
}
ch := make(chan error)
go query.ProcessGraph(g, ch)
go query.ProcessGraph(g, ch, ps)
if err := <-ch; err != nil {
b.Error(err)
return
......
......@@ -12,6 +12,7 @@ import (
"github.com/dgraph-io/dgraph/loader"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgraph-io/dgraph/x"
)
......@@ -19,8 +20,10 @@ var glog = x.Log("uidassigner_main")
var rdfGzips = flag.String("rdfgzips", "",
"Comma separated gzip files containing RDF data")
var instanceIdx = flag.Uint64("instanceIdx", 0, "Only pick entities, where Fingerprint % numInstance == instanceIdx.")
var numInstances = flag.Uint64("numInstances", 1, "Total number of instances among which uid assigning is shared")
var instanceIdx = flag.Uint64("instanceIdx", 0,
"Only pick entities, where Fingerprint % numInstance == instanceIdx.")
var numInstances = flag.Uint64("numInstances", 1,
"Total number of instances among which uid assigning is shared")
var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process")
......@@ -48,7 +51,7 @@ func main() {
glog.WithField("instanceIdx", *instanceIdx).
WithField("numInstances", *numInstances).
Info("Only those XIDs which satisfy FP(xid) % numInstance == instanceIdx will be given UID")
Info("Only those XIDs with FP(xid) % numInstance == instanceIdx will be given UID")
if len(*rdfGzips) == 0 {
glog.Fatal("No RDF GZIP files specified")
......@@ -58,7 +61,9 @@ func main() {
ps.Init(*uidDir)
defer ps.Close()
posting.Init(ps, nil)
posting.Init(nil)
uid.Init(ps)
loader.Init(nil, ps)
files := strings.Split(*rdfGzips, ",")
for _, path := range files {
......
......@@ -11,6 +11,7 @@ import (
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/uid"
"github.com/dgryski/go-farm"
)
......@@ -33,19 +34,21 @@ func TestQuery(t *testing.T) {
clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init()
defer clog.Close()
posting.Init(ps, clog)
posting.Init(clog)
uid.Init(ps)
list := []string{"alice", "bob", "mallory", "ash", "man", "dgraph"}
for _, str := range list {
if farm.Fingerprint64([]byte(str))%numInstances == 0 {
uid, err := rdf.GetUid(str, 0, numInstances)
uid, err := rdf.GetUid(str, 0, numInstances, ps)
if uid < minIdx0 || uid > minIdx0+mod-1 {
t.Error("Not the correct UID", err)
}
t.Logf("Instance-0 Correct UID", str, uid)
} else {
uid, err := rdf.GetUid(str, 1, numInstances)
uid, err := rdf.GetUid(str, 1, numInstances, ps)
if uid < minIdx1 || uid > minIdx1+mod-1 {
t.Error("Not the correct UID", err)
}
......
......@@ -25,12 +25,14 @@ import (
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/posting/types"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/x"
"github.com/dgryski/go-farm"
)
var glog = x.Log("uid")
var lmgr *lockManager
var uidStore *store.Store
type entry struct {
sync.Mutex
......@@ -92,6 +94,10 @@ func init() {
// go lmgr.clean()
}
// Init sets the package-level uidStore to ps. GetOrAssign, ExternalId and
// allocateUniqueUid hand uidStore to posting.GetOrCreate, so Init presumably
// has to run before any of them is used — confirm against callers.
func Init(ps *store.Store) {
uidStore = ps
}
func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) {
mod := math.MaxUint64 / numInstances
......@@ -111,7 +117,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid
// Check if this uid has already been allocated.
key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid
pl := posting.GetOrCreate(key)
pl := posting.GetOrCreate(key, uidStore)
if pl.Length() > 0 {
// Something already present here.
......@@ -140,7 +146,8 @@ func allocateUniqueUid(xid string, instanceIdx uint64, numInstances uint64) (uid
" Wake the stupid developer up.")
}
func assignNew(pl *posting.List, xid string, instanceIdx uint64, numInstances uint64) (uint64, error) {
func assignNew(pl *posting.List, xid string, instanceIdx uint64,
numInstances uint64) (uint64, error) {
entry := lmgr.newOrExisting(xid)
entry.Lock()
entry.ts = time.Now()
......@@ -182,7 +189,7 @@ func stringKey(xid string) []byte {
func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint64, rerr error) {
key := stringKey(xid)
pl := posting.GetOrCreate(key)
pl := posting.GetOrCreate(key, uidStore)
if pl.Length() == 0 {
return assignNew(pl, xid, instanceIdx, numInstances)
......@@ -203,7 +210,7 @@ func GetOrAssign(xid string, instanceIdx uint64, numInstances uint64) (uid uint6
func ExternalId(uid uint64) (xid string, rerr error) {
key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid
pl := posting.GetOrCreate(key)
pl := posting.GetOrCreate(key, uidStore)
if pl.Length() == 0 {
return "", errors.New("NO external id")
}
......
......@@ -42,7 +42,8 @@ func TestGetOrAssign(t *testing.T) {
clog.Init()
defer clog.Close()
posting.Init(ps, clog)
posting.Init(clog)
Init(ps)
var u1, u2 uint64
{
......
package posting
package worker
import (
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
"github.com/google/flatbuffers/go"
)
var dataStore *store.Store
// Init sets the package-level dataStore to ps. ProcessTask passes dataStore
// to posting.GetOrCreate when it looks up the posting list for each uid.
func Init(ps *store.Store) {
dataStore = ps
}
func ProcessTask(query []byte) (result []byte, rerr error) {
uo := flatbuffers.GetUOffsetT(query)
q := new(task.Query)
......@@ -18,8 +26,8 @@ func ProcessTask(query []byte) (result []byte, rerr error) {
attr := string(q.Attr())
for i := 0; i < q.UidsLength(); i++ {
uid := q.Uids(i)
key := Key(uid, attr)
pl := GetOrCreate(key)
key := posting.Key(uid, attr)
pl := posting.GetOrCreate(key, dataStore)
var valoffset flatbuffers.UOffsetT
if val, err := pl.Value(); err != nil {
......
package posting
package worker
import (
"fmt"
......@@ -8,14 +8,15 @@ import (
"time"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
"github.com/google/flatbuffers/go"
)
func addEdge(t *testing.T, edge x.DirectedEdge, l *List) {
if err := l.AddMutation(edge, Set); err != nil {
func addEdge(t *testing.T, edge x.DirectedEdge, l *posting.List) {
if err := l.AddMutation(edge, posting.Set); err != nil {
t.Error(err)
}
}
......@@ -56,29 +57,30 @@ func TestProcessTask(t *testing.T) {
clog.Init()
defer clog.Close()
Init(ps, clog)
posting.Init(clog)
Init(ps)
edge := x.DirectedEdge{
ValueId: 23,
Source: "author0",
Timestamp: time.Now(),
}
addEdge(t, edge, GetOrCreate(Key(10, "friend")))
addEdge(t, edge, GetOrCreate(Key(11, "friend")))
addEdge(t, edge, GetOrCreate(Key(12, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(10, "friend"), ps))
addEdge(t, edge, posting.GetOrCreate(posting.Key(11, "friend"), ps))
addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps))
edge.ValueId = 25
addEdge(t, edge, GetOrCreate(Key(12, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps))
edge.ValueId = 26
addEdge(t, edge, GetOrCreate(Key(12, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps))
edge.ValueId = 31
addEdge(t, edge, GetOrCreate(Key(10, "friend")))
addEdge(t, edge, GetOrCreate(Key(12, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(10, "friend"), ps))
addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps))
edge.Value = "photon"
addEdge(t, edge, GetOrCreate(Key(12, "friend")))
addEdge(t, edge, posting.GetOrCreate(posting.Key(12, "friend"), ps))
query := NewQuery("friend", []uint64{10, 11, 12})
result, err := ProcessTask(query)
......@@ -126,7 +128,7 @@ func TestProcessTask(t *testing.T) {
t.Errorf("Unable to retrieve value")
}
var iout interface{}
if err := ParseValue(&iout, tval.ValBytes()); err != nil {
if err := posting.ParseValue(&iout, tval.ValBytes()); err != nil {
t.Error(err)
}
v := iout.(string)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment