From bb8c40801c88be45cc472be3b3a8dd501917c7da Mon Sep 17 00:00:00 2001 From: Manish R Jain <manishrjain@gmail.com> Date: Wed, 9 Mar 2016 12:44:31 +1100 Subject: [PATCH] Handle language when forming NQuad. Move the fp mod earlier in the process, to avoid paying for ToEdge() function call. --- loader/loader.go | 22 +++++++++++++--------- rdf/parse.go | 16 ++++------------ rdf/parse_test.go | 6 ++---- 3 files changed, 19 insertions(+), 25 deletions(-) diff --git a/loader/loader.go b/loader/loader.go index 0bf9bd34..662f9f75 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -145,8 +145,13 @@ func (s *state) parseStream(wg *sync.WaitGroup) { s.SetError(err) return } - s.cnq <- nq - atomic.AddUint64(&s.ctr.parsed, 1) + if farm.Fingerprint64([]byte(nq.Predicate))%s.numInstances == + s.instanceIdx { + s.cnq <- nq + atomic.AddUint64(&s.ctr.parsed, 1) + } else { + atomic.AddUint64(&s.ctr.ignored, 1) + } } } @@ -173,15 +178,14 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { } // Only handle this edge if the attribute satisfies the modulo rule - if farm.Fingerprint64([]byte(edge.Attribute))%s.numInstances == + if farm.Fingerprint64([]byte(edge.Attribute))%s.numInstances != s.instanceIdx { - key := posting.Key(edge.Entity, edge.Attribute) - plist := posting.GetOrCreate(key, dataStore) - plist.AddMutation(edge, posting.Set) - atomic.AddUint64(&s.ctr.processed, 1) - } else { - atomic.AddUint64(&s.ctr.ignored, 1) + glog.WithField("edge", edge).Fatal("We shouldn't be receiving this edge.") } + key := posting.Key(edge.Entity, edge.Attribute) + plist := posting.GetOrCreate(key, dataStore) + plist.AddMutation(edge, posting.Set) + atomic.AddUint64(&s.ctr.processed, 1) } } diff --git a/rdf/parse.go b/rdf/parse.go index 3afe48bc..485cd8f7 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -33,7 +33,6 @@ type NQuad struct { ObjectId string ObjectValue interface{} Label string - Language string } func getUid(xid string) (uint64, error) { @@ -63,11 +62,7 @@ func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) { } else { result.Value = nq.ObjectValue } - if len(nq.Language) > 0 { - result.Attribute = nq.Predicate + "." + nq.Language - } else { - result.Attribute = nq.Predicate - } + result.Attribute = nq.Predicate result.Source = nq.Label result.Timestamp = time.Now() return result, nil @@ -102,11 +97,7 @@ func (nq NQuad) ToEdgeUsing( } result.ValueId = uid } - if len(nq.Language) > 0 { - result.Attribute = nq.Predicate + "." + nq.Language - } else { - result.Attribute = nq.Predicate - } + result.Attribute = nq.Predicate result.Source = nq.Label result.Timestamp = time.Now() return result, nil @@ -143,7 +134,7 @@ func Parse(line string) (rnq NQuad, rerr error) { oval = item.Val } if item.Typ == itemLanguage { - rnq.Language = item.Val + rnq.Predicate += "." + item.Val } if item.Typ == itemObjectType { // TODO: Strictly parse common types like integers, floats etc. @@ -176,6 +167,7 @@ func Parse(line string) (rnq NQuad, rerr error) { if len(rnq.ObjectId) == 0 && rnq.ObjectValue == nil { return rnq, fmt.Errorf("No Object in NQuad") } + return rnq, nil } diff --git a/rdf/parse_test.go b/rdf/parse_test.go index 6c4e6192..21a8f9f5 100644 --- a/rdf/parse_test.go +++ b/rdf/parse_test.go @@ -66,10 +66,9 @@ var testNQuads = []struct { input: `_:alice <name> "Alice In Wonderland"@en-0 .`, nq: NQuad{ Subject: "_:alice", - Predicate: "name", + Predicate: "name.en-0", ObjectId: "", ObjectValue: "Alice In Wonderland", - Language: "en-0", }, }, { @@ -85,10 +84,9 @@ var testNQuads = []struct { input: `<http://www.w3.org/2001/sw/RDFCore/nedges/> <http://purl.org/dc/terms/title> "N-Edges"@en-US .`, nq: NQuad{ Subject: "http://www.w3.org/2001/sw/RDFCore/nedges/", - Predicate: "http://purl.org/dc/terms/title", + Predicate: "http://purl.org/dc/terms/title.en-US", ObjectId: "", ObjectValue: "N-Edges", - Language: "en-US", }, }, { -- GitLab