diff --git a/Dockerfile b/Dockerfile index d963c15cb3326e8d35bc259d899d703c8b01a27e..51c74a4a3a60444eb6994bfabc16ffa8050dd32d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # Dockerfile for DGraph -FROM golang:1.4.3 +FROM golang:1.5.3 MAINTAINER Manish Jain <manishrjain@gmail.com> # Get the necessary packages. @@ -21,7 +21,7 @@ ENV LD_LIBRARY_PATH "/usr/local/lib" # Install DGraph and update dependencies to right versions. RUN go get -v github.com/robfig/glock && \ go get -v github.com/dgraph-io/dgraph/... && \ - glock sync github.com/dgraph-io/dgraph && echo "v0.1.3" + glock sync github.com/dgraph-io/dgraph && echo "v0.2.0" # Run some tests, don't build an image if we're failing tests. RUN go test github.com/dgraph-io/dgraph/... diff --git a/lex/lexer.go b/lex/lexer.go index 9027e11044c0e08449a18179af0902d37abe3702..c05fb98e3e943894505ece51fb76859037cd9aca 100644 --- a/lex/lexer.go +++ b/lex/lexer.go @@ -66,12 +66,9 @@ type Lexer struct { Mode int // mode based on information so far. } -func NewLexer(input string) *Lexer { - l := &Lexer{ - Input: input, - Items: make(chan item, 100), - } - return l +func (l *Lexer) Init(input string) { + l.Input = input + l.Items = make(chan item, 5) } func (l *Lexer) Errorf(format string, diff --git a/loader/loader.go b/loader/loader.go index f9beb446b63f3b959c5779439b45a4ec36d4d697..aa46cd220ab8bc7d71f262c82c3d84592e9d27b3 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -158,7 +158,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { wg.Done() } -func (s *state) getUidForString(str string) { +func (s *state) getUidForString(str string) error { _, err := rdf.GetUid(str, s.instanceIdx, s.numInstances) for err != nil { // Just put in a retry loop to tackle temporary errors. @@ -169,31 +169,39 @@ func (s *state) getUidForString(str string) { } else { glog.WithError(err).WithField("nq.Subject", str). 
Error("While getting UID") - return + return err } _, err = rdf.GetUid(str, s.instanceIdx, s.numInstances) } + return nil } func (s *state) assignUidsOnly(wg *sync.WaitGroup) { + defer wg.Done() + for nq := range s.cnq { - if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances != s.instanceIdx { - // This instance shouldnt assign UID to this string - atomic.AddUint64(&s.ctr.ignored, 1) - } else { - s.getUidForString(nq.Subject) + ignored := true + if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances == s.instanceIdx { + if err := s.getUidForString(nq.Subject); err != nil { + glog.WithError(err).Fatal("While assigning Uid to subject.") + } + ignored = false } - if len(nq.ObjectId) == 0 || - farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances != s.instanceIdx { - // This instance shouldnt or cant assign UID to this string + if len(nq.ObjectId) > 0 && + farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances == s.instanceIdx { + if err := s.getUidForString(nq.ObjectId); err != nil { + glog.WithError(err).Fatal("While assigning Uid to object.") + } + ignored = false + } + + if ignored { atomic.AddUint64(&s.ctr.ignored, 1) } else { - s.getUidForString(nq.ObjectId) + atomic.AddUint64(&s.ctr.processed, 1) } } - - wg.Done() } // Blocking function. diff --git a/posting/list.go b/posting/list.go index 31c859b2b2e1c21b8dadcb6deaeae63025085453..cef683f13c7acc948d81354b98e8c7b5f795fa89 100644 --- a/posting/list.go +++ b/posting/list.go @@ -39,12 +39,11 @@ import ( ) var glog = x.Log("posting") +var E_TMP_ERROR = fmt.Errorf("Temporary Error. Please retry.") const Set = 0x01 const Del = 0x02 -var E_TMP_ERROR = errors.New("Temporary Error. 
Please retry.") - type buffer struct { d []byte } diff --git a/query/query.go b/query/query.go index e564c87355b2423fe08acc5db0a26d1d4b5ac2bb..0c5c3d92aa84dba998fbfdb93a3aea396f3941ea 100644 --- a/query/query.go +++ b/query/query.go @@ -27,7 +27,6 @@ import ( "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/task" - "github.com/dgraph-io/dgraph/uid" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" "github.com/google/flatbuffers/go" @@ -275,15 +274,15 @@ func newGraph(euid uint64, exid string) (*SubGraph, error) { // This would set the Result field in SubGraph, // and populate the children for attributes. if len(exid) > 0 { - // instanceIdx = 0, numInstances = 1 by default - u, err := uid.GetOrAssign(exid, 0, 1) - if err != nil { - x.Err(glog, err).WithField("xid", exid).Error( - "While GetOrAssign uid from external id") + xidToUid := make(map[string]uint64) + xidToUid[exid] = 0 + if err := worker.GetOrAssignUidsOverNetwork(&xidToUid); err != nil { + glog.WithError(err).Error("While getting uids over network") return nil, err } - glog.WithField("xid", exid).WithField("_uid_", u).Debug("GetOrAssign") - euid = u + + euid = xidToUid[exid] + glog.WithField("xid", exid).WithField("uid", euid).Debug("GetOrAssign") } if euid == 0 { diff --git a/rdf/README.txt b/rdf/README.txt new file mode 100644 index 0000000000000000000000000000000000000000..13ec847c20968e38e3b70a57f1750e6e386b6ab2 --- /dev/null +++ b/rdf/README.txt @@ -0,0 +1,76 @@ +go tool pprof --alloc_objects uidassigner heap.prof + +(pprof) top10 +196427053 of 207887723 total (94.49%) +Dropped 41 nodes (cum <= 1039438) +Showing top 10 nodes out of 31 (cum >= 8566234) + flat flat% sum% cum cum% + 55529704 26.71% 26.71% 55529704 26.71% github.com/dgraph-io/dgraph/rdf.Parse + 28255068 13.59% 40.30% 30647245 14.74% github.com/dgraph-io/dgraph/posting.(*List).getPostingList + 20406729 9.82% 50.12% 20406729 9.82% 
github.com/zond/gotomic.newRealEntryWithHashCode + 17777182 8.55% 58.67% 17777182 8.55% strings.makeCutsetFunc + 17582839 8.46% 67.13% 17706815 8.52% github.com/dgraph-io/dgraph/loader.(*state).readLines + 15139047 7.28% 74.41% 88445933 42.55% github.com/dgraph-io/dgraph/loader.(*state).parseStream + 12927366 6.22% 80.63% 12927366 6.22% github.com/zond/gotomic.(*element).search + 10789028 5.19% 85.82% 66411362 31.95% github.com/dgraph-io/dgraph/posting.GetOrCreate + 9453856 4.55% 90.37% 9453856 4.55% github.com/zond/gotomic.(*hashHit).search + 8566234 4.12% 94.49% 8566234 4.12% github.com/dgraph-io/dgraph/uid.stringKey + + +(pprof) list rdf.Parse +Total: 207887723 +ROUTINE ======================== github.com/dgraph-io/dgraph/rdf.Parse in /home/mrjn/go/src/github.com/dgraph-io/dgraph/rdf/parse.go + 55529704 55529704 (flat, cum) 26.71% of Total + . . 118: } + . . 119: return val[1 : len(val)-1] + . . 120:} + . . 121: + . . 122:func Parse(line string) (rnq NQuad, rerr error) { + 54857942 54857942 123: l := lex.NewLexer(line) + . . 124: go run(l) + . . 125: var oval string + . . 126: var vend bool + + +This showed that lex.NewLexer(..) was pretty expensive in terms of memory allocation. +So, let's use sync.Pool here. 
+ +After using sync.Pool, this is the output: + +422808936 of 560381333 total (75.45%) +Dropped 63 nodes (cum <= 2801906) +Showing top 10 nodes out of 62 (cum >= 18180150) + flat flat% sum% cum cum% + 103445194 18.46% 18.46% 103445194 18.46% github.com/Sirupsen/logrus.(*Entry).WithFields + 65448918 11.68% 30.14% 163184489 29.12% github.com/Sirupsen/logrus.(*Entry).WithField + 48366300 8.63% 38.77% 203838187 36.37% github.com/dgraph-io/dgraph/posting.(*List).get + 39789719 7.10% 45.87% 49276181 8.79% github.com/dgraph-io/dgraph/posting.(*List).getPostingList + 36642638 6.54% 52.41% 36642638 6.54% github.com/dgraph-io/dgraph/lex.NewLexer + 35190301 6.28% 58.69% 35190301 6.28% github.com/google/flatbuffers/go.(*Builder).growByteBuffer + 31392455 5.60% 64.29% 31392455 5.60% github.com/zond/gotomic.newRealEntryWithHashCode + 25895676 4.62% 68.91% 25895676 4.62% github.com/zond/gotomic.(*element).search + 18546971 3.31% 72.22% 72863016 13.00% github.com/dgraph-io/dgraph/loader.(*state).parseStream + 18090764 3.23% 75.45% 18180150 3.24% github.com/dgraph-io/dgraph/loader.(*state).readLines + +After a few more discussions, I realized that lexer didn't need to be allocated on the heap. +So, I switched it to be allocated on stack. These are the results. 
+ +$ go tool pprof uidassigner heap.prof +Entering interactive mode (type "help" for commands) +(pprof) top10 +1308.70MB of 1696.59MB total (77.14%) +Dropped 73 nodes (cum <= 8.48MB) +Showing top 10 nodes out of 52 (cum >= 161.50MB) + flat flat% sum% cum cum% + 304.56MB 17.95% 17.95% 304.56MB 17.95% github.com/dgraph-io/dgraph/posting.NewList + 209.55MB 12.35% 30.30% 209.55MB 12.35% github.com/Sirupsen/logrus.(*Entry).WithFields + 207.55MB 12.23% 42.54% 417.10MB 24.58% github.com/Sirupsen/logrus.(*Entry).WithField + 108MB 6.37% 48.90% 108MB 6.37% github.com/dgraph-io/dgraph/uid.(*lockManager).newOrExisting + 88MB 5.19% 54.09% 88MB 5.19% github.com/zond/gotomic.newMockEntry + 85.51MB 5.04% 59.13% 85.51MB 5.04% github.com/google/flatbuffers/go.(*Builder).growByteBuffer + 78.01MB 4.60% 63.73% 78.01MB 4.60% github.com/dgraph-io/dgraph/posting.Key + 78.01MB 4.60% 68.32% 78.51MB 4.63% github.com/dgraph-io/dgraph/uid.stringKey + 76MB 4.48% 72.80% 76MB 4.48% github.com/zond/gotomic.newRealEntryWithHashCode + 73.50MB 4.33% 77.14% 161.50MB 9.52% github.com/zond/gotomic.(*Hash).getBucketByIndex + +Now, rdf.Parse no longer shows up in memory profiler. Win!
diff --git a/rdf/parse.go b/rdf/parse.go index 0a8c8f6a2651366e161032debe2e5c8e30b6af40..732e851bfb8857e937a9ea226e768a0648756374 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -120,7 +120,9 @@ func stripBracketsIfPresent(val string) string { } func Parse(line string) (rnq NQuad, rerr error) { - l := lex.NewLexer(line) + l := &lex.Lexer{} + l.Init(line) + go run(l) var oval string var vend bool diff --git a/server/main.go b/server/main.go index 2c857348e0e99f9fffd9ab054ef6c7899f193a42..18c0bed29fa3146e067e46e5c809e92ae8ed58a0 100644 --- a/server/main.go +++ b/server/main.go @@ -88,6 +88,7 @@ func mutationHandler(mu *gql.Mutation) error { } } if err := worker.GetOrAssignUidsOverNetwork(&xidToUid); err != nil { + glog.WithError(err).Error("GetOrAssignUidsOverNetwork") return err } @@ -143,7 +144,20 @@ func queryHandler(w http.ResponseWriter, r *http.Request) { x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) return } - mutationHandler(mu) + + // If we have mutations, run them first. + if mu != nil && len(mu.Set) > 0 { + if err = mutationHandler(mu); err != nil { + glog.WithError(err).Error("While handling mutations.") + x.SetStatus(w, x.E_ERROR, err.Error()) + return + } + } + + if gq == nil || (gq.UID == 0 && len(gq.XID) == 0) { + x.SetStatus(w, x.E_OK, "Done") + return + } sg, err := query.ToSubGraph(gq) if err != nil { @@ -204,6 +218,10 @@ func main() { addrs := strings.Split(*workers, ",") lenAddr := uint64(len(addrs)) + if lenAddr == 0 { + // If no worker is specified, then we're it. 
+ lenAddr = 1 + } posting.Init(clog) diff --git a/server/uidassigner/README.txt b/server/uidassigner/README.txt new file mode 100644 index 0000000000000000000000000000000000000000..30769517a5d135b909564d648426547ba533c0b9 --- /dev/null +++ b/server/uidassigner/README.txt @@ -0,0 +1,17 @@ +(pprof) top10 +4.19mins of 7.46mins total (56.23%) +Dropped 569 nodes (cum <= 0.75mins) +Showing top 10 nodes out of 25 (cum >= 2.78mins) + flat flat% sum% cum cum% + 1.71mins 22.86% 22.86% 4.03mins 54.02% runtime.scanobject + 1.16mins 15.62% 38.48% 1.16mins 15.62% runtime.heapBitsForObject + 0.95mins 12.77% 51.25% 1.21mins 16.16% runtime.greyobject + 0.19mins 2.56% 53.81% 3.46mins 46.41% runtime.mallocgc + 0.07mins 0.88% 54.69% 1.08mins 14.53% runtime.mapassign1 + 0.04mins 0.48% 55.17% 3.82mins 51.17% runtime.systemstack + 0.03mins 0.35% 55.52% 0.92mins 12.36% runtime.gcDrain + 0.02mins 0.26% 55.78% 1.42mins 18.98% github.com/dgraph-io/dgraph/rdf.Parse + 0.02mins 0.23% 56.01% 0.92mins 12.29% runtime.newobject + 0.02mins 0.22% 56.23% 2.78mins 37.25% runtime.gcDrainN + +The above CPU profile was generated before using any `sync.Pool` diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go index 2e1546da0a978442b43d541caae5fade797bea33..0a00382461b9ad5487637999494c3067f46b0c92 100644 --- a/server/uidassigner/main.go +++ b/server/uidassigner/main.go @@ -27,6 +27,7 @@ var numInstances = flag.Uint64("numInstances", 1, var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists") var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") +var memprofile = flag.String("memprofile", "", "write memory profile to file") var numcpu = flag.Int("numCpu", runtime.NumCPU(), "Number of cores to be used by the process") @@ -93,4 +94,13 @@ func main() { } glog.Info("Calling merge lists") posting.MergeLists(100 * numCpus) // 100 per core. 
+ + if len(*memprofile) > 0 { + f, err := os.Create(*memprofile) + if err != nil { + glog.Fatal(err) + } + pprof.WriteHeapProfile(f) + f.Close() + } } diff --git a/uid/assigner.go b/uid/assigner.go index ef835558cfb37e7e61065d44821283acc11a8999..d7384fc3f63b191c9be1054c642f6e4f4a220037 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -33,6 +33,11 @@ import ( var glog = x.Log("uid") var lmgr *lockManager var uidStore *store.Store +var eidPool = sync.Pool{ + New: func() interface{} { + return new(entry) + }, +} type entry struct { sync.Mutex @@ -63,7 +68,7 @@ func (lm *lockManager) newOrExisting(xid string) *entry { if e, ok := lm.locks[xid]; ok { return e } - e := new(entry) + e := eidPool.Get().(*entry) e.ts = time.Now() lm.locks[xid] = e return e @@ -78,6 +83,7 @@ func (lm *lockManager) clean() { if e.isOld() { count += 1 delete(lm.locks, xid) + eidPool.Put(e) } } lm.Unlock() @@ -91,6 +97,7 @@ func (lm *lockManager) clean() { func init() { lmgr = new(lockManager) lmgr.locks = make(map[string]*entry) + // TODO(manishrjain): This map should be cleaned up. 
// go lmgr.clean() } @@ -181,9 +188,8 @@ func assignNew(pl *posting.List, xid string, instanceIdx uint64, } func stringKey(xid string) []byte { - buf := new(bytes.Buffer) - buf.WriteString("_uid_") - buf.WriteString("|") + var buf bytes.Buffer + buf.WriteString("_uid_|") buf.WriteString(xid) return buf.Bytes() } diff --git a/worker/assign.go b/worker/assign.go index 52c4a721fd87be768a1ff26ff8481a4294571938..d74fea052e2b86c97366e08b14fc585812bd2072 100644 --- a/worker/assign.go +++ b/worker/assign.go @@ -1,6 +1,7 @@ package worker import ( + "fmt" "sync" "github.com/dgraph-io/dgraph/conn" @@ -33,6 +34,10 @@ func createXidListBuffer(xids map[string]uint64) []byte { func getOrAssignUids( xidList *task.XidList) (uidList []byte, rerr error) { + if xidList.XidsLength() == 0 { + return uidList, fmt.Errorf("Empty xid list") + } + wg := new(sync.WaitGroup) uids := make([]uint64, xidList.XidsLength()) che := make(chan error, xidList.XidsLength()) @@ -40,15 +45,15 @@ func getOrAssignUids( wg.Add(1) xid := string(xidList.Xids(i)) - go func() { + go func(idx int) { defer wg.Done() u, err := uid.GetOrAssign(xid, 0, 1) if err != nil { che <- err return } - uids[i] = u - }() + uids[idx] = u + }(i) } wg.Wait() close(che) diff --git a/worker/mutation.go b/worker/mutation.go index 8b204b78a542e3c62167aa945e4e57a1d2e407c0..37d6a4d4a97348b360795d2f657165a68a8155f2 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -42,6 +42,7 @@ func mutate(m *Mutations, left *Mutations) error { return fmt.Errorf("predicate fingerprint doesn't match this instance.") } + glog.WithField("edge", edge).Debug("mutate") key := posting.Key(edge.Entity, edge.Attribute) plist := posting.GetOrCreate(key, dataStore) if err := plist.AddMutation(edge, posting.Set); err != nil { @@ -93,6 +94,7 @@ func MutateOverNetwork( mu := mutationArray[idx] if mu == nil { mu = new(Mutations) + mutationArray[idx] = mu } mu.Set = append(mu.Set, edge) } diff --git a/worker/task.go b/worker/task.go index 
5ce45ff3ee0021810ba9285e29042896ebdef395..bdcdcf4ba534ae0442d6594dacd9e3358750c1b5 100644 --- a/worker/task.go +++ b/worker/task.go @@ -16,6 +16,8 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { attr := string(q.Attr()) idx := farm.Fingerprint64([]byte(attr)) % numInstances + glog.WithField("idx", idx).WithField("attr", attr). + WithField("numInstances", numInstances).Debug("ProcessTaskOverNetwork") var runHere bool if attr == "_xid_" || attr == "_uid_" { @@ -39,8 +41,8 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { if err := pool.Call("Worker.ServeTask", query, reply); err != nil { glog.WithField("call", "Worker.ServeTask").Fatal(err) } - glog.WithField("reply", string(reply.Data)).WithField("addr", addr). - Info("Got reply from server") + glog.WithField("reply_len", len(reply.Data)).WithField("addr", addr). + Debug("Got reply from server") return reply.Data, nil } diff --git a/worker/worker.go b/worker/worker.go index 3fd932e8b4847664456b14a843f9f4376fe97c0b..6a2871013a90345dada6261802bcf85c02494508 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -126,6 +126,7 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) { q := new(task.Query) q.Init(query.Data, uo) attr := string(q.Attr()) + glog.WithField("attr", attr).Debug("ServeTask") if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx { reply.Data, rerr = processTask(query.Data)