Commit 15741245 authored by Manish R Jain
Merge pull request #43 from dgraph-io/dmuts

Distributed query and mutation testing
parents 520e8560 2b7bc95a
 # Dockerfile for DGraph
-FROM golang:1.4.3
+FROM golang:1.5.3
 MAINTAINER Manish Jain <manishrjain@gmail.com>
 # Get the necessary packages.
@@ -21,7 +21,7 @@ ENV LD_LIBRARY_PATH "/usr/local/lib"
 # Install DGraph and update dependencies to right versions.
 RUN go get -v github.com/robfig/glock && \
     go get -v github.com/dgraph-io/dgraph/... && \
-    glock sync github.com/dgraph-io/dgraph && echo "v0.1.3"
+    glock sync github.com/dgraph-io/dgraph && echo "v0.2.0"
 # Run some tests, don't build an image if we're failing tests.
 RUN go test github.com/dgraph-io/dgraph/...
......
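For reference, a hypothetical local build of this image from the repo root (the tag is assumed, not part of the commit); the RUN go test layer aborts the build if any test fails:

docker build -t dgraph/dgraph:v0.2.0 .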
@@ -66,12 +66,9 @@ type Lexer struct {
     Mode int // mode based on information so far.
 }
-func NewLexer(input string) *Lexer {
-    l := &Lexer{
-        Input: input,
-        Items: make(chan item, 100),
-    }
-    return l
+func (l *Lexer) Init(input string) {
+    l.Input = input
+    l.Items = make(chan item, 5)
 }
 func (l *Lexer) Errorf(format string,
......
@@ -158,7 +158,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
     wg.Done()
 }
-func (s *state) getUidForString(str string) {
+func (s *state) getUidForString(str string) error {
     _, err := rdf.GetUid(str, s.instanceIdx, s.numInstances)
     for err != nil {
         // Just put in a retry loop to tackle temporary errors.
@@ -169,31 +169,39 @@ func (s *state) getUidForString(str string) {
         } else {
             glog.WithError(err).WithField("nq.Subject", str).
                 Error("While getting UID")
-            return
+            return err
         }
         _, err = rdf.GetUid(str, s.instanceIdx, s.numInstances)
     }
+    return nil
 }
 func (s *state) assignUidsOnly(wg *sync.WaitGroup) {
+    defer wg.Done()
     for nq := range s.cnq {
-        if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances != s.instanceIdx {
-            // This instance shouldnt assign UID to this string
-            atomic.AddUint64(&s.ctr.ignored, 1)
-        } else {
-            s.getUidForString(nq.Subject)
+        ignored := true
+        if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances == s.instanceIdx {
+            if err := s.getUidForString(nq.Subject); err != nil {
+                glog.WithError(err).Fatal("While assigning Uid to subject.")
+            }
+            ignored = false
         }
-        if len(nq.ObjectId) == 0 ||
-            farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances != s.instanceIdx {
-            // This instance shouldnt or cant assign UID to this string
+        if len(nq.ObjectId) > 0 &&
+            farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances == s.instanceIdx {
+            if err := s.getUidForString(nq.ObjectId); err != nil {
+                glog.WithError(err).Fatal("While assigning Uid to object.")
+            }
+            ignored = false
+        }
+        if ignored {
             atomic.AddUint64(&s.ctr.ignored, 1)
         } else {
-            s.getUidForString(nq.ObjectId)
+            atomic.AddUint64(&s.ctr.processed, 1)
         }
     }
-    wg.Done()
 }
 // Blocking function.
......
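The retry loop's branch structure is partially collapsed above; it hinges on the posting.E_TMP_ERROR sentinel from the next file. A schematic of the contract as I read it (the back-off duration and error comparison are assumptions, since those lines are hidden in this view):

_, err := rdf.GetUid(str, s.instanceIdx, s.numInstances)
for err != nil {
    if err == posting.E_TMP_ERROR {
        time.Sleep(time.Microsecond) // transient: back off briefly and retry
    } else {
        return err // permanent: propagate to the caller
    }
    _, err = rdf.GetUid(str, s.instanceIdx, s.numInstances)
}
return nil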
@@ -39,12 +39,11 @@ import (
 )
 var glog = x.Log("posting")
+var E_TMP_ERROR = fmt.Errorf("Temporary Error. Please retry.")
 const Set = 0x01
 const Del = 0x02
-var E_TMP_ERROR = errors.New("Temporary Error. Please retry.")
 type buffer struct {
     d []byte
 }
......
@@ -27,7 +27,6 @@ import (
     "github.com/dgraph-io/dgraph/gql"
     "github.com/dgraph-io/dgraph/posting"
     "github.com/dgraph-io/dgraph/task"
-    "github.com/dgraph-io/dgraph/uid"
     "github.com/dgraph-io/dgraph/worker"
     "github.com/dgraph-io/dgraph/x"
     "github.com/google/flatbuffers/go"
@@ -275,15 +274,15 @@ func newGraph(euid uint64, exid string) (*SubGraph, error) {
     // This would set the Result field in SubGraph,
     // and populate the children for attributes.
     if len(exid) > 0 {
-        // instanceIdx = 0, numInstances = 1 by default
-        u, err := uid.GetOrAssign(exid, 0, 1)
-        if err != nil {
-            x.Err(glog, err).WithField("xid", exid).Error(
-                "While GetOrAssign uid from external id")
+        xidToUid := make(map[string]uint64)
+        xidToUid[exid] = 0
+        if err := worker.GetOrAssignUidsOverNetwork(&xidToUid); err != nil {
+            glog.WithError(err).Error("While getting uids over network")
             return nil, err
         }
-        glog.WithField("xid", exid).WithField("_uid_", u).Debug("GetOrAssign")
-        euid = u
+        euid = xidToUid[exid]
+        glog.WithField("xid", exid).WithField("uid", euid).Debug("GetOrAssign")
     }
     if euid == 0 {
......
go tool pprof --alloc_objects uidassigner heap.prof
(pprof) top10
196427053 of 207887723 total (94.49%)
Dropped 41 nodes (cum <= 1039438)
Showing top 10 nodes out of 31 (cum >= 8566234)
flat flat% sum% cum cum%
55529704 26.71% 26.71% 55529704 26.71% github.com/dgraph-io/dgraph/rdf.Parse
28255068 13.59% 40.30% 30647245 14.74% github.com/dgraph-io/dgraph/posting.(*List).getPostingList
20406729 9.82% 50.12% 20406729 9.82% github.com/zond/gotomic.newRealEntryWithHashCode
17777182 8.55% 58.67% 17777182 8.55% strings.makeCutsetFunc
17582839 8.46% 67.13% 17706815 8.52% github.com/dgraph-io/dgraph/loader.(*state).readLines
15139047 7.28% 74.41% 88445933 42.55% github.com/dgraph-io/dgraph/loader.(*state).parseStream
12927366 6.22% 80.63% 12927366 6.22% github.com/zond/gotomic.(*element).search
10789028 5.19% 85.82% 66411362 31.95% github.com/dgraph-io/dgraph/posting.GetOrCreate
9453856 4.55% 90.37% 9453856 4.55% github.com/zond/gotomic.(*hashHit).search
8566234 4.12% 94.49% 8566234 4.12% github.com/dgraph-io/dgraph/uid.stringKey
(pprof) list rdf.Parse
Total: 207887723
ROUTINE ======================== github.com/dgraph-io/dgraph/rdf.Parse in /home/mrjn/go/src/github.com/dgraph-io/dgraph/rdf/parse.go
55529704 55529704 (flat, cum) 26.71% of Total
. . 118: }
. . 119: return val[1 : len(val)-1]
. . 120:}
. . 121:
. . 122:func Parse(line string) (rnq NQuad, rerr error) {
54857942 54857942 123: l := lex.NewLexer(line)
. . 124: go run(l)
. . 125: var oval string
. . 126: var vend bool
This showed that lex.NewLexer(..) was pretty expensive in terms of memory allocation.
So, let's use sync.Pool here.
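A minimal sketch of the pooling idea, assuming a package-level pool inside lex (the exact wiring isn't shown in this commit, and the pool was later dropped in favor of caller-allocated lexers):

var lexPool = sync.Pool{
    New: func() interface{} { return new(Lexer) },
}

// borrowLexer reuses a pooled Lexer instead of allocating a fresh one
// per parsed line; Init resets its state for the new input.
func borrowLexer(input string) *Lexer {
    l := lexPool.Get().(*Lexer)
    l.Init(input)
    return l
}

// releaseLexer makes the Lexer available for reuse once lexing is done.
func releaseLexer(l *Lexer) { lexPool.Put(l) }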
After using sync.Pool, this is the output:
422808936 of 560381333 total (75.45%)
Dropped 63 nodes (cum <= 2801906)
Showing top 10 nodes out of 62 (cum >= 18180150)
flat flat% sum% cum cum%
103445194 18.46% 18.46% 103445194 18.46% github.com/Sirupsen/logrus.(*Entry).WithFields
65448918 11.68% 30.14% 163184489 29.12% github.com/Sirupsen/logrus.(*Entry).WithField
48366300 8.63% 38.77% 203838187 36.37% github.com/dgraph-io/dgraph/posting.(*List).get
39789719 7.10% 45.87% 49276181 8.79% github.com/dgraph-io/dgraph/posting.(*List).getPostingList
36642638 6.54% 52.41% 36642638 6.54% github.com/dgraph-io/dgraph/lex.NewLexer
35190301 6.28% 58.69% 35190301 6.28% github.com/google/flatbuffers/go.(*Builder).growByteBuffer
31392455 5.60% 64.29% 31392455 5.60% github.com/zond/gotomic.newRealEntryWithHashCode
25895676 4.62% 68.91% 25895676 4.62% github.com/zond/gotomic.(*element).search
18546971 3.31% 72.22% 72863016 13.00% github.com/dgraph-io/dgraph/loader.(*state).parseStream
18090764 3.23% 75.45% 18180150 3.24% github.com/dgraph-io/dgraph/loader.(*state).readLines
After a few more discussions, I realized that the lexer didn't need to be allocated on the heap.
So, I switched it to be allocated on the stack. These are the results.
$ go tool pprof uidassigner heap.prof
Entering interactive mode (type "help" for commands)
(pprof) top10
1308.70MB of 1696.59MB total (77.14%)
Dropped 73 nodes (cum <= 8.48MB)
Showing top 10 nodes out of 52 (cum >= 161.50MB)
flat flat% sum% cum cum%
304.56MB 17.95% 17.95% 304.56MB 17.95% github.com/dgraph-io/dgraph/posting.NewList
209.55MB 12.35% 30.30% 209.55MB 12.35% github.com/Sirupsen/logrus.(*Entry).WithFields
207.55MB 12.23% 42.54% 417.10MB 24.58% github.com/Sirupsen/logrus.(*Entry).WithField
108MB 6.37% 48.90% 108MB 6.37% github.com/dgraph-io/dgraph/uid.(*lockManager).newOrExisting
88MB 5.19% 54.09% 88MB 5.19% github.com/zond/gotomic.newMockEntry
85.51MB 5.04% 59.13% 85.51MB 5.04% github.com/google/flatbuffers/go.(*Builder).growByteBuffer
78.01MB 4.60% 63.73% 78.01MB 4.60% github.com/dgraph-io/dgraph/posting.Key
78.01MB 4.60% 68.32% 78.51MB 4.63% github.com/dgraph-io/dgraph/uid.stringKey
76MB 4.48% 72.80% 76MB 4.48% github.com/zond/gotomic.newRealEntryWithHashCode
73.50MB 4.33% 77.14% 161.50MB 9.52% github.com/zond/gotomic.(*Hash).getBucketByIndex
Now, rdf.Parse no longer shows up in the memory profiler. Win!
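To confirm what the compiler decides for a value like this, escape-analysis diagnostics can be printed while building (standard gc flag; shown here as a sketch):

go build -gcflags='-m' github.com/dgraph-io/dgraph/rdf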
@@ -120,7 +120,9 @@ func stripBracketsIfPresent(val string) string {
 }
 func Parse(line string) (rnq NQuad, rerr error) {
-    l := lex.NewLexer(line)
+    l := &lex.Lexer{}
+    l.Init(line)
     go run(l)
     var oval string
     var vend bool
......
@@ -88,6 +88,7 @@ func mutationHandler(mu *gql.Mutation) error {
         }
     }
     if err := worker.GetOrAssignUidsOverNetwork(&xidToUid); err != nil {
+        glog.WithError(err).Error("GetOrAssignUidsOverNetwork")
         return err
     }
@@ -143,7 +144,20 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
         x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
         return
     }
-    mutationHandler(mu)
+    // If we have mutations, run them first.
+    if mu != nil && len(mu.Set) > 0 {
+        if err = mutationHandler(mu); err != nil {
+            glog.WithError(err).Error("While handling mutations.")
+            x.SetStatus(w, x.E_ERROR, err.Error())
+            return
+        }
+    }
+    if gq == nil || (gq.UID == 0 && len(gq.XID) == 0) {
+        x.SetStatus(w, x.E_OK, "Done")
+        return
+    }
     sg, err := query.ToSubGraph(gq)
     if err != nil {
@@ -204,6 +218,10 @@ func main() {
     addrs := strings.Split(*workers, ",")
     lenAddr := uint64(len(addrs))
+    if lenAddr == 0 {
+        // If no worker is specified, then we're it.
+        lenAddr = 1
+    }
     posting.Init(clog)
......
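With this handler change, a request that carries only a mutation now returns "Done" without running a query. A hypothetical invocation against the HTTP endpoint (port, path, and mutation syntax are assumptions, not taken from this commit):

curl localhost:8080/query -XPOST -d 'mutation { set { <alice> <follows> <bob> . } }'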
(pprof) top10
4.19mins of 7.46mins total (56.23%)
Dropped 569 nodes (cum <= 0.75mins)
Showing top 10 nodes out of 25 (cum >= 2.78mins)
flat flat% sum% cum cum%
1.71mins 22.86% 22.86% 4.03mins 54.02% runtime.scanobject
1.16mins 15.62% 38.48% 1.16mins 15.62% runtime.heapBitsForObject
0.95mins 12.77% 51.25% 1.21mins 16.16% runtime.greyobject
0.19mins 2.56% 53.81% 3.46mins 46.41% runtime.mallocgc
0.07mins 0.88% 54.69% 1.08mins 14.53% runtime.mapassign1
0.04mins 0.48% 55.17% 3.82mins 51.17% runtime.systemstack
0.03mins 0.35% 55.52% 0.92mins 12.36% runtime.gcDrain
0.02mins 0.26% 55.78% 1.42mins 18.98% github.com/dgraph-io/dgraph/rdf.Parse
0.02mins 0.23% 56.01% 0.92mins 12.29% runtime.newobject
0.02mins 0.22% 56.23% 2.78mins 37.25% runtime.gcDrainN
The above CPU profile was generated before using any `sync.Pool`.
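The CPU profile comes from the binary's existing --cpuprofile flag (see the flags diff below) and is inspected the same way as the heap profiles; a sketch, with data-loading flags omitted:

./uidassigner --cpuprofile cpu.prof ...
go tool pprof uidassigner cpu.prof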
@@ -27,6 +27,7 @@ var numInstances = flag.Uint64("numInstances", 1,
 var uidDir = flag.String("uidpostings", "",
     "Directory to store xid to uid posting lists")
 var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
+var memprofile = flag.String("memprofile", "", "write memory profile to file")
 var numcpu = flag.Int("numCpu", runtime.NumCPU(),
     "Number of cores to be used by the process")
@@ -93,4 +94,13 @@ func main() {
     }
     glog.Info("Calling merge lists")
     posting.MergeLists(100 * numCpus) // 100 per core.
+    if len(*memprofile) > 0 {
+        f, err := os.Create(*memprofile)
+        if err != nil {
+            glog.Fatal(err)
+        }
+        pprof.WriteHeapProfile(f)
+        f.Close()
+    }
 }
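Note that the added block relies on the os and runtime/pprof imports (the import hunk is collapsed here). This is what produces the heap.prof files analyzed above; a sketch of the round trip, other flags omitted:

./uidassigner --memprofile heap.prof ...
go tool pprof --alloc_objects uidassigner heap.prof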
@@ -33,6 +33,11 @@ import (
 var glog = x.Log("uid")
 var lmgr *lockManager
 var uidStore *store.Store
+var eidPool = sync.Pool{
+    New: func() interface{} {
+        return new(entry)
+    },
+}
 type entry struct {
     sync.Mutex
@@ -63,7 +68,7 @@ func (lm *lockManager) newOrExisting(xid string) *entry {
     if e, ok := lm.locks[xid]; ok {
         return e
     }
-    e := new(entry)
+    e := eidPool.Get().(*entry)
     e.ts = time.Now()
     lm.locks[xid] = e
     return e
@@ -78,6 +83,7 @@ func (lm *lockManager) clean() {
         if e.isOld() {
             count += 1
             delete(lm.locks, xid)
+            eidPool.Put(e)
         }
     }
     lm.Unlock()
@@ -91,6 +97,7 @@ func (lm *lockManager) clean() {
 func init() {
     lmgr = new(lockManager)
     lmgr.locks = make(map[string]*entry)
+    // TODO(manishrjain): This map should be cleaned up.
     // go lmgr.clean()
 }
@@ -181,9 +188,8 @@ func assignNew(pl *posting.List, xid string, instanceIdx uint64,
 }
 func stringKey(xid string) []byte {
-    buf := new(bytes.Buffer)
-    buf.WriteString("_uid_")
-    buf.WriteString("|")
+    var buf bytes.Buffer
+    buf.WriteString("_uid_|")
     buf.WriteString(xid)
     return buf.Bytes()
 }
......
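The stringKey rewrite follows the same allocation theme: a bytes.Buffer value instead of new(bytes.Buffer), and one WriteString call instead of two. A hypothetical micro-benchmark (not part of the commit) to measure the difference:

package uid

import "testing"

// BenchmarkStringKey reports allocations per key construction;
// run with: go test -bench=StringKey -benchmem github.com/dgraph-io/dgraph/uid
func BenchmarkStringKey(b *testing.B) {
    for i := 0; i < b.N; i++ {
        _ = stringKey("some_external_id")
    }
}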
 package worker
 import (
+    "fmt"
     "sync"
     "github.com/dgraph-io/dgraph/conn"
@@ -33,6 +34,10 @@ func createXidListBuffer(xids map[string]uint64) []byte {
 func getOrAssignUids(
     xidList *task.XidList) (uidList []byte, rerr error) {
+    if xidList.XidsLength() == 0 {
+        return uidList, fmt.Errorf("Empty xid list")
+    }
     wg := new(sync.WaitGroup)
     uids := make([]uint64, xidList.XidsLength())
     che := make(chan error, xidList.XidsLength())
@@ -40,15 +45,15 @@ func getOrAssignUids(
         wg.Add(1)
         xid := string(xidList.Xids(i))
-        go func() {
+        go func(idx int) {
             defer wg.Done()
             u, err := uid.GetOrAssign(xid, 0, 1)
             if err != nil {
                 che <- err
                 return
             }
-            uids[i] = u
-        }()
+            uids[idx] = u
+        }(i)
     }
     wg.Wait()
     close(che)
......
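The go func(idx int) change above fixes a classic loop-variable capture bug: all the closures shared the same i, so a goroutine running after the loop had advanced could write to the wrong (or an out-of-range) slot. Passing i as an argument gives each goroutine its own copy. A standalone illustration, not from the commit:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    out := make([]int, 5)
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(idx int) { // idx is a per-goroutine snapshot of i
            defer wg.Done()
            out[idx] = idx * idx
        }(i)
    }
    wg.Wait()
    fmt.Println(out) // [0 1 4 9 16], regardless of goroutine scheduling
}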
@@ -42,6 +42,7 @@ func mutate(m *Mutations, left *Mutations) error {
         return fmt.Errorf("predicate fingerprint doesn't match this instance.")
     }
+    glog.WithField("edge", edge).Debug("mutate")
     key := posting.Key(edge.Entity, edge.Attribute)
     plist := posting.GetOrCreate(key, dataStore)
     if err := plist.AddMutation(edge, posting.Set); err != nil {
@@ -93,6 +94,7 @@ func MutateOverNetwork(
         mu := mutationArray[idx]
         if mu == nil {
             mu = new(Mutations)
+            mutationArray[idx] = mu
         }
         mu.Set = append(mu.Set, edge)
     }
......
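The one-line mutationArray[idx] = mu addition matters because mu is a local copy of the slice element: without storing the fresh Mutations back, the append landed on an object nothing referenced and mutationArray[idx] stayed nil. A tiny runnable demonstration of the pattern (types simplified, not from the commit):

package main

import "fmt"

type Mutations struct{ Set []string }

func main() {
    arr := make([]*Mutations, 1)
    mu := arr[0] // nil: a local copy of the slice element
    if mu == nil {
        mu = new(Mutations)
        arr[0] = mu // the fix: store the pointer back into the slice
    }
    mu.Set = append(mu.Set, "edge")
    fmt.Println(arr[0].Set) // [edge]; without the store-back, arr[0] is still nil
}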
@@ -16,6 +16,8 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) {
     attr := string(q.Attr())
     idx := farm.Fingerprint64([]byte(attr)) % numInstances
+    glog.WithField("idx", idx).WithField("attr", attr).
+        WithField("numInstances", numInstances).Debug("ProcessTaskOverNetwork")
     var runHere bool
     if attr == "_xid_" || attr == "_uid_" {
@@ -39,8 +41,8 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) {
     if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
         glog.WithField("call", "Worker.ServeTask").Fatal(err)
     }
-    glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
-        Info("Got reply from server")
+    glog.WithField("reply_len", len(reply.Data)).WithField("addr", addr).
+        Debug("Got reply from server")
     return reply.Data, nil
 }
......
@@ -126,6 +126,7 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) {
     q := new(task.Query)
     q.Init(query.Data, uo)
     attr := string(q.Attr())
+    glog.WithField("attr", attr).Debug("ServeTask")
     if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx {
         reply.Data, rerr = processTask(query.Data)
......