diff --git a/commit/log.go b/commit/log.go
index fc478310ee710b77b2b49819c794fc0e86b4982a..46bbc1f44349ce70a9d3e38ce75534571254b2eb 100644
--- a/commit/log.go
+++ b/commit/log.go
@@ -243,6 +243,7 @@ func lastTimestamp(path string) (int64, error) {
 		return 0, err
 	}
 
+	discard := make([]byte, 4096)
 	reader := bufio.NewReaderSize(f, 2<<20)
 	var maxTs int64
 	header := make([]byte, 16)
@@ -277,7 +278,10 @@ func lastTimestamp(path string) (int64, error) {
 			}).Fatal("Log file doesn't have monotonically increasing records.")
 		}
 
-		reader.Discard(int(h.size))
+		for int(h.size) > len(discard) {
+			discard = make([]byte, len(discard)*2)
+		}
+		reader.Read(discard[:int(h.size)])
 	}
 	return maxTs, nil
 }
@@ -396,6 +400,7 @@ func streamEntriesInFile(path string,
 	}
 	defer f.Close()
 
+	discard := make([]byte, 4096)
 	reader := bufio.NewReaderSize(f, 5<<20)
 	header := make([]byte, 16)
 	for {
@@ -429,7 +434,10 @@ func streamEntriesInFile(path string,
 			ch <- data
 
 		} else {
-			reader.Discard(int(hdr.size))
+			for int(hdr.size) > len(discard) {
+				discard = make([]byte, len(discard)*2)
+			}
+			reader.Read(discard[:int(hdr.size)])
 		}
 	}
 	return nil
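
A caveat on the commit/log.go hunks above: bufio.Reader.Read follows the io.Reader contract, so a single Read call may return fewer than h.size bytes and under-skip a record. Below is a minimal standalone sketch of the same grow-the-scratch-buffer idea, but using io.ReadFull, which keeps reading until the slice is full or the stream ends. The skipBytes helper and the demo harness are my names for illustration, not part of this patch:

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// skipBytes discards exactly n bytes from r, doubling the scratch buffer
// until the record fits, as the patch does. Unlike a single Read call,
// io.ReadFull only succeeds once scratch[:n] is filled, and returns
// io.ErrUnexpectedEOF if the stream ends early.
func skipBytes(r *bufio.Reader, scratch []byte, n int) ([]byte, error) {
	for n > len(scratch) {
		scratch = make([]byte, len(scratch)*2)
	}
	_, err := io.ReadFull(r, scratch[:n])
	return scratch, err
}

func main() {
	r := bufio.NewReader(bytes.NewReader(make([]byte, 10000)))
	scratch := make([]byte, 4096)
	scratch, err := skipBytes(r, scratch, 9000) // doubles scratch twice, to 16384
	fmt.Println(len(scratch), err)              // 16384 <nil>
}
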
diff --git a/notes.txt b/notes.txt
new file mode 100644
index 0000000000000000000000000000000000000000..cf94b51250fab9a13436dfeeda87eccc275ead7a
--- /dev/null
+++ b/notes.txt
@@ -0,0 +1 @@
+Have to increase the file limit set in Ubuntu to avoid the "too many open files" issue, probably caused by goroutines opening up RocksDB SST files.
diff --git a/posting/list.go b/posting/list.go
index bbca733f44f1ca25bb4f4782e2fe2f9661163545..af53ede64117964f5f91aed1224bc79ab9746d07 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -85,6 +85,22 @@ func (pa ByUid) Len() int { return len(pa) }
 func (pa ByUid) Swap(i, j int)      { pa[i], pa[j] = pa[j], pa[i] }
 func (pa ByUid) Less(i, j int) bool { return pa[i].Uid() < pa[j].Uid() }
 
+func samePosting(a *types.Posting, b *types.Posting) bool {
+	if a.Uid() != b.Uid() {
+		return false
+	}
+	if a.ValueLength() != b.ValueLength() {
+		return false
+	}
+	if !bytes.Equal(a.ValueBytes(), b.ValueBytes()) {
+		return false
+	}
+	if !bytes.Equal(a.Source(), b.Source()) {
+		return false
+	}
+	return true
+}
+
 // key = (entity uid, attribute)
 func Key(uid uint64, attr string) []byte {
 	buf := new(bytes.Buffer)
@@ -455,6 +471,15 @@ func (l *List) mergeMutation(mp *types.Posting) {
 	} else {
 		// curUid not found in mindex.
 		if inPlist { // In plist, so just set it in mlayer.
+			// If this posting matches what we already have in posting list,
+			// we don't need to `dirty` this by adding to mlayer.
+			plist := l.getPostingList()
+			var cp types.Posting
+			if ok := plist.Postings(&cp, pi); ok {
+				if samePosting(&cp, mp) {
+					return // do nothing.
+				}
+			}
 			l.mlayer[pi] = *mp
 		} else {
 			// not in plist, not in mindex, so insert in mindex.
diff --git a/posting/lists.go b/posting/lists.go
index 0edf382024b33b79b726dd4219cd0ffa09740981..58e1e431e3f56a932cfbf138dcbd756cbb6fff6f 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -19,6 +19,7 @@ package posting
 import (
 	"math/rand"
 	"runtime"
+	"runtime/debug"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -57,7 +58,7 @@ var MIB uint64
 
 func checkMemoryUsage() {
 	MIB = 1 << 20
-	MAX_MEMORY = 2 * (1 << 30)
+	MAX_MEMORY = 3 * (1 << 30)
 
 	for _ = range time.Tick(5 * time.Second) {
 		var ms runtime.MemStats
@@ -78,6 +79,8 @@ func checkMemoryUsage() {
 
 			glog.Info("Merged lists. Calling GC.")
 			runtime.GC() // Call GC to do some cleanup.
+			glog.Info("Trying to free OS memory")
+			debug.FreeOSMemory()
 
 			runtime.ReadMemStats(&ms)
 			megs = ms.Alloc / MIB
diff --git a/server/loader/main.go b/server/loader/main.go
index 70f662deb3484c407095689cd3c646ebde17db6b..7cb23237372092693432e52e069a063bb01dd586 100644
--- a/server/loader/main.go
+++ b/server/loader/main.go
@@ -58,8 +58,11 @@ func main() {
 	}
 	logrus.SetLevel(logrus.InfoLevel)
 
-	numCpus := runtime.GOMAXPROCS(-1)
-	glog.WithField("gomaxprocs", numCpus).Info("Number of CPUs")
+	numCpus := runtime.NumCPU()
+	prevProcs := runtime.GOMAXPROCS(numCpus)
+	glog.WithField("num_cpu", numCpus).
+		WithField("prev_maxprocs", prevProcs).
+		Info("Set max procs to num cpus")
 
 	if len(*rdfGzips) == 0 {
 		glog.Fatal("No RDF GZIP files specified")
diff --git a/server/main.go b/server/main.go
index 35d94a163ed15e0ed7b50df319a7b4c337616035..ec9d02a3d9c5520763dae5181c5f88a2069384c1 100644
--- a/server/main.go
+++ b/server/main.go
@@ -22,6 +22,7 @@ import (
 	"io/ioutil"
 	"net/http"
 	"runtime"
+	"time"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/commit"
@@ -51,14 +52,16 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
 		x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
 		return
 	}
-	glog.WithField("q", string(q)).Info("Query received.")
+	glog.WithField("q", string(q)).Debug("Query received.")
+	now := time.Now()
 	sg, err := gql.Parse(string(q))
 	if err != nil {
 		x.Err(glog, err).Error("While parsing query")
 		x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
 		return
 	}
-	glog.WithField("q", string(q)).Info("Query parsed.")
+	parseLat := time.Since(now)
+	glog.WithField("q", string(q)).Debug("Query parsed.")
 	rch := make(chan error)
 	go query.ProcessGraph(sg, rch)
 	err = <-rch
@@ -67,13 +70,23 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
 		x.SetStatus(w, x.E_ERROR, err.Error())
 		return
 	}
-	glog.WithField("q", string(q)).Info("Graph processed.")
+	processLat := time.Since(now)
+	glog.WithField("q", string(q)).Debug("Graph processed.")
 	js, err := sg.ToJson()
 	if err != nil {
 		x.Err(glog, err).Error("While converting to Json.")
 		x.SetStatus(w, x.E_ERROR, err.Error())
 		return
 	}
+	jsonLat := time.Since(now)
+	glog.WithFields(logrus.Fields{
+		"total":   time.Since(now),
+		"parsing": parseLat.String(),
+		"process": processLat - parseLat,
+		"json":    jsonLat - processLat,
+	}).Info("Query Latencies")
+
+	glog.WithField("latency", time.Since(now).String()).Info("Query Latency")
 	w.Header().Set("Content-Type", "application/json")
 	fmt.Fprint(w, string(js))
 }
@@ -84,7 +97,11 @@ func main() {
 		glog.Fatal("Unable to parse flags")
 	}
 	logrus.SetLevel(logrus.InfoLevel)
-	glog.WithField("gomaxprocs", runtime.GOMAXPROCS(-1)).Info("Number of CPUs")
+	numCpus := runtime.NumCPU()
+	prev := runtime.GOMAXPROCS(numCpus)
+	glog.WithField("num_cpu", numCpus).
+		WithField("prev_maxprocs", prev).
+		Info("Set max procs to num cpus")
 
 	ps := new(store.Store)
 	ps.Init(*postingDir)
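
The queryHandler change above takes one timestamp up front and snapshots time.Since(now) after each stage, so each stage's cost is the difference between consecutive snapshots. Distilled into a runnable sketch; the three stage functions are stand-ins for gql.Parse, query.ProcessGraph, and sg.ToJson, not dgraph code, with sleeps that roughly mirror the magnitudes reported in server/notes.txt below:

package main

import (
	"fmt"
	"time"
)

// Stand-ins for the parse, process, and JSON-conversion stages.
func parse()   { time.Sleep(100 * time.Microsecond) }
func process() { time.Sleep(25 * time.Millisecond) }
func toJson()  { time.Sleep(35 * time.Millisecond) }

func main() {
	now := time.Now()
	parse()
	parseLat := time.Since(now)
	process()
	processLat := time.Since(now)
	toJson()
	jsonLat := time.Since(now)

	// Snapshots are cumulative, so subtracting consecutive snapshots
	// recovers per-stage latencies.
	fmt.Printf("parsing=%v process=%v json=%v total=%v\n",
		parseLat, processLat-parseLat, jsonLat-processLat, time.Since(now))
}
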
diff --git a/server/notes.txt b/server/notes.txt
new file mode 100644
index 0000000000000000000000000000000000000000..51be067be115df3d0cba2edf737f4c58f8062670
--- /dev/null
+++ b/server/notes.txt
@@ -0,0 +1,29 @@
+curl localhost:8080/query -XPOST -d '{
+me(_xid_: m.06pj8) {
+  type.object.name.en
+  film.director.film {
+    type.object.name.en
+    film.film.starring {
+      film.performance.actor {
+        film.director.film {
+          type.object.name.en
+        }
+        type.object.name.en
+      }
+    }
+    film.film.initial_release_date
+    film.film.country
+    film.film.genre {
+      type.object.name.en
+    }
+  }
+}
+}' > output.json
+
+INFO[0554] Query Latencies json=35.434904ms package=server parsing=93.385µs process=24.785928ms total=60.314523ms
+
+The first time this query runs, it's a bit slower, taking around 250ms in processing, mostly because we have to load
+the posting lists from RocksDB, which involves disk seeks. But on consecutive runs, when the PLs are already in memory,
+this takes under 25ms for processing. It actually takes more time to convert the results to JSON!
+
+My reactions are mixed. I'm happy, but unsatisfied.
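
The cold-versus-warm gap described in those notes is easy to check from the client side: send the same query twice and compare round-trip times. The first request pays for the RocksDB disk seeks; the second hits posting lists already in memory. A rough sketch, assuming the server from this patch is listening on localhost:8080; timeQuery is a hypothetical helper, and only the endpoint and query shape come from the notes:

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"strings"
	"time"
)

// timeQuery posts q to the query endpoint and returns the wall-clock
// time of the full round trip, response body included.
func timeQuery(q string) (time.Duration, error) {
	start := time.Now()
	resp, err := http.Post("http://localhost:8080/query", "text/plain", strings.NewReader(q))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
		return 0, err
	}
	return time.Since(start), nil
}

func main() {
	// A trimmed version of the query in the notes above.
	q := `{ me(_xid_: m.06pj8) { type.object.name.en } }`
	for _, run := range []string{"cold", "warm"} {
		lat, err := timeQuery(q)
		if err != nil {
			fmt.Println("query failed:", err)
			return
		}
		fmt.Printf("%s run: %v\n", run, lat)
	}
}
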