diff --git a/loader/loader.go b/loader/loader.go index bbf3309be25073d765f64919d0b7023558b778fd..fe3869739cdf5f2efd98f0b50455cb96adfafd35 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -152,25 +152,42 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { func (s *state) handleNQuadsWhileAssign(wg *sync.WaitGroup) { for nq := range s.cnq { if farm.Fingerprint64([]byte(nq.Subject))%s.mod != 0 { - // Ignore due to mod sampling. + // This instance shouldnt assign UID to this string atomic.AddUint64(&s.ctr.ignored, 1) continue - } - - edge, err := nq.ToEdge() - for err != nil { - // Just put in a retry loop to tackle temporary errors. - if err == posting.E_TMP_ERROR { - time.Sleep(time.Microsecond) - - } else { - glog.WithError(err).WithField("nq", nq). - Error("While converting to edge") - return + } else { + _, err := rdf.GetUid(nq.Subject) + for err != nil { + // Just put in a retry loop to tackle temporary errors. + if err == posting.E_TMP_ERROR { + time.Sleep(time.Microsecond) + } else { + glog.WithError(err).WithField("nq.Subject", nq.Subject). + Error("While getting UID") + return + } + _, err = rdf.GetUid(nq.Subject) } - edge, err = nq.ToEdge() } - glog.Info(edge); + + if len(nq.ObjectId) == 0 || farm.Fingerprint64([]byte(nq.ObjectId))%s.mod != 0 { + // This instance shouldnt or cant assign UID to this string + atomic.AddUint64(&s.ctr.ignored, 1) + continue + } else { + _, err := rdf.GetUid(nq.ObjectId) + for err != nil { + // Just put in a retry loop to tackle temporary errors. + if err == posting.E_TMP_ERROR { + time.Sleep(time.Microsecond) + } else { + glog.WithError(err).WithField("nq.ObjectId", nq.ObjectId). + Error("While getting UID") + return + } + _, err = rdf.GetUid(nq.ObjectId) + } + } } wg.Done() } diff --git a/rdf/parse.go b/rdf/parse.go index af27e4aa555440582a99d7805fef4dcaafce66b6..809ef63ef17709a1c2d224fb1cb6ab685b4d572a 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -36,7 +36,7 @@ type NQuad struct { Language string } -func getUid(s string) (uint64, error) { +func GetUid(s string) (uint64, error) { if strings.HasPrefix(s, "_uid_:") { return strconv.ParseUint(s[6:], 0, 64) } @@ -44,13 +44,13 @@ func getUid(s string) (uint64, error) { } func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) { - sid, err := getUid(nq.Subject) + sid, err := GetUid(nq.Subject) if err != nil { return result, err } result.Entity = sid if len(nq.ObjectId) > 0 { - oid, err := getUid(nq.ObjectId) + oid, err := GetUid(nq.ObjectId) if err != nil { return result, err }