From 49c5904434d54892595eb08ecff692dbe37ba8af Mon Sep 17 00:00:00 2001 From: Ashwin <ashwin2007ray@gmail.com> Date: Mon, 1 Feb 2016 05:30:25 +0000 Subject: [PATCH] Make getUid global --- loader/loader.go | 47 ++++++++++++++++++++++++++++++++--------------- rdf/parse.go | 6 +++--- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/loader/loader.go b/loader/loader.go index bbf3309b..fe386973 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -152,25 +152,42 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { for nq := range s.cnq { if farm.Fingerprint64([]byte(nq.Subject))%s.mod != 0 { - // Ignore due to mod sampling. + // This instance shouldnt assign UID to this string atomic.AddUint64(&s.ctr.ignored, 1) continue - } - - edge, err := nq.ToEdge() - for err != nil { - // Just put in a retry loop to tackle temporary errors. - if err == posting.E_TMP_ERROR { - time.Sleep(time.Microsecond) - - } else { - glog.WithError(err).WithField("nq", nq). - Error("While converting to edge") - return + } else { + _, err := rdf.GetUid(nq.Subject) + for err != nil { + // Just put in a retry loop to tackle temporary errors. + if err == posting.E_TMP_ERROR { + time.Sleep(time.Microsecond) + } else { + glog.WithError(err).WithField("nq.Subject", nq.Subject). + Error("While getting UID") + return + } + _, err = rdf.GetUid(nq.Subject) } - edge, err = nq.ToEdge() } - glog.Info(edge); + + if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.mod != 0 { + // This instance shouldnt or cant assign UID to this string + atomic.AddUint64(&s.ctr.ignored, 1) + continue + } else { + _, err := rdf.GetUid(nq.ObjectId) + for err != nil { + // Just put in a retry loop to tackle temporary errors. + if err == posting.E_TMP_ERROR { + time.Sleep(time.Microsecond) + } else { + glog.WithError(err).WithField("nq.ObjectId", nq.ObjectId). + Error("While getting UID") + return + } + _, err = rdf.GetUid(nq.ObjectId) + } + } } wg.Done() } diff --git a/rdf/parse.go b/rdf/parse.go index af27e4aa..809ef63e 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -36,7 +36,7 @@ type NQuad struct { Language string } -func getUid(s string) (uint64, error) { +func GetUid(s string) (uint64, error) { if strings.HasPrefix(s, "_uid_:") { return strconv.ParseUint(s[6:], 0, 64) } @@ -44,13 +44,13 @@ func getUid(s string) (uint64, error) { } func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) { - sid, err := getUid(nq.Subject) + sid, err := GetUid(nq.Subject) if err != nil { return result, err } result.Entity = sid if len(nq.ObjectId) > 0 { - oid, err := getUid(nq.ObjectId) + oid, err := GetUid(nq.ObjectId) if err != nil { return result, err } -- GitLab