Commit 6f460e6d authored by Manish R Jain

Randomize input to avoid contention for RDFs containing the same subject. Don't call assigner clean(), because it hangs the entire process.
parent 9fb14d45
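For context, a hypothetical, self-contained illustration (not dgraph's actual code) of the contention this commit avoids: if each subject is guarded by its own lock, a long run of input lines sharing one subject forces every worker to queue on the same mutex, which is why shuffling the input helps.

package main

import (
	"fmt"
	"strings"
	"sync"
)

// subjectLocks hands out one mutex per subject, created lazily.
type subjectLocks struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func (sl *subjectLocks) get(subject string) *sync.Mutex {
	sl.mu.Lock()
	defer sl.mu.Unlock()
	if m, ok := sl.locks[subject]; ok {
		return m
	}
	m := new(sync.Mutex)
	sl.locks[subject] = m
	return m
}

func main() {
	sl := &subjectLocks{locks: make(map[string]*sync.Mutex)}
	// Consecutive RDF lines with the same subject: all workers contend on one lock.
	lines := []string{
		"<alice> <follows> <bob> .",
		"<alice> <follows> <carol> .",
		"<alice> <follows> <dave> .",
	}
	var wg sync.WaitGroup
	for _, line := range lines {
		wg.Add(1)
		go func(line string) {
			defer wg.Done()
			subject := strings.Fields(line)[0]
			m := sl.get(subject) // every goroutine gets the same mutex here
			m.Lock()
			defer m.Unlock()
			fmt.Println("processed:", line)
		}(line)
	}
	wg.Wait()
}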
@@ -19,6 +19,7 @@ package loader
 import (
 	"bufio"
 	"io"
+	"math/rand"
 	"runtime"
 	"strings"
 	"sync"
@@ -48,26 +49,47 @@ type state struct {
 	mod   uint64
 }
 
-func printCounters(ticker *time.Ticker, c *counters) {
+func (s *state) printCounters(ticker *time.Ticker) {
 	for _ = range ticker.C {
+		parsed := atomic.LoadUint64(&s.ctr.parsed)
+		ignored := atomic.LoadUint64(&s.ctr.ignored)
+		processed := atomic.LoadUint64(&s.ctr.processed)
+		pending := parsed - ignored - processed
 		glog.WithFields(logrus.Fields{
-			"read":      atomic.LoadUint64(&c.read),
-			"parsed":    atomic.LoadUint64(&c.parsed),
-			"processed": atomic.LoadUint64(&c.processed),
-			"ignored":   atomic.LoadUint64(&c.ignored),
+			"read":      atomic.LoadUint64(&s.ctr.read),
+			"processed": processed,
+			"parsed":    parsed,
+			"ignored":   ignored,
+			"pending":   pending,
+			"len_cnq":   len(s.cnq),
 		}).Info("Counters")
 	}
 }
 
 func (s *state) readLines(r io.Reader) {
+	var buf []string
 	scanner := bufio.NewScanner(r)
+	// Randomize lines to avoid contention on same subject.
+	for i := 0; i < 1000; i++ {
+		if scanner.Scan() {
+			buf = append(buf, scanner.Text())
+		} else {
+			break
+		}
+	}
+	ln := len(buf)
 	for scanner.Scan() {
-		s.input <- scanner.Text()
+		k := rand.Intn(ln)
+		s.input <- buf[k]
+		buf[k] = scanner.Text()
 		atomic.AddUint64(&s.ctr.read, 1)
 	}
 	if err := scanner.Err(); err != nil {
 		glog.WithError(err).Fatal("While reading file.")
 	}
+	for i := 0; i < len(buf); i++ {
+		s.input <- buf[i]
+	}
 	close(s.input)
 }
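The readLines change above implements a shuffle buffer: hold up to 1000 lines, then for each new line emit a randomly chosen buffered line and take over its slot, draining the buffer at EOF. A minimal standalone sketch of the same idea, with a hypothetical shuffleLines helper in place of the loader's state struct:

package main

import (
	"bufio"
	"fmt"
	"math/rand"
	"strings"
)

// shuffleLines sends lines from the scanner to out in a lightly randomized
// order: it fills a buffer of up to bufSize lines (bufSize must be >= 1), and
// for each further line emits a randomly chosen buffered line and stores the
// new line in that slot. Runs of identical subjects get spread across the stream.
func shuffleLines(scanner *bufio.Scanner, out chan<- string, bufSize int) {
	var buf []string
	for i := 0; i < bufSize && scanner.Scan(); i++ {
		buf = append(buf, scanner.Text())
	}
	for scanner.Scan() {
		k := rand.Intn(len(buf))
		out <- buf[k]
		buf[k] = scanner.Text()
	}
	// Drain whatever is still buffered.
	for _, line := range buf {
		out <- line
	}
	close(out)
}

func main() {
	input := "a1\na2\na3\nb1\nb2\nb3\n"
	out := make(chan string, 10)
	go shuffleLines(bufio.NewScanner(strings.NewReader(input)), out, 3)
	for line := range out {
		fmt.Println(line)
	}
}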
@@ -119,29 +141,33 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) {
 	s := new(state)
 	s.ctr = new(counters)
 	ticker := time.NewTicker(time.Second)
-	go printCounters(ticker, s.ctr)
+	go s.printCounters(ticker)
 
 	// Producer: Start buffering input to channel.
 	s.mod = mod
 	s.input = make(chan string, 10000)
 	go s.readLines(reader)
 
-	numr := runtime.GOMAXPROCS(-1)
 	s.cnq = make(chan rdf.NQuad, 10000)
+	numr := runtime.GOMAXPROCS(-1)
 	done := make(chan error, numr)
-	wg := new(sync.WaitGroup)
 	for i := 0; i < numr; i++ {
-		wg.Add(1)
 		go s.parseStream(done) // Input --> NQuads
-		go s.handleNQuads(wg)  // NQuads --> Posting list [slow].
 	}
 
-	// The following will block until all ParseStream goroutines finish.
+	wg := new(sync.WaitGroup)
+	for i := 0; i < 100; i++ {
+		wg.Add(1)
+		go s.handleNQuads(wg) // NQuads --> Posting list [slow].
+	}
+
+	// Block until all parseStream goroutines are finished.
 	for i := 0; i < numr; i++ {
 		if err := <-done; err != nil {
 			glog.WithError(err).Fatal("While reading input.")
 		}
 	}
 
 	close(s.cnq)
 	// Okay, we've stopped input to cnq, and closed it.
 	// Now wait for handleNQuads to finish.
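The HandleRdfReader hunk above changes the pipeline shape: GOMAXPROCS parser goroutines each report completion on a buffered error channel, a separate pool of 100 handler goroutines is tracked by a WaitGroup, and the NQuad channel is closed only after every parser is done. A rough sketch of that fan-out/fan-in pattern, with the RDF types swapped for plain strings (names here are illustrative, not the package's API):

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	input := make(chan string, 100)
	nquads := make(chan string, 100)

	// Producer: feed the input channel, then close it.
	go func() {
		for i := 0; i < 10; i++ {
			input <- fmt.Sprintf("line-%d", i)
		}
		close(input)
	}()

	// Parsers: input --> nquads; each signals completion on done.
	numr := runtime.GOMAXPROCS(-1)
	done := make(chan error, numr)
	for i := 0; i < numr; i++ {
		go func() {
			for line := range input {
				nquads <- "parsed:" + line
			}
			done <- nil
		}()
	}

	// Handlers: nquads --> (slow) processing, tracked by a WaitGroup.
	wg := new(sync.WaitGroup)
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for nq := range nquads {
				fmt.Println(nq)
			}
		}()
	}

	// Wait for every parser, then close nquads and wait for the handlers.
	for i := 0; i < numr; i++ {
		if err := <-done; err != nil {
			fmt.Println("parser error:", err)
		}
	}
	close(nquads)
	wg.Wait()
}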
/loader
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main

import (
	"compress/gzip"
	"flag"
	"os"
	"runtime"
	"runtime/pprof"
	"strings"

	"github.com/Sirupsen/logrus"
	"github.com/dgraph-io/dgraph/commit"
	"github.com/dgraph-io/dgraph/loader"
	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/store"
	"github.com/dgraph-io/dgraph/x"
)

var glog = x.Log("loader_main")

var rdfGzips = flag.String("rdfgzips", "",
	"Comma separated gzip files containing RDF data")
var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.")
var numgo = flag.Int("numgo", 4,
	"Number of goroutines to use for reading file.")
var postingDir = flag.String("postings", "", "Directory to store posting lists")
var mutationDir = flag.String("mutations", "", "Directory to store mutations")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")

func main() {
	flag.Parse()
	if !flag.Parsed() {
		glog.Fatal("Unable to parse flags")
	}
	if len(*cpuprofile) > 0 {
		f, err := os.Create(*cpuprofile)
		if err != nil {
			glog.Fatal(err)
		}
		pprof.StartCPUProfile(f)
		defer pprof.StopCPUProfile()
	}

	logrus.SetLevel(logrus.InfoLevel)
	glog.WithField("gomaxprocs", runtime.GOMAXPROCS(-1)).Info("Number of CPUs")

	if len(*rdfGzips) == 0 {
		glog.Fatal("No RDF GZIP files specified")
	}

	ps := new(store.Store)
	ps.Init(*postingDir)
	defer ps.Close()

	clog := commit.NewLogger(*mutationDir, "dgraph", 50<<20)
	clog.SkipWrite = true // Don't write to commit logs.
	clog.Init()
	defer clog.Close()

	posting.Init(ps, clog)

	files := strings.Split(*rdfGzips, ",")
	for _, path := range files {
		if len(path) == 0 {
			continue
		}
		glog.WithField("path", path).Info("Handling...")
		f, err := os.Open(path)
		if err != nil {
			glog.WithError(err).Fatal("Unable to open rdf file.")
		}

		r, err := gzip.NewReader(f)
		if err != nil {
			glog.WithError(err).Fatal("Unable to create gzip reader.")
		}

		count, err := loader.HandleRdfReader(r, *mod)
		if err != nil {
			glog.Fatal(err)
		}
		glog.WithField("count", count).Info("RDFs parsed")
		r.Close()
		f.Close()
	}

	glog.Info("Calling merge lists")
	posting.MergeLists(100)
}
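Assuming the file above builds to a binary named loader (the binary name and paths below are hypothetical; the flag names come from the file itself), an invocation would look like:

	loader --rdfgzips=data/rdf-part1.gz,data/rdf-part2.gz --postings=/tmp/postings --mutations=/tmp/mutations --mod=1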
@@ -89,7 +89,7 @@ func (lm *lockManager) clean() {
 func init() {
 	lmgr = new(lockManager)
 	lmgr.locks = make(map[string]*entry)
-	go lmgr.clean()
+	// go lmgr.clean()
 }
 
 func allocateUniqueUid(xid string) (uid uint64, rerr error) {