Commit e546b010 authored by Manish R Jain

Real-world use cases. Allow for dots in names. Handle backslash escapes. Run automatic mutation merging. Load multiple RDF gzips, etc.
parent f9dddbf9
@@ -20,6 +20,7 @@ import (
"fmt"
"testing"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/query"
)
@@ -75,6 +76,31 @@ func TestParse(t *testing.T) {
}
}
func TestParseXid(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
query := `
query {
user(_uid_: 0x11) {
type.object.name
}
}`
sg, err := Parse(query)
if err != nil {
t.Error(err)
return
}
if sg == nil {
t.Error("subgraph is nil")
return
}
if len(sg.Children) != 1 {
t.Errorf("Expected 1 children. Got: %v", len(sg.Children))
}
if err := checkAttr(sg.Children[0], "type.object.name"); err != nil {
t.Error(err)
}
}
func TestParse_error1(t *testing.T) {
query := `
mutation {
......
@@ -234,5 +234,8 @@ func isNameSuffix(r rune) bool {
if r >= '0' && r <= '9' {
return true
}
if r == '.' {
return true
}
return false
}
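For context on the isNameSuffix change above: accepting '.' is what lets dotted Freebase-style predicates such as type.object.name lex as a single name token. A minimal standalone sketch of that effect; the letter/underscore cases and the readName helper are assumptions filled in from elided context, not code from this commit:

package main

import "fmt"

// isNameSuffix mirrors the updated check above; the letter and
// underscore cases are assumed from the elided surrounding lines.
func isNameSuffix(r rune) bool {
	if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || r == '_' {
		return true
	}
	if r >= '0' && r <= '9' {
		return true
	}
	if r == '.' { // the new case: dots are now valid inside names.
		return true
	}
	return false
}

// readName is a hypothetical helper that consumes the longest run of
// name runes from the front of s.
func readName(s string) string {
	for i, r := range s {
		if !isNameSuffix(r) {
			return s[:i]
		}
	}
	return s
}

func main() {
	// Without the '.' case the lexer would stop at "type".
	fmt.Println(readName("type.object.name {")) // type.object.name
}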
@@ -24,6 +24,7 @@ import (
"fmt"
"math"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
@@ -47,11 +48,12 @@ type MutationLink struct {
type List struct {
sync.RWMutex
key []byte
hash uint32
buffer []byte
pstore *store.Store // postinglist store
clog *commit.Logger
key []byte
hash uint32
buffer []byte
pstore *store.Store // postinglist store
clog *commit.Logger
lastCompact time.Time
// Mutations
mlayer map[int]types.Posting // stores only replace instructions.
@@ -569,20 +571,37 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
return l.clog.AddLog(t.Timestamp.UnixNano(), l.hash, mbuf)
}
func (l *List) isDirty() bool {
func (l *List) IsDirty() bool {
l.RLock()
defer l.RUnlock()
return len(l.mindex)+len(l.mlayer) > 0
}
func (l *List) CommitIfDirty() error {
if !l.isDirty() {
func (l *List) DirtyRatio() float64 {
l.RLock()
defer l.RUnlock()
d := len(l.mindex) + len(l.mlayer)
plist := types.GetRootAsPostingList(l.buffer, 0)
ln := plist.PostingsLength()
if ln == 0 {
return math.MaxFloat64
}
return float64(d) / float64(ln)
}
func (l *List) CompactIfDirty() error {
	if !l.IsDirty() {
		glog.WithField("dirty", false).Debug("Not compacting")
		return nil
	}
	glog.WithField("dirty", true).Debug("Compacting")
	return l.compact()
}
func (l *List) compact() error {
l.Lock()
defer l.Unlock()
@@ -615,12 +634,19 @@ func (l *List) CommitIfDirty() error {
}
// Now reset the mutation variables.
l.lastCompact = time.Now()
l.mlayer = make(map[int]types.Posting)
l.mdelta = 0
l.mindex = nil
return nil
}
func (l *List) LastCompactionTs() time.Time {
l.RLock()
defer l.RUnlock()
return l.lastCompact
}
func (l *List) GetUids() []uint64 {
l.RLock()
defer l.RUnlock()
......
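The new DirtyRatio divides pending mutations (mindex entries plus mlayer replacements) by the number of postings already serialized in the flatbuffer, returning math.MaxFloat64 for an empty list so such lists always rank as maximally dirty. A tiny sketch of just that arithmetic, with plain ints standing in for the real structures:

package main

import (
	"fmt"
	"math"
)

// dirtyRatio mirrors List.DirtyRatio: pending mutations over committed
// postings, with an empty posting list treated as maximally dirty.
func dirtyRatio(pendingMutations, committedPostings int) float64 {
	if committedPostings == 0 {
		return math.MaxFloat64
	}
	return float64(pendingMutations) / float64(committedPostings)
}

func main() {
	fmt.Println(dirtyRatio(5, 100)) // 0.05: barely dirty.
	fmt.Println(dirtyRatio(5, 0))   // MaxFloat64: compact first.
}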
@@ -18,33 +18,52 @@ package posting
import (
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
"github.com/dgraph-io/dgraph/store"
"github.com/dgryski/go-farm"
)
type entry struct {
l *List
}
var lmutex sync.RWMutex
var lcache map[uint64]*List
var lcache map[uint64]*entry
var pstore *store.Store
var clog *commit.Logger
var ch chan uint64
func Init(posting *store.Store, log *commit.Logger) {
lmutex.Lock()
defer lmutex.Unlock()
lcache = make(map[uint64]*List)
lcache = make(map[uint64]*entry)
pstore = posting
clog = log
ch = make(chan uint64, 1000)
go queueForProcessing()
go process()
}
func get(k uint64) *List {
lmutex.RLock()
defer lmutex.RUnlock()
if e, ok := lcache[k]; ok {
return e.l
}
return nil
}
func Get(key []byte) *List {
// Acquire read lock and check if list is available.
lmutex.RLock()
uid := farm.Fingerprint64(key)
if list, ok := lcache[uid]; ok {
if e, ok := lcache[uid]; ok {
lmutex.RUnlock()
return list
return e.l
}
lmutex.RUnlock()
@@ -52,12 +71,61 @@ func Get(key []byte) *List {
lmutex.Lock()
defer lmutex.Unlock()
// Check again after acquiring write lock.
if list, ok := lcache[uid]; ok {
return list
if e, ok := lcache[uid]; ok {
return e.l
}
list := new(List)
list.init(key, pstore, clog)
lcache[uid] = list
return list
e := new(entry)
e.l = new(List)
e.l.init(key, pstore, clog)
lcache[uid] = e
return e.l
}
func queueForProcessing() {
ticker := time.NewTicker(time.Minute)
	for range ticker.C {
count := 0
skipped := 0
lmutex.RLock()
now := time.Now()
for eid, e := range lcache {
if len(ch) >= cap(ch) {
break
}
			if len(ch) < int(0.3*float32(cap(ch))) && e.l.IsDirty() {
				// Queue dirty lists while the channel has spare headroom.
				ch <- eid
				count++
			} else if now.Sub(e.l.LastCompactionTs()) > 10*time.Minute {
				// Also queue lists which haven't been compacted in a while.
				ch <- eid
				count++
			} else {
				skipped++
			}
}
lmutex.RUnlock()
glog.WithFields(logrus.Fields{
"added": count,
"skipped": skipped,
"pending": len(ch),
}).Info("Added for compaction")
}
}
func process() {
ticker := time.NewTicker(100 * time.Millisecond)
	for range ticker.C {
eid := <-ch // blocking.
l := get(eid)
if l == nil {
continue
}
glog.WithField("eid", eid).WithField("pending", len(ch)).
Info("Commiting list")
if err := l.CompactIfDirty(); err != nil {
glog.WithError(err).Error("While commiting dirty list.")
}
}
}
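The two goroutines above form a simple producer/consumer pair: once a minute, queueForProcessing scans the cache and enqueues dirty or long-uncompacted lists, keeping the channel under roughly 30% full so a burst cannot swamp the consumer, while process drains one list per 100ms tick. A self-contained sketch of that shape, with placeholder IDs instead of real posting lists:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan uint64, 10)

	// Producer, like queueForProcessing: respect a soft fill limit so
	// the queue always keeps headroom.
	go func() {
		defer close(ch)
		for eid := uint64(0); eid < 25; eid++ {
			for len(ch) >= int(0.3*float32(cap(ch))) {
				time.Sleep(time.Millisecond) // queue too full; back off.
			}
			ch <- eid
		}
	}()

	// Consumer, like process: drain one item per tick.
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()
	for eid := range ch {
		<-ticker.C // pace the compactions.
		fmt.Println("compacting list", eid)
	}
}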
@@ -51,9 +51,12 @@ func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) {
result.ValueId = oid
} else {
result.Value = nq.ObjectValue
// TODO: Handle Language
}
result.Attribute = nq.Predicate
if len(nq.Language) > 0 {
result.Attribute = nq.Predicate + "." + nq.Language
} else {
result.Attribute = nq.Predicate
}
result.Source = nq.Label
result.Timestamp = time.Now()
return result, nil
......
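The ToEdge change folds an RDF language tag into the attribute name, so a triple like <alice> <name> "Alice"@en is stored under the predicate name.en; this is also why the lexer now accepts dots in names. A minimal sketch of just that branch (the helper name here is ours, not the commit's):

package main

import "fmt"

// attributeFor mirrors the branch added to NQuad.ToEdge: a language
// tag is appended to the predicate with a '.' separator.
func attributeFor(predicate, language string) string {
	if len(language) > 0 {
		return predicate + "." + language
	}
	return predicate
}

func main() {
	fmt.Println(attributeFor("name", "en")) // name.en
	fmt.Println(attributeFor("name", ""))   // name
}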
@@ -185,6 +185,14 @@ var testNQuads = []struct {
ObjectId: "bob",
},
},
{
		input: `_:alice <likes> "mov\"enpick" .`, // escaped quote must not end the literal.
nq: NQuad{
Subject: "_:alice",
Predicate: "likes",
ObjectValue: `mov\"enpick`,
},
},
}
func TestLex(t *testing.T) {
......
@@ -184,7 +184,19 @@ func lexLanguage(l *lex.Lexer) lex.StateFn {
// Assumes '"' has already been encountered.
func lexLiteral(l *lex.Lexer) lex.StateFn {
l.AcceptUntil(isEndLiteral)
for {
r := l.Next()
		if r == '\u005c' { // backslash
			l.Next() // Skip over the escaped rune.
			continue
		}
if r == lex.EOF || isEndLiteral(r) {
break
}
}
l.Backup()
l.Emit(itemLiteral)
l.Next() // Move to end literal.
l.Ignore() // Ignore end literal.
......
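The rewritten lexLiteral replaces a plain AcceptUntil(isEndLiteral) with a manual scan so that a backslash-escaped rune, in particular \", no longer terminates the literal. The same scan over a plain string, with the lex.Lexer machinery elided, might look like:

package main

import "fmt"

// scanLiteral mimics the loop above on a plain string: starting just
// past the opening quote, a backslash skips the next rune, so an
// escaped quote does not end the literal.
func scanLiteral(s string) string {
	runes := []rune(s)
	for i := 0; i < len(runes); i++ {
		if runes[i] == '\\' {
			i++ // skip the escaped rune.
			continue
		}
		if runes[i] == '"' { // the end-literal rune.
			return string(runes[:i])
		}
	}
	return string(runes) // EOF before a closing quote.
}

func main() {
	fmt.Println(scanLiteral(`mov\"enpick" .`)) // mov\"enpick
}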
@@ -17,12 +17,16 @@
package main
import (
"compress/gzip"
"flag"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"strings"
"sync/atomic"
"time"
"github.com/Sirupsen/logrus"
"github.com/dgraph-io/dgraph/commit"
@@ -32,24 +36,54 @@ import (
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/x"
"github.com/dgryski/go-farm"
)
var glog = x.Log("rdf")
var postingDir = flag.String("postings", "", "Directory to store posting lists")
var mutationDir = flag.String("mutations", "", "Directory to store mutations")
var rdfData = flag.String("rdfdata", "", "File containing RDF data")
var rdfGzips = flag.String("rdfgzips", "",
"Comma separated gzip files containing RDF data")
var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.")
var port = flag.String("port", "8080", "Port to run server on.")
type counters struct {
read uint64
processed uint64
ignored uint64
}
func handleRdfReader(reader io.Reader) (int, error) {
func printCounters(ticker *time.Ticker, c *counters) {
	for range ticker.C {
glog.WithFields(logrus.Fields{
"read": atomic.LoadUint64(&c.read),
"processed": atomic.LoadUint64(&c.processed),
"ignored": atomic.LoadUint64(&c.ignored),
}).Info("Counters")
}
}
// Blocking function.
func handleRdfReader(reader io.Reader) (uint64, error) {
cnq := make(chan rdf.NQuad, 1000)
done := make(chan error)
ctr := new(counters)
ticker := time.NewTicker(time.Second)
go rdf.ParseStream(reader, cnq, done)
count := 0
go printCounters(ticker, ctr)
Loop:
for {
select {
case nq := <-cnq:
atomic.AddUint64(&ctr.read, 1)
if farm.Fingerprint64([]byte(nq.Subject))%*mod != 0 {
// Ignore due to mod sampling.
atomic.AddUint64(&ctr.ignored, 1)
break
}
edge, err := nq.ToEdge()
if err != nil {
x.Err(glog, err).WithField("nq", nq).Error("While converting to edge")
@@ -58,7 +92,8 @@ Loop:
key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.Get(key)
plist.AddMutation(edge, posting.Set)
count += 1
atomic.AddUint64(&ctr.processed, 1)
case err := <-done:
if err != nil {
x.Err(glog, err).Error("While reading request")
@@ -67,7 +102,8 @@ Loop:
break Loop
}
}
return count, nil
ticker.Stop()
return atomic.LoadUint64(&ctr.processed), nil
}
func rdfHandler(w http.ResponseWriter, r *http.Request) {
@@ -99,12 +135,14 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
return
}
glog.WithField("q", string(q)).Info("Query received.")
sg, err := gql.Parse(string(q))
if err != nil {
x.Err(glog, err).Error("While parsing query")
x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
return
}
glog.WithField("q", string(q)).Info("Query parsed.")
rch := make(chan error)
go query.ProcessGraph(sg, rch)
err = <-rch
@@ -113,6 +151,7 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
x.SetStatus(w, x.E_ERROR, err.Error())
return
}
glog.WithField("q", string(q)).Info("Graph processed.")
js, err := sg.ToJson()
if err != nil {
x.Err(glog, err).Error("While converting to Json.")
@@ -128,32 +167,47 @@ func main() {
if !flag.Parsed() {
glog.Fatal("Unable to parse flags")
}
logrus.SetLevel(logrus.DebugLevel)
logrus.SetLevel(logrus.InfoLevel)
ps := new(store.Store)
ps.Init(*postingDir)
clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
clog.SyncEvery = 1000
clog.Init()
defer clog.Close()
posting.Init(ps, clog)
if len(*rdfData) > 0 {
f, err := os.Open(*rdfData)
if err != nil {
glog.Fatal(err)
}
defer f.Close()
if len(*rdfGzips) > 0 {
files := strings.Split(*rdfGzips, ",")
for _, path := range files {
if len(path) == 0 {
continue
}
glog.WithField("path", path).Info("Handling...")
f, err := os.Open(path)
if err != nil {
glog.WithError(err).Fatal("Unable to open rdf file.")
}
count, err := handleRdfReader(f)
if err != nil {
glog.Fatal(err)
r, err := gzip.NewReader(f)
if err != nil {
glog.WithError(err).Fatal("Unable to create gzip reader.")
}
count, err := handleRdfReader(r)
if err != nil {
glog.Fatal(err)
}
glog.WithField("count", count).Info("RDFs parsed")
r.Close()
f.Close()
}
glog.WithField("count", count).Debug("RDFs parsed")
}
http.HandleFunc("/rdf", rdfHandler)
http.HandleFunc("/query", queryHandler)
if err := http.ListenAndServe(":8080", nil); err != nil {
glog.WithField("port", *port).Info("Listening for requests...")
if err := http.ListenAndServe(":"+*port, nil); err != nil {
x.Err(glog, err).Fatal("ListenAndServe")
}
}
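The new --mod flag subsamples the input by entity: a subject is kept only when its farmhash fingerprint is divisible by mod, which makes it cheap to load, say, a tenth of a large RDF dump for testing. The check reduces to:

package main

import (
	"fmt"

	"github.com/dgryski/go-farm"
)

// keep mirrors the sampling test in handleRdfReader: an NQuad is
// processed only when its subject's fingerprint lands in the sampled
// residue class. mod == 1 keeps every entity.
func keep(subject string, mod uint64) bool {
	return farm.Fingerprint64([]byte(subject))%mod == 0
}

func main() {
	for _, s := range []string{"m.012abc", "m.034def", "m.056ghi"} {
		fmt.Println(s, keep(s, 10)) // roughly one in ten subjects kept.
	}
}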
@@ -65,9 +65,6 @@ func allocateNew(xid string) (uid uint64, rerr error) {
if rerr != nil {
x.Err(log, rerr).Error("While adding mutation")
}
if err := pl.CommitIfDirty(); err != nil {
x.Err(log, err).Error("While commiting")
}
return uid, rerr
}
return 0, errors.New("Some unhandled route lead me here." +
......