Skip to content
Snippets Groups Projects
Commit 93e35d17 authored by Manish R Jain's avatar Manish R Jain
Browse files

Store any errors encountered while running the pipeline. If any error is...

Store any errors encountered while running the pipeline. If any error is found, stop processing immediately. Switch the system to sync.WaitGroup completely, instead of an error channel. Log fatal on error in loader and uidassigner
parent 4e6355ff
No related branches found
No related tags found
No related merge requests found
......@@ -47,11 +47,13 @@ type counters struct {
}
type state struct {
sync.RWMutex
input chan string
cnq chan rdf.NQuad
ctr *counters
instanceIdx uint64
numInstances uint64
err error
}
func Init(uidstore, datastore *store.Store) {
......@@ -59,6 +61,20 @@ func Init(uidstore, datastore *store.Store) {
dataStore = datastore
}
func (s *state) Error() error {
s.RLock()
defer s.RUnlock()
return s.err
}
func (s *state) SetError(err error) {
s.Lock()
defer s.Unlock()
if s.err == nil {
s.err = err
}
}
func (s *state) printCounters(ticker *time.Ticker) {
var prev uint64
for _ = range ticker.C {
......@@ -81,6 +97,7 @@ func (s *state) printCounters(ticker *time.Ticker) {
}
}
// Only run this in a single goroutine. This function closes s.input channel.
func (s *state) readLines(r io.Reader) {
var buf []string
scanner := bufio.NewScanner(r)
......@@ -108,8 +125,14 @@ func (s *state) readLines(r io.Reader) {
close(s.input)
}
func (s *state) parseStream(done chan error) {
func (s *state) parseStream(wg *sync.WaitGroup) {
defer wg.Done()
for line := range s.input {
if s.Error() != nil {
return
}
line = strings.Trim(line, " \t")
if len(line) == 0 {
glog.Info("Empty line.")
......@@ -119,18 +142,21 @@ func (s *state) parseStream(done chan error) {
glog.Debugf("Got line: %q", line)
nq, err := rdf.Parse(line)
if err != nil {
glog.WithError(err).Errorf("While parsing: %q", line)
done <- err
s.SetError(err)
return
}
s.cnq <- nq
atomic.AddUint64(&s.ctr.parsed, 1)
}
done <- nil
}
func (s *state) handleNQuads(wg *sync.WaitGroup) {
defer wg.Done()
for nq := range s.cnq {
if s.Error() != nil {
return
}
edge, err := nq.ToEdge()
for err != nil {
// Just put in a retry loop to tackle temporary errors.
......@@ -138,6 +164,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
time.Sleep(time.Microsecond)
} else {
s.SetError(err)
glog.WithError(err).WithField("nq", nq).
Error("While converting to edge")
return
......@@ -155,9 +182,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
} else {
atomic.AddUint64(&s.ctr.ignored, 1)
}
}
wg.Done()
}
func (s *state) assignUid(xid string) error {
......@@ -187,10 +212,15 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) {
defer wg.Done()
for nq := range s.cnq {
if s.Error() != nil {
return
}
ignored := true
if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances == s.instanceIdx {
if err := s.assignUid(nq.Subject); err != nil {
glog.WithError(err).Fatal("While assigning Uid to subject.")
s.SetError(err)
glog.WithError(err).Error("While assigning Uid to subject.")
return
}
ignored = false
}
......@@ -198,7 +228,9 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) {
if len(nq.ObjectId) > 0 &&
farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances == s.instanceIdx {
if err := s.assignUid(nq.ObjectId); err != nil {
glog.WithError(err).Fatal("While assigning Uid to object.")
s.SetError(err)
glog.WithError(err).Error("While assigning Uid to object.")
return
}
ignored = false
}
......@@ -228,31 +260,28 @@ func LoadEdges(reader io.Reader, instanceIdx uint64,
s.cnq = make(chan rdf.NQuad, 10000)
numr := runtime.GOMAXPROCS(-1)
done := make(chan error, numr)
var pwg sync.WaitGroup
pwg.Add(numr)
for i := 0; i < numr; i++ {
go s.parseStream(done) // Input --> NQuads
go s.parseStream(&pwg) // Input --> NQuads
}
wg := new(sync.WaitGroup)
for i := 0; i < 3000; i++ {
wg.Add(1)
go s.handleNQuads(wg) // NQuads --> Posting list [slow].
nrt := 3000
var wg sync.WaitGroup
wg.Add(nrt)
for i := 0; i < nrt; i++ {
go s.handleNQuads(&wg) // NQuads --> Posting list [slow].
}
// Block until all parseStream goroutines are finished.
for i := 0; i < numr; i++ {
if err := <-done; err != nil {
glog.WithError(err).Fatal("While reading input.")
}
}
pwg.Wait()
close(s.cnq)
// Okay, we've stopped input to cnq, and closed it.
// Now wait for handleNQuads to finish.
wg.Wait()
ticker.Stop()
return atomic.LoadUint64(&s.ctr.processed), nil
return atomic.LoadUint64(&s.ctr.processed), s.Error()
}
// AssignUids would pick up all the external ids in RDFs read,
......@@ -273,9 +302,10 @@ func AssignUids(reader io.Reader, instanceIdx uint64,
s.cnq = make(chan rdf.NQuad, 10000)
numr := runtime.GOMAXPROCS(-1)
done := make(chan error, numr)
var pwg sync.WaitGroup
pwg.Add(numr)
for i := 0; i < numr; i++ {
go s.parseStream(done) // Input --> NQuads
go s.parseStream(&pwg) // Input --> NQuads
}
wg := new(sync.WaitGroup)
......@@ -285,17 +315,12 @@ func AssignUids(reader io.Reader, instanceIdx uint64,
}
// Block until all parseStream goroutines are finished.
for i := 0; i < numr; i++ {
if err := <-done; err != nil {
glog.WithError(err).Fatal("While reading input.")
}
}
pwg.Wait()
close(s.cnq)
// Okay, we've stopped input to cnq, and closed it.
// Now wait for handleNQuads to finish.
wg.Wait()
ticker.Stop()
return atomic.LoadUint64(&s.ctr.processed), nil
return atomic.LoadUint64(&s.ctr.processed), s.Error()
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment