diff --git a/loader/loader.go b/loader/loader.go index 3ea2088f19fade5f3e7064e77ae576c5330739b9..0bf9bd3477e82f8072304813fa41a4403b2833a0 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -47,11 +47,13 @@ type counters struct { } type state struct { + sync.RWMutex input chan string cnq chan rdf.NQuad ctr *counters instanceIdx uint64 numInstances uint64 + err error } func Init(uidstore, datastore *store.Store) { @@ -59,6 +61,20 @@ func Init(uidstore, datastore *store.Store) { dataStore = datastore } +func (s *state) Error() error { + s.RLock() + defer s.RUnlock() + return s.err +} + +func (s *state) SetError(err error) { + s.Lock() + defer s.Unlock() + if s.err == nil { + s.err = err + } +} + func (s *state) printCounters(ticker *time.Ticker) { var prev uint64 for _ = range ticker.C { @@ -81,6 +97,7 @@ func (s *state) printCounters(ticker *time.Ticker) { } } +// Only run this in a single goroutine. This function closes s.input channel. func (s *state) readLines(r io.Reader) { var buf []string scanner := bufio.NewScanner(r) @@ -108,8 +125,14 @@ func (s *state) readLines(r io.Reader) { close(s.input) } -func (s *state) parseStream(done chan error) { +func (s *state) parseStream(wg *sync.WaitGroup) { + defer wg.Done() + for line := range s.input { + if s.Error() != nil { + return + } + line = strings.Trim(line, " \t") if len(line) == 0 { glog.Info("Empty line.") @@ -119,18 +142,21 @@ func (s *state) parseStream(done chan error) { glog.Debugf("Got line: %q", line) nq, err := rdf.Parse(line) if err != nil { - glog.WithError(err).Errorf("While parsing: %q", line) - done <- err + s.SetError(err) return } s.cnq <- nq atomic.AddUint64(&s.ctr.parsed, 1) } - done <- nil } func (s *state) handleNQuads(wg *sync.WaitGroup) { + defer wg.Done() + for nq := range s.cnq { + if s.Error() != nil { + return + } edge, err := nq.ToEdge() for err != nil { // Just put in a retry loop to tackle temporary errors. @@ -138,6 +164,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { time.Sleep(time.Microsecond) } else { + s.SetError(err) glog.WithError(err).WithField("nq", nq). Error("While converting to edge") return @@ -155,9 +182,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { } else { atomic.AddUint64(&s.ctr.ignored, 1) } - } - wg.Done() } func (s *state) assignUid(xid string) error { @@ -187,10 +212,15 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) { defer wg.Done() for nq := range s.cnq { + if s.Error() != nil { + return + } ignored := true if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances == s.instanceIdx { if err := s.assignUid(nq.Subject); err != nil { - glog.WithError(err).Fatal("While assigning Uid to subject.") + s.SetError(err) + glog.WithError(err).Error("While assigning Uid to subject.") + return } ignored = false } @@ -198,7 +228,9 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) { if len(nq.ObjectId) > 0 && farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances == s.instanceIdx { if err := s.assignUid(nq.ObjectId); err != nil { - glog.WithError(err).Fatal("While assigning Uid to object.") + s.SetError(err) + glog.WithError(err).Error("While assigning Uid to object.") + return } ignored = false } @@ -228,31 +260,28 @@ func LoadEdges(reader io.Reader, instanceIdx uint64, s.cnq = make(chan rdf.NQuad, 10000) numr := runtime.GOMAXPROCS(-1) - done := make(chan error, numr) + var pwg sync.WaitGroup + pwg.Add(numr) for i := 0; i < numr; i++ { - go s.parseStream(done) // Input --> NQuads + go s.parseStream(&pwg) // Input --> NQuads } - wg := new(sync.WaitGroup) - for i := 0; i < 3000; i++ { - wg.Add(1) - go s.handleNQuads(wg) // NQuads --> Posting list [slow]. + nrt := 3000 + var wg sync.WaitGroup + wg.Add(nrt) + for i := 0; i < nrt; i++ { + go s.handleNQuads(&wg) // NQuads --> Posting list [slow]. } // Block until all parseStream goroutines are finished. - for i := 0; i < numr; i++ { - if err := <-done; err != nil { - glog.WithError(err).Fatal("While reading input.") - } - } - + pwg.Wait() close(s.cnq) // Okay, we've stopped input to cnq, and closed it. // Now wait for handleNQuads to finish. wg.Wait() ticker.Stop() - return atomic.LoadUint64(&s.ctr.processed), nil + return atomic.LoadUint64(&s.ctr.processed), s.Error() } // AssignUids would pick up all the external ids in RDFs read, @@ -273,9 +302,10 @@ func AssignUids(reader io.Reader, instanceIdx uint64, s.cnq = make(chan rdf.NQuad, 10000) numr := runtime.GOMAXPROCS(-1) - done := make(chan error, numr) + var pwg sync.WaitGroup + pwg.Add(numr) for i := 0; i < numr; i++ { - go s.parseStream(done) // Input --> NQuads + go s.parseStream(&pwg) // Input --> NQuads } wg := new(sync.WaitGroup) @@ -285,17 +315,12 @@ func AssignUids(reader io.Reader, instanceIdx uint64, } // Block until all parseStream goroutines are finished. - for i := 0; i < numr; i++ { - if err := <-done; err != nil { - glog.WithError(err).Fatal("While reading input.") - } - } - + pwg.Wait() close(s.cnq) // Okay, we've stopped input to cnq, and closed it. // Now wait for handleNQuads to finish. wg.Wait() ticker.Stop() - return atomic.LoadUint64(&s.ctr.processed), nil + return atomic.LoadUint64(&s.ctr.processed), s.Error() }