From e546b0103085ea13def0380741992c86cf9d1c38 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Mon, 16 Nov 2015 18:48:03 +1100
Subject: [PATCH] Real world use cases. Allow for dots in names. Handle
 backspace. Run automatic mutation merging. Load multiple rdf gzips, etc.

---
 gql/parser_test.go | 26 ++++++++++++++
 gql/state.go       |  3 ++
 posting/list.go    | 42 +++++++++++++++++-----
 posting/lists.go   | 88 +++++++++++++++++++++++++++++++++++++++------
 rdf/parse.go       |  7 ++--
 rdf/parse_test.go  |  8 +++++
 rdf/state.go       | 14 +++++++-
 server/main.go     | 90 ++++++++++++++++++++++++++++++++++++----------
 uid/assigner.go    |  3 --
 9 files changed, 239 insertions(+), 42 deletions(-)

diff --git a/gql/parser_test.go b/gql/parser_test.go
index 14d1ae2b..ab17f0a2 100644
--- a/gql/parser_test.go
+++ b/gql/parser_test.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"testing"
 
+	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/query"
 )
 
@@ -75,6 +76,31 @@ func TestParse(t *testing.T) {
 	}
 }
 
+func TestParseXid(t *testing.T) {
+	logrus.SetLevel(logrus.DebugLevel)
+	query := `
+	query {
+		user(_uid_: 0x11) {
+			type.object.name
+		}
+	}`
+	sg, err := Parse(query)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	if sg == nil {
+		t.Error("subgraph is nil")
+		return
+	}
+	if len(sg.Children) != 1 {
+		t.Errorf("Expected 1 children. Got: %v", len(sg.Children))
+	}
+	if err := checkAttr(sg.Children[0], "type.object.name"); err != nil {
+		t.Error(err)
+	}
+}
+
 func TestParse_error1(t *testing.T) {
 	query := `
 		mutation {
diff --git a/gql/state.go b/gql/state.go
index 4fdb3859..3f4e531c 100644
--- a/gql/state.go
+++ b/gql/state.go
@@ -234,5 +234,8 @@ func isNameSuffix(r rune) bool {
 	if r >= '0' && r <= '9' {
 		return true
 	}
+	if r == '.' {
+		return true
+	}
 	return false
 }
diff --git a/posting/list.go b/posting/list.go
index f574c230..2ad8ffc0 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"math"
 	"sync"
+	"time"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/commit"
@@ -47,11 +48,12 @@ type MutationLink struct {
 
 type List struct {
 	sync.RWMutex
-	key    []byte
-	hash   uint32
-	buffer []byte
-	pstore *store.Store // postinglist store
-	clog   *commit.Logger
+	key         []byte
+	hash        uint32
+	buffer      []byte
+	pstore      *store.Store // postinglist store
+	clog        *commit.Logger
+	lastCompact time.Time
 
 	// Mutations
 	mlayer        map[int]types.Posting // stores only replace instructions.
@@ -569,20 +571,37 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
 	return l.clog.AddLog(t.Timestamp.UnixNano(), l.hash, mbuf)
 }
 
-func (l *List) isDirty() bool {
+func (l *List) IsDirty() bool {
 	l.RLock()
 	defer l.RUnlock()
 	return len(l.mindex)+len(l.mlayer) > 0
 }
 
-func (l *List) CommitIfDirty() error {
-	if !l.isDirty() {
+func (l *List) DirtyRatio() float64 {
+	l.RLock()
+	defer l.RUnlock()
+
+	d := len(l.mindex) + len(l.mlayer)
+	plist := types.GetRootAsPostingList(l.buffer, 0)
+	ln := plist.PostingsLength()
+	if ln == 0 {
+		return math.MaxFloat64
+	}
+
+	return float64(d) / float64(ln)
+}
+
+func (l *List) CompactIfDirty() error {
+	if !l.IsDirty() {
 		glog.WithField("dirty", false).Debug("Not Committing")
 		return nil
 	} else {
 		glog.WithField("dirty", true).Debug("Committing")
 	}
+	return l.compact()
+}
 
+func (l *List) compact() error {
 	l.Lock()
 	defer l.Unlock()
 
@@ -615,12 +634,19 @@ func (l *List) CommitIfDirty() error {
 	}
 
 	// Now reset the mutation variables.
+	l.lastCompact = time.Now()
 	l.mlayer = make(map[int]types.Posting)
 	l.mdelta = 0
 	l.mindex = nil
 	return nil
 }
 
+func (l *List) LastCompactionTs() time.Time {
+	l.RLock()
+	defer l.RUnlock()
+	return l.lastCompact
+}
+
 func (l *List) GetUids() []uint64 {
 	l.RLock()
 	defer l.RUnlock()
diff --git a/posting/lists.go b/posting/lists.go
index 9bbcd589..78f73d09 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -18,33 +18,52 @@ package posting
 
 import (
 	"sync"
+	"time"
 
+	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/commit"
 	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgryski/go-farm"
 )
 
+type entry struct {
+	l *List
+}
+
 var lmutex sync.RWMutex
-var lcache map[uint64]*List
+var lcache map[uint64]*entry
 var pstore *store.Store
 var clog *commit.Logger
+var ch chan uint64
 
 func Init(posting *store.Store, log *commit.Logger) {
 	lmutex.Lock()
 	defer lmutex.Unlock()
 
-	lcache = make(map[uint64]*List)
+	lcache = make(map[uint64]*entry)
 	pstore = posting
 	clog = log
+	ch = make(chan uint64, 1000)
+	go queueForProcessing()
+	go process()
+}
+
+func get(k uint64) *List {
+	lmutex.RLock()
+	defer lmutex.RUnlock()
+	if e, ok := lcache[k]; ok {
+		return e.l
+	}
+	return nil
 }
 
 func Get(key []byte) *List {
 	// Acquire read lock and check if list is available.
 	lmutex.RLock()
 	uid := farm.Fingerprint64(key)
-	if list, ok := lcache[uid]; ok {
+	if e, ok := lcache[uid]; ok {
 		lmutex.RUnlock()
-		return list
+		return e.l
 	}
 	lmutex.RUnlock()
 
@@ -52,12 +71,61 @@ func Get(key []byte) *List {
 	lmutex.Lock()
 	defer lmutex.Unlock()
 	// Check again after acquiring write lock.
-	if list, ok := lcache[uid]; ok {
-		return list
+	if e, ok := lcache[uid]; ok {
+		return e.l
 	}
 
-	list := new(List)
-	list.init(key, pstore, clog)
-	lcache[uid] = list
-	return list
+	e := new(entry)
+	e.l = new(List)
+	e.l.init(key, pstore, clog)
+	lcache[uid] = e
+	return e.l
+}
+
+func queueForProcessing() {
+	ticker := time.NewTicker(time.Minute)
+	for _ = range ticker.C {
+		count := 0
+		skipped := 0
+		lmutex.RLock()
+		now := time.Now()
+		for eid, e := range lcache {
+			if len(ch) >= cap(ch) {
+				break
+			}
+			if len(ch) < int(0.3*float32(cap(ch))) && e.l.IsDirty() {
+				// Let's add some work here.
+				ch <- eid
+				count += 1
+			} else if now.Sub(e.l.LastCompactionTs()) > 10*time.Minute {
+				// Only queue lists which haven't been processed for a while.
+				ch <- eid
+				count += 1
+			} else {
+				skipped += 1
+			}
+		}
+		lmutex.RUnlock()
+		glog.WithFields(logrus.Fields{
+			"added":   count,
+			"skipped": skipped,
+			"pending": len(ch),
+		}).Info("Added for compaction")
+	}
+}
+
+func process() {
+	ticker := time.NewTicker(100 * time.Millisecond)
+	for _ = range ticker.C {
+		eid := <-ch // blocking.
+		l := get(eid)
+		if l == nil {
+			continue
+		}
+		glog.WithField("eid", eid).WithField("pending", len(ch)).
+			Info("Commiting list")
+		if err := l.CompactIfDirty(); err != nil {
+			glog.WithError(err).Error("While commiting dirty list.")
+		}
+	}
 }
diff --git a/rdf/parse.go b/rdf/parse.go
index e67db806..8e12969c 100644
--- a/rdf/parse.go
+++ b/rdf/parse.go
@@ -51,9 +51,12 @@ func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) {
 		result.ValueId = oid
 	} else {
 		result.Value = nq.ObjectValue
-		// TODO: Handle Language
 	}
-	result.Attribute = nq.Predicate
+	if len(nq.Language) > 0 {
+		result.Attribute = nq.Predicate + "." + nq.Language
+	} else {
+		result.Attribute = nq.Predicate
+	}
 	result.Source = nq.Label
 	result.Timestamp = time.Now()
 	return result, nil
diff --git a/rdf/parse_test.go b/rdf/parse_test.go
index 5c71901c..66934d1e 100644
--- a/rdf/parse_test.go
+++ b/rdf/parse_test.go
@@ -185,6 +185,14 @@ var testNQuads = []struct {
 			ObjectId:  "bob",
 		},
 	},
+	{
+		input: `_:alice <likes> "mov\"enpick" .`, // ignores the <bob> after dot.
+		nq: NQuad{
+			Subject:     "_:alice",
+			Predicate:   "likes",
+			ObjectValue: `mov\"enpick`,
+		},
+	},
 }
 
 func TestLex(t *testing.T) {
diff --git a/rdf/state.go b/rdf/state.go
index 623cd373..70a40d11 100644
--- a/rdf/state.go
+++ b/rdf/state.go
@@ -184,7 +184,19 @@ func lexLanguage(l *lex.Lexer) lex.StateFn {
 
 // Assumes '"' has already been encountered.
 func lexLiteral(l *lex.Lexer) lex.StateFn {
-	l.AcceptUntil(isEndLiteral)
+	for {
+		r := l.Next()
+		if r == '\u005c' { // backslash
+			r = l.Next()
+			continue // This would skip over the escaped rune.
+		}
+
+		if r == lex.EOF || isEndLiteral(r) {
+			break
+		}
+	}
+	l.Backup()
+
 	l.Emit(itemLiteral)
 	l.Next()   // Move to end literal.
 	l.Ignore() // Ignore end literal.
diff --git a/server/main.go b/server/main.go
index 33f4f84a..8e7fb2fa 100644
--- a/server/main.go
+++ b/server/main.go
@@ -17,12 +17,16 @@
 package main
 
 import (
+	"compress/gzip"
 	"flag"
 	"fmt"
 	"io"
 	"io/ioutil"
 	"net/http"
 	"os"
+	"strings"
+	"sync/atomic"
+	"time"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/dgraph-io/dgraph/commit"
@@ -32,24 +36,54 @@ import (
 	"github.com/dgraph-io/dgraph/rdf"
 	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgryski/go-farm"
 )
 
 var glog = x.Log("rdf")
 
 var postingDir = flag.String("postings", "", "Directory to store posting lists")
 var mutationDir = flag.String("mutations", "", "Directory to store mutations")
-var rdfData = flag.String("rdfdata", "", "File containing RDF data")
+var rdfGzips = flag.String("rdfgzips", "",
+	"Comma separated gzip files containing RDF data")
+var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.")
+var port = flag.String("port", "8080", "Port to run server on.")
+
+type counters struct {
+	read      uint64
+	processed uint64
+	ignored   uint64
+}
 
-func handleRdfReader(reader io.Reader) (int, error) {
+func printCounters(ticker *time.Ticker, c *counters) {
+	for _ = range ticker.C {
+		glog.WithFields(logrus.Fields{
+			"read":      atomic.LoadUint64(&c.read),
+			"processed": atomic.LoadUint64(&c.processed),
+			"ignored":   atomic.LoadUint64(&c.ignored),
+		}).Info("Counters")
+	}
+}
+
+// Blocking function.
+func handleRdfReader(reader io.Reader) (uint64, error) {
 	cnq := make(chan rdf.NQuad, 1000)
 	done := make(chan error)
+	ctr := new(counters)
+	ticker := time.NewTicker(time.Second)
 	go rdf.ParseStream(reader, cnq, done)
-
-	count := 0
+	go printCounters(ticker, ctr)
 Loop:
 	for {
 		select {
 		case nq := <-cnq:
+			atomic.AddUint64(&ctr.read, 1)
+
+			if farm.Fingerprint64([]byte(nq.Subject))%*mod != 0 {
+				// Ignore due to mod sampling.
+				atomic.AddUint64(&ctr.ignored, 1)
+				break
+			}
+
 			edge, err := nq.ToEdge()
 			if err != nil {
 				x.Err(glog, err).WithField("nq", nq).Error("While converting to edge")
@@ -58,7 +92,8 @@ Loop:
 			key := posting.Key(edge.Entity, edge.Attribute)
 			plist := posting.Get(key)
 			plist.AddMutation(edge, posting.Set)
-			count += 1
+			atomic.AddUint64(&ctr.processed, 1)
+
 		case err := <-done:
 			if err != nil {
 				x.Err(glog, err).Error("While reading request")
@@ -67,7 +102,8 @@ Loop:
 			break Loop
 		}
 	}
-	return count, nil
+	ticker.Stop()
+	return atomic.LoadUint64(&ctr.processed), nil
 }
 
 func rdfHandler(w http.ResponseWriter, r *http.Request) {
@@ -99,12 +135,14 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
 		x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.")
 		return
 	}
+	glog.WithField("q", string(q)).Info("Query received.")
 	sg, err := gql.Parse(string(q))
 	if err != nil {
 		x.Err(glog, err).Error("While parsing query")
 		x.SetStatus(w, x.E_INVALID_REQUEST, err.Error())
 		return
 	}
+	glog.WithField("q", string(q)).Info("Query parsed.")
 	rch := make(chan error)
 	go query.ProcessGraph(sg, rch)
 	err = <-rch
@@ -113,6 +151,7 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
 		x.SetStatus(w, x.E_ERROR, err.Error())
 		return
 	}
+	glog.WithField("q", string(q)).Info("Graph processed.")
 	js, err := sg.ToJson()
 	if err != nil {
 		x.Err(glog, err).Error("While converting to Json.")
@@ -128,32 +167,47 @@ func main() {
 	if !flag.Parsed() {
 		glog.Fatal("Unable to parse flags")
 	}
-	logrus.SetLevel(logrus.DebugLevel)
+	logrus.SetLevel(logrus.InfoLevel)
 
 	ps := new(store.Store)
 	ps.Init(*postingDir)
 	clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
+	clog.SyncEvery = 1000
 	clog.Init()
 	defer clog.Close()
 	posting.Init(ps, clog)
 
-	if len(*rdfData) > 0 {
-		f, err := os.Open(*rdfData)
-		if err != nil {
-			glog.Fatal(err)
-		}
-		defer f.Close()
+	if len(*rdfGzips) > 0 {
+		files := strings.Split(*rdfGzips, ",")
+		for _, path := range files {
+			if len(path) == 0 {
+				continue
+			}
+			glog.WithField("path", path).Info("Handling...")
+			f, err := os.Open(path)
+			if err != nil {
+				glog.WithError(err).Fatal("Unable to open rdf file.")
+			}
 
-		count, err := handleRdfReader(f)
-		if err != nil {
-			glog.Fatal(err)
+			r, err := gzip.NewReader(f)
+			if err != nil {
+				glog.WithError(err).Fatal("Unable to create gzip reader.")
+			}
+
+			count, err := handleRdfReader(r)
+			if err != nil {
+				glog.Fatal(err)
+			}
+			glog.WithField("count", count).Info("RDFs parsed")
+			r.Close()
+			f.Close()
 		}
-		glog.WithField("count", count).Debug("RDFs parsed")
 	}
 
 	http.HandleFunc("/rdf", rdfHandler)
 	http.HandleFunc("/query", queryHandler)
-	if err := http.ListenAndServe(":8080", nil); err != nil {
+	glog.WithField("port", *port).Info("Listening for requests...")
+	if err := http.ListenAndServe(":"+*port, nil); err != nil {
 		x.Err(glog, err).Fatal("ListenAndServe")
 	}
 }
diff --git a/uid/assigner.go b/uid/assigner.go
index 277f20cc..953986d5 100644
--- a/uid/assigner.go
+++ b/uid/assigner.go
@@ -65,9 +65,6 @@ func allocateNew(xid string) (uid uint64, rerr error) {
 		if rerr != nil {
 			x.Err(log, rerr).Error("While adding mutation")
 		}
-		if err := pl.CommitIfDirty(); err != nil {
-			x.Err(log, err).Error("While commiting")
-		}
 		return uid, rerr
 	}
 	return 0, errors.New("Some unhandled route lead me here." +
-- 
GitLab