Commit ca369823 authored by Manish R Jain

Loaded up 21M edges, covering all of the film data plus names, and ran a complex query with a 60ms latency!

parent 879a3329
@@ -243,6 +243,7 @@ func lastTimestamp(path string) (int64, error) {
 		return 0, err
 	}
+	discard := make([]byte, 4096)
 	reader := bufio.NewReaderSize(f, 2<<20)
 	var maxTs int64
 	header := make([]byte, 16)
@@ -277,7 +278,10 @@ func lastTimestamp(path string) (int64, error) {
 			}).Fatal("Log file doesn't have monotonically increasing records.")
 		}
-		reader.Discard(int(h.size))
+		for int(h.size) > len(discard) {
+			discard = make([]byte, len(discard)*2)
+		}
+		reader.Read(discard[:int(h.size)])
 	}
 	return maxTs, nil
 }
@@ -396,6 +400,7 @@ func streamEntriesInFile(path string,
 	}
 	defer f.Close()
+	discard := make([]byte, 4096)
 	reader := bufio.NewReaderSize(f, 5<<20)
 	header := make([]byte, 16)
 	for {
@@ -429,7 +434,10 @@ func streamEntriesInFile(path string,
 			ch <- data
 		} else {
-			reader.Discard(int(hdr.size))
+			for int(hdr.size) > len(discard) {
+				discard = make([]byte, len(discard)*2)
+			}
+			reader.Read(discard[:int(hdr.size)])
 		}
 	}
 	return nil
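The same grow-and-read pattern appears in both hunks above. For reference, it could be factored into a small helper; this is only an illustrative sketch, not part of the commit: the discardN name, the package name, and the io.ReadFull call are mine (io.ReadFull is used because a single Read may return fewer bytes than asked for).

package logutil

import (
	"bufio"
	"io"
)

// discardN skips n bytes from r by reading them into a scratch buffer,
// doubling the buffer whenever n exceeds its current size. The (possibly
// grown) buffer is returned so the caller can keep reusing it.
func discardN(r *bufio.Reader, n int, scratch []byte) ([]byte, error) {
	if len(scratch) == 0 {
		scratch = make([]byte, 4096)
	}
	for n > len(scratch) {
		scratch = make([]byte, 2*len(scratch))
	}
	// io.ReadFull keeps reading until exactly n bytes have been consumed.
	_, err := io.ReadFull(r, scratch[:n])
	return scratch, err
}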
Had to increase the open-file limit in Ubuntu to avoid the "too many open files" issue, probably caused by goroutines opening up RocksDB SST files.
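On Ubuntu that means raising the nofile limit, e.g. with ulimit -n or in /etc/security/limits.conf. A process can also lift its own soft limit up to the hard limit at startup; here is a hedged sketch using the standard syscall package on Linux (an illustration only, not something this commit does):

package main

import (
	"log"
	"syscall"
)

func main() {
	var lim syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim); err != nil {
		log.Fatalf("getrlimit: %v", err)
	}
	// Raise the soft limit on open file descriptors to the hard limit,
	// so many goroutines can hold SST files open at once.
	lim.Cur = lim.Max
	if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &lim); err != nil {
		log.Fatalf("setrlimit: %v", err)
	}
	log.Printf("open file limit set to %d", lim.Cur)
}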
@@ -85,6 +85,22 @@ func (pa ByUid) Len() int { return len(pa) }
 func (pa ByUid) Swap(i, j int) { pa[i], pa[j] = pa[j], pa[i] }
 func (pa ByUid) Less(i, j int) bool { return pa[i].Uid() < pa[j].Uid() }
+
+func samePosting(a *types.Posting, b *types.Posting) bool {
+	if a.Uid() != b.Uid() {
+		return false
+	}
+	if a.ValueLength() != b.ValueLength() {
+		return false
+	}
+	if !bytes.Equal(a.ValueBytes(), b.ValueBytes()) {
+		return false
+	}
+	if !bytes.Equal(a.Source(), b.Source()) {
+		return false
+	}
+	return true
+}
 // key = (entity uid, attribute)
 func Key(uid uint64, attr string) []byte {
 	buf := new(bytes.Buffer)
@@ -455,6 +471,15 @@ func (l *List) mergeMutation(mp *types.Posting) {
 	} else { // curUid not found in mindex.
 		if inPlist { // In plist, so just set it in mlayer.
+			// If this posting matches what we already have in posting list,
+			// we don't need to `dirty` this by adding to mlayer.
+			plist := l.getPostingList()
+			var cp types.Posting
+			if ok := plist.Postings(&cp, pi); ok {
+				if samePosting(&cp, mp) {
+					return // do nothing.
+				}
+			}
 			l.mlayer[pi] = *mp
 		} else { // not in plist, not in mindex, so insert in mindex.
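The mergeMutation change skips the write entirely when the incoming posting is byte-for-byte identical to what the immutable posting list already holds, so unchanged data never dirties the mutation layer. A toy model of that idea, with plain maps standing in for the posting list and the mlayer (none of these names or types are Dgraph's):

package main

import "fmt"

// overlay records only entries that actually differ from the committed
// base, so identical re-writes don't mark anything dirty.
type overlay struct {
	base  map[int]string // committed posting list (simplified)
	dirty map[int]string // mutation layer
}

func (o *overlay) set(i int, v string) {
	if cur, ok := o.base[i]; ok && cur == v {
		return // matches the committed value: nothing to record
	}
	o.dirty[i] = v
}

func main() {
	o := &overlay{
		base:  map[int]string{0: "alice"},
		dirty: map[int]string{},
	}
	o.set(0, "alice")         // no-op, dirty layer stays empty
	o.set(1, "bob")           // genuinely new, recorded in the overlay
	fmt.Println(len(o.dirty)) // 1
}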
@@ -19,6 +19,7 @@ package posting
 import (
 	"math/rand"
 	"runtime"
+	"runtime/debug"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -57,7 +58,7 @@ var MIB uint64
 func checkMemoryUsage() {
 	MIB = 1 << 20
-	MAX_MEMORY = 2 * (1 << 30)
+	MAX_MEMORY = 3 * (1 << 30)
 	for _ = range time.Tick(5 * time.Second) {
 		var ms runtime.MemStats
@@ -78,6 +79,8 @@ func checkMemoryUsage() {
 			glog.Info("Merged lists. Calling GC.")
 			runtime.GC() // Call GC to do some cleanup.
+			glog.Info("Trying to free OS memory")
+			debug.FreeOSMemory()
 			runtime.ReadMemStats(&ms)
 			megs = ms.Alloc / MIB
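This hunk bumps the memory ceiling to 3GB and, after the forced GC, calls debug.FreeOSMemory so freed pages are returned to the OS promptly instead of waiting for the runtime's scavenger. A standalone sketch of such a watcher loop, with placeholder names, threshold, and log wording (note FreeOSMemory itself triggers a GC, so the explicit runtime.GC mirrors the commit rather than being strictly required):

package main

import (
	"log"
	"runtime"
	"runtime/debug"
	"time"
)

// watchMemory checks allocated heap every few seconds and, once it crosses
// maxBytes, forces a GC and asks the runtime to release memory to the OS.
func watchMemory(maxBytes uint64) {
	for range time.Tick(5 * time.Second) {
		var ms runtime.MemStats
		runtime.ReadMemStats(&ms)
		if ms.Alloc < maxBytes {
			continue
		}
		log.Printf("heap at %d MiB, forcing GC", ms.Alloc>>20)
		runtime.GC()
		debug.FreeOSMemory() // also runs a GC, then returns freed pages to the OS

		runtime.ReadMemStats(&ms)
		log.Printf("heap after GC: %d MiB", ms.Alloc>>20)
	}
}

func main() {
	go watchMemory(3 << 30) // 3 GB ceiling, matching the new MAX_MEMORY
	select {}               // block forever; real code would do work here
}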
@@ -58,8 +58,11 @@ func main() {
 	}
 	logrus.SetLevel(logrus.InfoLevel)
-	numCpus := runtime.GOMAXPROCS(-1)
-	glog.WithField("gomaxprocs", numCpus).Info("Number of CPUs")
+	numCpus := runtime.NumCPU()
+	prevProcs := runtime.GOMAXPROCS(numCpus)
+	glog.WithField("num_cpu", numCpus).
+		WithField("prev_maxprocs", prevProcs).
+		Info("Set max procs to num cpus")

 	if len(*rdfGzips) == 0 {
 		glog.Fatal("No RDF GZIP files specified")
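The loader (and, further down, the server) now pins GOMAXPROCS to the machine's core count explicitly and logs the previous value. Before Go 1.5, GOMAXPROCS defaulted to 1, so a startup call like this was the standard way to use every core; a minimal version of the pattern, with logging simplified to the standard library:

package main

import (
	"log"
	"runtime"
)

func main() {
	numCpus := runtime.NumCPU()
	prev := runtime.GOMAXPROCS(numCpus) // returns the previous setting
	log.Printf("set GOMAXPROCS to %d (was %d)", numCpus, prev)
}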
@@ -22,6 +22,7 @@ import (
 	"io/ioutil"
 	"net/http"
 	"runtime"
+	"time"

 	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/commit"
@@ -51,14 +52,16 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
 		x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
 		return
 	}
-	glog.WithField("q", string(q)).Info("Query received.")
+	glog.WithField("q", string(q)).Debug("Query received.")
+	now := time.Now()
 	sg, err := gql.Parse(string(q))
 	if err != nil {
 		x.Err(glog, err).Error("While parsing query")
 		x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
 		return
 	}
-	glog.WithField("q", string(q)).Info("Query parsed.")
+	parseLat := time.Since(now)
+	glog.WithField("q", string(q)).Debug("Query parsed.")
 	rch := make(chan error)
 	go query.ProcessGraph(sg, rch)
 	err = <-rch
@@ -67,13 +70,23 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
 		x.SetStatus(w, x.E_ERROR, err.Error())
 		return
 	}
-	glog.WithField("q", string(q)).Info("Graph processed.")
+	processLat := time.Since(now)
+	glog.WithField("q", string(q)).Debug("Graph processed.")
 	js, err := sg.ToJson()
 	if err != nil {
 		x.Err(glog, err).Error("While converting to Json.")
 		x.SetStatus(w, x.E_ERROR, err.Error())
 		return
 	}
+	jsonLat := time.Since(now)
+	glog.WithFields(logrus.Fields{
+		"total":   time.Since(now),
+		"parsing": parseLat.String(),
+		"process": processLat - parseLat,
+		"json":    jsonLat - processLat,
+	}).Info("Query Latencies")
+	glog.WithField("latency", time.Since(now).String()).Info("Query Latency")
 	w.Header().Set("Content-Type", "application/json")
 	fmt.Fprint(w, string(js))
 }
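The handler now takes one timestamp up front and snapshots time.Since(now) after parsing, graph processing, and JSON conversion; each stage's own cost is the difference between consecutive snapshots, which is what the "Query Latencies" log line reports. A stripped-down sketch of that arithmetic, with the stage bodies left as placeholders:

package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()

	// ... parse the query ...
	parseLat := time.Since(now)

	// ... process the graph ...
	processLat := time.Since(now)

	// ... convert the result to JSON ...
	jsonLat := time.Since(now)

	// Each snapshot is cumulative; subtracting the previous one gives the
	// time spent in that stage alone.
	fmt.Printf("parsing=%v process=%v json=%v total=%v\n",
		parseLat, processLat-parseLat, jsonLat-processLat, time.Since(now))
}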
@@ -84,7 +97,11 @@ func main() {
 		glog.Fatal("Unable to parse flags")
 	}
 	logrus.SetLevel(logrus.InfoLevel)
-	glog.WithField("gomaxprocs", runtime.GOMAXPROCS(-1)).Info("Number of CPUs")
+	numCpus := runtime.NumCPU()
+	prev := runtime.GOMAXPROCS(numCpus)
+	glog.WithField("num_cpu", numCpus).
+		WithField("prev_maxprocs", prev).
+		Info("Set max procs to num cpus")

 	ps := new(store.Store)
 	ps.Init(*postingDir)
curl localhost:8080/query -XPOST -d '{
  me(_xid_: m.06pj8) {
    type.object.name.en
    film.director.film {
      type.object.name.en
      film.film.starring {
        film.performance.actor {
          film.director.film {
            type.object.name.en
          }
          type.object.name.en
        }
      }
      film.film.initial_release_date
      film.film.country
      film.film.genre {
        type.object.name.en
      }
    }
  }
}' > output.json
INFO[0554] Query Latencies json=35.434904ms package=server parsing=93.385µs process=24.785928ms total=60.314523ms
The first time this query runs it's a bit slower, taking around 250ms in processing, mostly because the posting lists have to be loaded from RocksDB, which means disk seeks. On subsequent runs, when the posting lists are already in memory, processing takes under 25ms. It actually takes more time to convert the results to JSON!
My reactions are mixed. I'm happy, but unsatisfied.