From 93e35d179ed3c66921e17733e955df0be9e1f52a Mon Sep 17 00:00:00 2001
From: Manish R Jain <manishrjain@gmail.com>
Date: Thu, 3 Mar 2016 06:54:24 +0000
Subject: [PATCH] Store any errors encountered while running the pipeline. If
 any error is found, stop processing immediately. Switch the system to
 sync.WaitGroup completely, instead of an error channel. Log fatal on error in
 loader and uidassigner

---
 loader/loader.go | 85 +++++++++++++++++++++++++++++++-----------------
 1 file changed, 55 insertions(+), 30 deletions(-)

diff --git a/loader/loader.go b/loader/loader.go
index 3ea2088f..0bf9bd34 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -47,11 +47,13 @@ type counters struct {
 }
 
 type state struct {
+	sync.RWMutex
 	input        chan string
 	cnq          chan rdf.NQuad
 	ctr          *counters
 	instanceIdx  uint64
 	numInstances uint64
+	err          error
 }
 
 func Init(uidstore, datastore *store.Store) {
@@ -59,6 +61,20 @@ func Init(uidstore, datastore *store.Store) {
 	dataStore = datastore
 }
 
+func (s *state) Error() error {
+	s.RLock()
+	defer s.RUnlock()
+	return s.err
+}
+
+func (s *state) SetError(err error) {
+	s.Lock()
+	defer s.Unlock()
+	if s.err == nil {
+		s.err = err
+	}
+}
+
 func (s *state) printCounters(ticker *time.Ticker) {
 	var prev uint64
 	for _ = range ticker.C {
@@ -81,6 +97,7 @@ func (s *state) printCounters(ticker *time.Ticker) {
 	}
 }
 
+// readLines closes s.input when it finishes, so run it in exactly one goroutine; a second close would panic.
 func (s *state) readLines(r io.Reader) {
 	var buf []string
 	scanner := bufio.NewScanner(r)
@@ -108,8 +125,14 @@ func (s *state) readLines(r io.Reader) {
 	close(s.input)
 }
 
-func (s *state) parseStream(done chan error) {
+func (s *state) parseStream(wg *sync.WaitGroup) {
+	defer wg.Done()
+
 	for line := range s.input {
+		if s.Error() != nil {
+			return
+		}
+
 		line = strings.Trim(line, " \t")
 		if len(line) == 0 {
 			glog.Info("Empty line.")
@@ -119,18 +142,21 @@ func (s *state) parseStream(done chan error) {
 		glog.Debugf("Got line: %q", line)
 		nq, err := rdf.Parse(line)
 		if err != nil {
-			glog.WithError(err).Errorf("While parsing: %q", line)
-			done <- err
+			s.SetError(err)
 			return
 		}
 		s.cnq <- nq
 		atomic.AddUint64(&s.ctr.parsed, 1)
 	}
-	done <- nil
 }
 
 func (s *state) handleNQuads(wg *sync.WaitGroup) {
+	defer wg.Done()
+
 	for nq := range s.cnq {
+		if s.Error() != nil {
+			return
+		}
 		edge, err := nq.ToEdge()
 		for err != nil {
 			// Just put in a retry loop to tackle temporary errors.
@@ -138,6 +164,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
 				time.Sleep(time.Microsecond)
 
 			} else {
+				s.SetError(err)
 				glog.WithError(err).WithField("nq", nq).
 					Error("While converting to edge")
 				return
@@ -155,9 +182,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
 		} else {
 			atomic.AddUint64(&s.ctr.ignored, 1)
 		}
-
 	}
-	wg.Done()
 }
 
 func (s *state) assignUid(xid string) error {
@@ -187,10 +212,15 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) {
 	defer wg.Done()
 
 	for nq := range s.cnq {
+		if s.Error() != nil {
+			return
+		}
 		ignored := true
 		if farm.Fingerprint64([]byte(nq.Subject))%s.numInstances == s.instanceIdx {
 			if err := s.assignUid(nq.Subject); err != nil {
-				glog.WithError(err).Fatal("While assigning Uid to subject.")
+				s.SetError(err)
+				glog.WithError(err).Error("While assigning Uid to subject.")
+				return
 			}
 			ignored = false
 		}
@@ -198,7 +228,9 @@ func (s *state) assignUidsOnly(wg *sync.WaitGroup) {
 		if len(nq.ObjectId) > 0 &&
 			farm.Fingerprint64([]byte(nq.ObjectId))%s.numInstances == s.instanceIdx {
 			if err := s.assignUid(nq.ObjectId); err != nil {
-				glog.WithError(err).Fatal("While assigning Uid to object.")
+				s.SetError(err)
+				glog.WithError(err).Error("While assigning Uid to object.")
+				return
 			}
 			ignored = false
 		}
@@ -228,31 +260,28 @@ func LoadEdges(reader io.Reader, instanceIdx uint64,
 
 	s.cnq = make(chan rdf.NQuad, 10000)
 	numr := runtime.GOMAXPROCS(-1)
-	done := make(chan error, numr)
+	var pwg sync.WaitGroup
+	pwg.Add(numr)
 	for i := 0; i < numr; i++ {
-		go s.parseStream(done) // Input --> NQuads
+		go s.parseStream(&pwg) // Input --> NQuads
 	}
 
-	wg := new(sync.WaitGroup)
-	for i := 0; i < 3000; i++ {
-		wg.Add(1)
-		go s.handleNQuads(wg) // NQuads --> Posting list [slow].
+	nrt := 3000
+	var wg sync.WaitGroup
+	wg.Add(nrt)
+	for i := 0; i < nrt; i++ {
+		go s.handleNQuads(&wg) // NQuads --> Posting list [slow].
 	}
 
 	// Block until all parseStream goroutines are finished.
-	for i := 0; i < numr; i++ {
-		if err := <-done; err != nil {
-			glog.WithError(err).Fatal("While reading input.")
-		}
-	}
-
+	pwg.Wait()
 	close(s.cnq)
 	// Okay, we've stopped input to cnq, and closed it.
 	// Now wait for handleNQuads to finish.
 	wg.Wait()
 
 	ticker.Stop()
-	return atomic.LoadUint64(&s.ctr.processed), nil
+	return atomic.LoadUint64(&s.ctr.processed), s.Error()
 }
 
 // AssignUids would pick up all the external ids in RDFs read,
@@ -273,9 +302,10 @@ func AssignUids(reader io.Reader, instanceIdx uint64,
 
 	s.cnq = make(chan rdf.NQuad, 10000)
 	numr := runtime.GOMAXPROCS(-1)
-	done := make(chan error, numr)
+	var pwg sync.WaitGroup
+	pwg.Add(numr)
 	for i := 0; i < numr; i++ {
-		go s.parseStream(done) // Input --> NQuads
+		go s.parseStream(&pwg) // Input --> NQuads
 	}
 
 	wg := new(sync.WaitGroup)
@@ -285,17 +315,12 @@ func AssignUids(reader io.Reader, instanceIdx uint64,
 	}
 
 	// Block until all parseStream goroutines are finished.
-	for i := 0; i < numr; i++ {
-		if err := <-done; err != nil {
-			glog.WithError(err).Fatal("While reading input.")
-		}
-	}
-
+	pwg.Wait()
 	close(s.cnq)
 	// Okay, we've stopped input to cnq, and closed it.
 	// Now wait for handleNQuads to finish.
 	wg.Wait()
 
 	ticker.Stop()
-	return atomic.LoadUint64(&s.ctr.processed), nil
+	return atomic.LoadUint64(&s.ctr.processed), s.Error()
 }
-- 
GitLab