diff --git a/loader/loader.go b/loader/loader.go
index 0ff5353e1592edafdb5339d3bc33f5cf3955f6d6..bbf3309be25073d765f64919d0b7023558b778fd 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -149,6 +149,38 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
 	wg.Done()
 }
 
+// handleNQuadsWhileAssign drains s.cnq, skips NQuads whose subject
+// fingerprint is not selected by s.mod, converts the remaining ones to edges
+// and logs each edge. posting.E_TMP_ERROR results from ToEdge are retried;
+// any other error is logged and terminates this worker.
+func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) {
+	// Deferred so the early return on a permanent error still releases the
+	// WaitGroup; otherwise the caller's wg.Wait() would block forever.
+	defer wg.Done()
+
+	for nq := range s.cnq {
+		if farm.Fingerprint64([]byte(nq.Subject))%s.mod != 0 {
+			// Ignore due to mod sampling.
+			atomic.AddUint64(&s.ctr.ignored, 1)
+			continue
+		}
+
+		edge, err := nq.ToEdge()
+		for err != nil {
+			// Retry only on temporary errors; anything else ends this
+			// worker.
+			if err != posting.E_TMP_ERROR {
+				glog.WithError(err).WithField("nq", nq).
+					Error("While converting to edge")
+				return
+			}
+			time.Sleep(time.Microsecond)
+			edge, err = nq.ToEdge()
+		}
+		glog.Info(edge)
+	}
+}
+
 // Blocking function.
 func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) {
 	s := new(state)
@@ -189,3 +221,47 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) {
 	ticker.Stop()
 	return atomic.LoadUint64(&s.ctr.processed), nil
 }
+
+// HandleRdfReaderWhileAssign feeds reader through readLines and parseStream
+// and hands the resulting NQuads to handleNQuadsWhileAssign workers. It
+// blocks until every parseStream goroutine has reported and every worker has
+// returned, then returns the value of the processed counter.
+func HandleRdfReaderWhileAssign(reader io.Reader, mod uint64) (uint64, error) {
+	s := new(state)
+	s.ctr = new(counters)
+	ticker := time.NewTicker(time.Second)
+	go s.printCounters(ticker)
+
+	// Producer: Start buffering input to channel.
+	s.mod = mod
+	s.input = make(chan string, 10000)
+	go s.readLines(reader)
+
+	s.cnq = make(chan rdf.NQuad, 10000)
+	numr := runtime.GOMAXPROCS(-1)
+	done := make(chan error, numr)
+	for i := 0; i < numr; i++ {
+		go s.parseStream(done) // Input --> NQuads
+	}
+
+	wg := new(sync.WaitGroup)
+	for i := 0; i < 3000; i++ {
+		wg.Add(1)
+		go s.handleNQuadsWhileAssign(wg) // Different compared to HandleRdfReader
+	}
+
+	// Block until all parseStream goroutines are finished.
+	for i := 0; i < numr; i++ {
+		if err := <-done; err != nil {
+			glog.WithError(err).Fatal("While reading input.")
+		}
+	}
+
+	close(s.cnq)
+	// Okay, we've stopped input to cnq, and closed it.
+	// Now wait for handleNQuadsWhileAssign to finish.
+	wg.Wait()
+
+	ticker.Stop()
+	return atomic.LoadUint64(&s.ctr.processed), nil
+}
diff --git a/server/loader/.main.go.swp b/server/loader/.main.go.swp
new file mode 100644
index 0000000000000000000000000000000000000000..2f79c005230d613b1ec91a204fa5c9de368b0154
Binary files /dev/null and b/server/loader/.main.go.swp differ
diff --git a/server/uidassigner/main.go b/server/uidassigner/main.go
new file mode 100644
index 0000000000000000000000000000000000000000..80bbe18b72e79c92c8b104231bb8f2618be04245
--- /dev/null
+++ b/server/uidassigner/main.go
@@ -0,0 +1,90 @@
+package main
+
+import (
+	"compress/gzip"
+	"flag"
+	"os"
+	"runtime"
+	"runtime/pprof"
+	"strings"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/dgraph-io/dgraph/loader"
+	"github.com/dgraph-io/dgraph/posting"
+	"github.com/dgraph-io/dgraph/store"
+	"github.com/dgraph-io/dgraph/x"
+)
+
+var glog = x.Log("uidassigner_main")
+
+var rdfGzips = flag.String("rdfgzips", "",
+	"Comma separated gzip files containing RDF data")
+var mod = flag.Uint64("mod", 1, "Only pick entities, where uid % mod == 0.")
+var uidDir = flag.String("uidpostings", "", "Directory to store xid to uid posting lists")
+var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
+
+// main initializes a store.Store with the -uidpostings directory, passes every
+// gzip file listed in -rdfgzips to loader.HandleRdfReaderWhileAssign, and
+// finally calls posting.MergeLists.
+func main() {
+	flag.Parse()
+	if !flag.Parsed() {
+		glog.Fatal("Unable to parse flags")
+	}
+	if len(*cpuprofile) > 0 {
+		f, err := os.Create(*cpuprofile)
+		if err != nil {
+			glog.Fatal(err)
+		}
+		// StartCPUProfile can fail (e.g. profiling already enabled); do not
+		// silently run without the profile the user asked for.
+		if err := pprof.StartCPUProfile(f); err != nil {
+			glog.Fatal(err)
+		}
+		defer pprof.StopCPUProfile()
+	}
+
+	logrus.SetLevel(logrus.InfoLevel)
+	numCpus := runtime.NumCPU()
+	prevProcs := runtime.GOMAXPROCS(numCpus)
+	glog.WithField("num_cpus", numCpus).
+		WithField("prev_maxprocs", prevProcs).
+		Info("Set max procs to num cpus")
+
+	if len(*rdfGzips) == 0 {
+		glog.Fatal("No RDF GZIP files specified")
+	}
+
+	ps := new(store.Store)
+	ps.Init(*uidDir)
+	defer ps.Close()
+
+	posting.Init(ps, nil)
+
+	files := strings.Split(*rdfGzips, ",")
+	for _, path := range files {
+		if len(path) == 0 {
+			continue
+		}
+		glog.WithField("path", path).Info("Handling...")
+		f, err := os.Open(path)
+		if err != nil {
+			glog.WithError(err).Fatal("Unable to open rdf file.")
+		}
+
+		r, err := gzip.NewReader(f)
+		if err != nil {
+			glog.WithError(err).Fatal("Unable to create gzip reader.")
+		}
+
+		count, err := loader.HandleRdfReaderWhileAssign(r, *mod)
+		if err != nil {
+			glog.WithError(err).Fatal("While handling rdf reader.")
+		}
+		glog.WithField("count", count).Info("RDFs parsed")
+		r.Close()
+		f.Close()
+	}
+	glog.Info("Calling merge lists")
+	posting.MergeLists(100 * numCpus) // 100 per core.
+}