diff --git a/commit/log.go b/commit/log.go index 410b2627d14f37c0c18b096ac567a6f0b5edc4fa..552f0d1efc83280e7900d72c66d2c5d70ed0719f 100644 --- a/commit/log.go +++ b/commit/log.go @@ -75,6 +75,9 @@ type Logger struct { // Sync every d duration. SyncDur time.Duration + // Skip write to commit log to allow for testing. + SkipWrite bool + sync.RWMutex list []*logFile curFile *os.File @@ -332,6 +335,9 @@ func (l *Logger) AddLog(ts int64, hash uint32, value []byte) error { if ts < l.lastLogTs { return fmt.Errorf("Timestamp lower than last log timestamp.") } + if l.SkipWrite { + return nil + } buf := new(bytes.Buffer) var err error diff --git a/posting/list.go b/posting/list.go index 2ad8ffc03531dedcf34d3b3d320fa508475ced30..f6c307cd0c5150a6c886c3c4ce5be6e111fdc656 100644 --- a/posting/list.go +++ b/posting/list.go @@ -591,17 +591,17 @@ func (l *List) DirtyRatio() float64 { return float64(d) / float64(ln) } -func (l *List) CompactIfDirty() error { +func (l *List) MergeIfDirty() error { if !l.IsDirty() { glog.WithField("dirty", false).Debug("Not Committing") return nil } else { glog.WithField("dirty", true).Debug("Committing") } - return l.compact() + return l.merge() } -func (l *List) compact() error { +func (l *List) merge() error { l.Lock() defer l.Unlock() diff --git a/posting/list_test.go b/posting/list_test.go index 0ed2ee6be04150727b96197729f05e022b0cbbd6..a3d46bbfe1e47cfbef604c0bcc83e71fad5c7e84 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -181,7 +181,7 @@ func TestAddMutation(t *testing.T) { t.Error(err) } - if err := dl.CommitIfDirty(); err != nil { + if err := dl.MergeIfDirty(); err != nil { t.Error(err) } if err := checkUids(t, dl, uids...); err != nil { @@ -234,7 +234,7 @@ func TestAddMutation_Value(t *testing.T) { } // Run the same check after committing. - if err := ol.CommitIfDirty(); err != nil { + if err := ol.MergeIfDirty(); err != nil { t.Error(err) } { diff --git a/posting/lists.go b/posting/lists.go index 78f73d09af41f99d7aa7b781bc57c21b6a14ab7d..e445aa10a1d25bcea0f9fe520eb9fd21f8c3c7a4 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -124,7 +124,7 @@ func process() { } glog.WithField("eid", eid).WithField("pending", len(ch)). Info("Commiting list") - if err := l.CompactIfDirty(); err != nil { + if err := l.MergeIfDirty(); err != nil { glog.WithError(err).Error("While commiting dirty list.") } } diff --git a/rdf/parse.go b/rdf/parse.go index 8e12969c619620d80f603fa955e7d4f4d596c763..5d4dbe9a96d4d72c9c3a40138a511be5356fdc35 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -17,10 +17,7 @@ package rdf import ( - "bufio" "fmt" - "io" - "strings" "time" "github.com/dgraph-io/dgraph/lex" @@ -129,29 +126,3 @@ func Parse(line string) (rnq NQuad, rerr error) { func isNewline(r rune) bool { return r == '\n' || r == '\r' } - -func ParseStream(reader io.Reader, cnq chan NQuad, done chan error) { - scanner := bufio.NewScanner(reader) - for scanner.Scan() { - line := scanner.Text() - line = strings.Trim(line, " \t") - if len(line) == 0 { - continue - } - - glog.Debugf("Got line: %q", line) - nq, err := Parse(line) - if err != nil { - x.Err(glog, err).Errorf("While parsing: %q", line) - done <- err - return - } - cnq <- nq - } - if err := scanner.Err(); err != nil { - x.Err(glog, err).Error("While scanning input") - done <- err - return - } - done <- nil -} diff --git a/rdf/parse_test.go b/rdf/parse_test.go index 66934d1e79fb592d906a84131170332e4d1296ea..6c4e619291a56539fe9eb649ef15e8cbd8ede2b2 100644 --- a/rdf/parse_test.go +++ b/rdf/parse_test.go @@ -18,7 +18,6 @@ package rdf import ( "reflect" - "strings" "testing" ) @@ -214,36 +213,3 @@ func TestLex(t *testing.T) { } } } - -func TestParseStream(t *testing.T) { - cnq := make(chan NQuad, 10) - done := make(chan error) - - data := ` - - <alice> <follows> <bob> . - <bob> <follows> <fred> . - <bob> <status> "cool_person" . - <charlie> <follows> <bob> . - <charlie> <follows> <dani> . - <dani> <follows> <bob> . - <dani> <follows> <greg> . - <dani> <status> "cool_person" . - <emily> <follows> <fred> . - <fred> <follows> <greg> . - <greg> <status> "cool_person" . - ` - go ParseStream(strings.NewReader(data), cnq, done) -Loop: - for { - select { - case nq := <-cnq: - t.Logf("Got nquad: %v", nq) - case err := <-done: - if err != nil { - t.Errorf("While parsing data: %v", err) - } - break Loop - } - } -} diff --git a/server/main.go b/server/main.go index 8e7fb2fa0c29f109dc80e6c9aa5834e7591ee339..f8582fd9a6777e931909fcc77c6d1114faa858f1 100644 --- a/server/main.go +++ b/server/main.go @@ -17,6 +17,7 @@ package main import ( + "bufio" "compress/gzip" "flag" "fmt" @@ -24,7 +25,9 @@ import ( "io/ioutil" "net/http" "os" + "runtime" "strings" + "sync" "sync/atomic" "time" @@ -47,9 +50,12 @@ var rdfGzips = flag.String("rdfgzips", "", "Comma separated gzip files containing RDF data") var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.") var port = flag.String("port", "8080", "Port to run server on.") +var numgo = flag.Int("numgo", 4, + "Number of goroutines to use for reading file.") type counters struct { read uint64 + parsed uint64 processed uint64 ignored uint64 } @@ -58,50 +64,102 @@ func printCounters(ticker *time.Ticker, c *counters) { for _ = range ticker.C { glog.WithFields(logrus.Fields{ "read": atomic.LoadUint64(&c.read), + "parsed": atomic.LoadUint64(&c.parsed), "processed": atomic.LoadUint64(&c.processed), "ignored": atomic.LoadUint64(&c.ignored), }).Info("Counters") } } +func readLines(r io.Reader, input chan string, c *counters) { + scanner := bufio.NewScanner(r) + for scanner.Scan() { + input <- scanner.Text() + atomic.AddUint64(&c.read, 1) + } + if err := scanner.Err(); err != nil { + glog.WithError(err).Fatal("While reading file.") + } + close(input) +} + +func parseStream(input chan string, cnq chan rdf.NQuad, + done chan error, c *counters) { + + for line := range input { + line = strings.Trim(line, " \t") + if len(line) == 0 { + glog.Info("Empty line.") + continue + } + + glog.Debugf("Got line: %q", line) + nq, err := rdf.Parse(line) + if err != nil { + x.Err(glog, err).Errorf("While parsing: %q", line) + done <- err + return + } + cnq <- nq + atomic.AddUint64(&c.parsed, 1) + } + done <- nil +} + +func handleNQuads(cnq chan rdf.NQuad, ctr *counters, wg *sync.WaitGroup) { + for nq := range cnq { + if farm.Fingerprint64([]byte(nq.Subject))%*mod != 0 { + // Ignore due to mod sampling. + atomic.AddUint64(&ctr.ignored, 1) + continue + } + + edge, err := nq.ToEdge() + if err != nil { + x.Err(glog, err).WithField("nq", nq).Error("While converting to edge") + return + } + + key := posting.Key(edge.Entity, edge.Attribute) + plist := posting.Get(key) + plist.AddMutation(edge, posting.Set) + atomic.AddUint64(&ctr.processed, 1) + } + wg.Done() +} + // Blocking function. func handleRdfReader(reader io.Reader) (uint64, error) { - cnq := make(chan rdf.NQuad, 1000) - done := make(chan error) ctr := new(counters) ticker := time.NewTicker(time.Second) - go rdf.ParseStream(reader, cnq, done) go printCounters(ticker, ctr) -Loop: - for { - select { - case nq := <-cnq: - atomic.AddUint64(&ctr.read, 1) - - if farm.Fingerprint64([]byte(nq.Subject))%*mod != 0 { - // Ignore due to mod sampling. - atomic.AddUint64(&ctr.ignored, 1) - break - } - edge, err := nq.ToEdge() - if err != nil { - x.Err(glog, err).WithField("nq", nq).Error("While converting to edge") - return 0, err - } - key := posting.Key(edge.Entity, edge.Attribute) - plist := posting.Get(key) - plist.AddMutation(edge, posting.Set) - atomic.AddUint64(&ctr.processed, 1) + // Producer: Start buffering input to channel. + input := make(chan string, 10000) + go readLines(reader, input, ctr) - case err := <-done: - if err != nil { - x.Err(glog, err).Error("While reading request") - return 0, err - } - break Loop + cnq := make(chan rdf.NQuad, 10000) + done := make(chan error, *numgo) + wg := new(sync.WaitGroup) + for i := 0; i < *numgo; i++ { + wg.Add(1) + go parseStream(input, cnq, done, ctr) // Input --> NQuads + go handleNQuads(cnq, ctr, wg) // NQuads --> Posting list + } + + // The following will block until all ParseStream goroutines finish. + for i := 0; i < *numgo; i++ { + if err := <-done; err != nil { + glog.WithError(err).Fatal("While reading input.") } } + close(cnq) + // Okay, we've stopped input to cnq, and closed it. + // Now wait for handleNQuads to finish. + wg.Wait() + + // We're doing things in this complicated way, because generating + // string -> NQuad is the slower task here. ticker.Stop() return atomic.LoadUint64(&ctr.processed), nil } @@ -168,11 +226,12 @@ func main() { glog.Fatal("Unable to parse flags") } logrus.SetLevel(logrus.InfoLevel) + glog.WithField("gomaxprocs", runtime.GOMAXPROCS(-1)).Info("Number of CPUs") ps := new(store.Store) ps.Init(*postingDir) clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20) - clog.SyncEvery = 1000 + clog.SkipWrite = true // Don't write to commit logs. clog.Init() defer clog.Close() posting.Init(ps, clog) @@ -203,6 +262,9 @@ func main() { f.Close() } } + // Okay, now start accumulating mutations. + clog.SkipWrite = false + clog.SyncEvery = 1 http.HandleFunc("/rdf", rdfHandler) http.HandleFunc("/query", queryHandler) diff --git a/server/thoughts.md b/server/thoughts.md new file mode 100644 index 0000000000000000000000000000000000000000..548bc3ee9e73ac297dc165b9ff6c5676e0f88b5b --- /dev/null +++ b/server/thoughts.md @@ -0,0 +1,21 @@ +1. Select v/s Range + +2. sync.WaitGroup. + +func handle(..) { + wg.Add(1) + ... + wg.Done() +} + +func main() { + wg := new(sync.WaitGroup) + for i := 0; i < N; i++ { + go handle(..) + } + wg.Wait() +} + +The above wouldn't work, because goroutines don't necessarily get scheduled immediately. +So, wg.Add(1) wouldn't get called, which means wg.Wait() wouldn't block, and the program +would finish execution before goroutines had a chance to be run. diff --git a/uid/assigner.go b/uid/assigner.go index 953986d55de1f06f5ad53fc67e5f7ff5f6d523ba..cf6aace70415dc54c043c3b8401f0a6cfc54d19a 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -20,6 +20,7 @@ import ( "bytes" "errors" "math" + "sync" "time" "github.com/dgraph-io/dgraph/posting" @@ -28,15 +29,76 @@ import ( "github.com/dgryski/go-farm" ) -var log = x.Log("uid") +var glog = x.Log("uid") +var lmgr *lockManager -func allocateNew(xid string) (uid uint64, rerr error) { +type entry struct { + sync.Mutex + ts time.Time +} + +func (e entry) isOld() bool { + e.Lock() + defer e.Unlock() + return time.Now().Sub(e.ts) > time.Minute +} + +type lockManager struct { + sync.RWMutex + locks map[string]*entry +} + +func (lm *lockManager) newOrExisting(xid string) *entry { + lm.RLock() + if e, ok := lm.locks[xid]; ok { + lm.RUnlock() + return e + } + lm.RUnlock() + + lm.Lock() + defer lm.Unlock() + if e, ok := lm.locks[xid]; ok { + return e + } + e := new(entry) + e.ts = time.Now() + lm.locks[xid] = e + return e +} + +func (lm *lockManager) clean() { + ticker := time.NewTicker(time.Minute) + for _ = range ticker.C { + count := 0 + lm.Lock() + for xid, e := range lm.locks { + if e.isOld() { + count += 1 + delete(lm.locks, xid) + } + } + lm.Unlock() + // A minute is enough to avoid the race condition issue for + // uid allocation to an xid. + glog.WithField("count", count).Info("Deleted old locks.") + } +} + +// package level init +func init() { + lmgr = new(lockManager) + lmgr.locks = make(map[string]*entry) + go lmgr.clean() +} + +func allocateUniqueUid(xid string) (uid uint64, rerr error) { for sp := ""; ; sp += " " { txid := xid + sp uid = farm.Fingerprint64([]byte(txid)) // Generate from hash. - log.WithField("txid", txid).WithField("uid", uid).Debug("Generated") + glog.WithField("txid", txid).WithField("uid", uid).Debug("Generated") if uid == math.MaxUint64 { - log.Debug("Hit uint64max while generating fingerprint. Ignoring...") + glog.Debug("Hit uint64max while generating fingerprint. Ignoring...") continue } @@ -51,7 +113,7 @@ func allocateNew(xid string) (uid uint64, rerr error) { var tmp interface{} posting.ParseValue(&tmp, p.ValueBytes()) - log.Debug("Found existing xid: [%q]. Continuing...", tmp.(string)) + glog.Debug("Found existing xid: [%q]. Continuing...", tmp.(string)) continue } @@ -63,7 +125,7 @@ func allocateNew(xid string) (uid uint64, rerr error) { } rerr = pl.AddMutation(t, posting.Set) if rerr != nil { - x.Err(log, rerr).Error("While adding mutation") + glog.WithError(rerr).Error("While adding mutation") } return uid, rerr } @@ -71,6 +133,38 @@ func allocateNew(xid string) (uid uint64, rerr error) { " Wake the stupid developer up.") } +func assignNew(pl *posting.List, xid string) (uint64, error) { + entry := lmgr.newOrExisting(xid) + entry.Lock() + entry.ts = time.Now() + defer entry.Unlock() + + if pl.Length() > 1 { + glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) + + } else if pl.Length() > 0 { + var p types.Posting + if ok := pl.Get(&p, 0); !ok { + return 0, errors.New("While retrieving entry from posting list.") + } + return p.Uid(), nil + } + + // No current id exists. Create one. + uid, err := allocateUniqueUid(xid) + if err != nil { + return 0, err + } + + t := x.DirectedEdge{ + ValueId: uid, + Source: "_assigner_", + Timestamp: time.Now(), + } + rerr := pl.AddMutation(t, posting.Set) + return uid, rerr +} + func stringKey(xid string) []byte { buf := new(bytes.Buffer) buf.WriteString("_uid_") @@ -79,26 +173,14 @@ func stringKey(xid string) []byte { return buf.Bytes() } -// TODO: Currently one posting list is modified after another, without func GetOrAssign(xid string) (uid uint64, rerr error) { key := stringKey(xid) pl := posting.Get(key) if pl.Length() == 0 { - // No current id exists. Create one. - uid, err := allocateNew(xid) - if err != nil { - return 0, err - } - t := x.DirectedEdge{ - ValueId: uid, - Source: "_assigner_", - Timestamp: time.Now(), - } - rerr = pl.AddMutation(t, posting.Set) - return uid, rerr + return assignNew(pl, xid) } else if pl.Length() > 1 { - log.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) + glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) } else { // We found one posting. @@ -120,18 +202,18 @@ func ExternalId(uid uint64) (xid string, rerr error) { } if pl.Length() > 1 { - log.WithField("uid", uid).Fatal("This shouldn't be happening.") + glog.WithField("uid", uid).Fatal("This shouldn't be happening.") return "", errors.New("Multiple external ids for this uid.") } var p types.Posting if ok := pl.Get(&p, 0); !ok { - log.WithField("uid", uid).Error("While retrieving posting") + glog.WithField("uid", uid).Error("While retrieving posting") return "", errors.New("While retrieving posting") } if p.Uid() != math.MaxUint64 { - log.WithField("uid", uid).Fatal("Value uid must be MaxUint64.") + glog.WithField("uid", uid).Fatal("Value uid must be MaxUint64.") } var t interface{} rerr = posting.ParseValue(&t, p.ValueBytes())