Skip to content
Snippets Groups Projects
Commit bb8c4080 authored by Manish R Jain's avatar Manish R Jain
Browse files

Handle language when forming NQuad. Move the fp mod earlier in the process, to...

Handle language when forming NQuad. Move the fp mod earlier in the process, to avoid paying for ToEdge() function call.
parent 60abe29e
No related branches found
No related tags found
No related merge requests found
...@@ -145,8 +145,13 @@ func (s *state) parseStream(wg *sync.WaitGroup) { ...@@ -145,8 +145,13 @@ func (s *state) parseStream(wg *sync.WaitGroup) {
s.SetError(err) s.SetError(err)
return return
} }
s.cnq <- nq if farm.Fingerprint64([]byte(nq.Predicate))%s.numInstances ==
atomic.AddUint64(&s.ctr.parsed, 1) s.instanceIdx {
s.cnq <- nq
atomic.AddUint64(&s.ctr.parsed, 1)
} else {
atomic.AddUint64(&s.ctr.ignored, 1)
}
} }
} }
...@@ -173,15 +178,14 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { ...@@ -173,15 +178,14 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
} }
// Only handle this edge if the attribute satisfies the modulo rule // Only handle this edge if the attribute satisfies the modulo rule
if farm.Fingerprint64([]byte(edge.Attribute))%s.numInstances == if farm.Fingerprint64([]byte(edge.Attribute))%s.numInstances !=
s.instanceIdx { s.instanceIdx {
key := posting.Key(edge.Entity, edge.Attribute) glog.WithField("edge", edge).Fatal("We shouldn't be receiving this edge.")
plist := posting.GetOrCreate(key, dataStore)
plist.AddMutation(edge, posting.Set)
atomic.AddUint64(&s.ctr.processed, 1)
} else {
atomic.AddUint64(&s.ctr.ignored, 1)
} }
key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.GetOrCreate(key, dataStore)
plist.AddMutation(edge, posting.Set)
atomic.AddUint64(&s.ctr.processed, 1)
} }
} }
......
...@@ -33,7 +33,6 @@ type NQuad struct { ...@@ -33,7 +33,6 @@ type NQuad struct {
ObjectId string ObjectId string
ObjectValue interface{} ObjectValue interface{}
Label string Label string
Language string
} }
func getUid(xid string) (uint64, error) { func getUid(xid string) (uint64, error) {
...@@ -63,11 +62,7 @@ func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) { ...@@ -63,11 +62,7 @@ func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) {
} else { } else {
result.Value = nq.ObjectValue result.Value = nq.ObjectValue
} }
if len(nq.Language) > 0 { result.Attribute = nq.Predicate
result.Attribute = nq.Predicate + "." + nq.Language
} else {
result.Attribute = nq.Predicate
}
result.Source = nq.Label result.Source = nq.Label
result.Timestamp = time.Now() result.Timestamp = time.Now()
return result, nil return result, nil
...@@ -102,11 +97,7 @@ func (nq NQuad) ToEdgeUsing( ...@@ -102,11 +97,7 @@ func (nq NQuad) ToEdgeUsing(
} }
result.ValueId = uid result.ValueId = uid
} }
if len(nq.Language) > 0 { result.Attribute = nq.Predicate
result.Attribute = nq.Predicate + "." + nq.Language
} else {
result.Attribute = nq.Predicate
}
result.Source = nq.Label result.Source = nq.Label
result.Timestamp = time.Now() result.Timestamp = time.Now()
return result, nil return result, nil
...@@ -143,7 +134,7 @@ func Parse(line string) (rnq NQuad, rerr error) { ...@@ -143,7 +134,7 @@ func Parse(line string) (rnq NQuad, rerr error) {
oval = item.Val oval = item.Val
} }
if item.Typ == itemLanguage { if item.Typ == itemLanguage {
rnq.Language = item.Val rnq.Predicate += "." + item.Val
} }
if item.Typ == itemObjectType { if item.Typ == itemObjectType {
// TODO: Strictly parse common types like integers, floats etc. // TODO: Strictly parse common types like integers, floats etc.
...@@ -176,6 +167,7 @@ func Parse(line string) (rnq NQuad, rerr error) { ...@@ -176,6 +167,7 @@ func Parse(line string) (rnq NQuad, rerr error) {
if len(rnq.ObjectId) == 0 && rnq.ObjectValue == nil { if len(rnq.ObjectId) == 0 && rnq.ObjectValue == nil {
return rnq, fmt.Errorf("No Object in NQuad") return rnq, fmt.Errorf("No Object in NQuad")
} }
return rnq, nil return rnq, nil
} }
......
...@@ -66,10 +66,9 @@ var testNQuads = []struct { ...@@ -66,10 +66,9 @@ var testNQuads = []struct {
input: `_:alice <name> "Alice In Wonderland"@en-0 .`, input: `_:alice <name> "Alice In Wonderland"@en-0 .`,
nq: NQuad{ nq: NQuad{
Subject: "_:alice", Subject: "_:alice",
Predicate: "name", Predicate: "name.en-0",
ObjectId: "", ObjectId: "",
ObjectValue: "Alice In Wonderland", ObjectValue: "Alice In Wonderland",
Language: "en-0",
}, },
}, },
{ {
...@@ -85,10 +84,9 @@ var testNQuads = []struct { ...@@ -85,10 +84,9 @@ var testNQuads = []struct {
input: `<http://www.w3.org/2001/sw/RDFCore/nedges/> <http://purl.org/dc/terms/title> "N-Edges"@en-US .`, input: `<http://www.w3.org/2001/sw/RDFCore/nedges/> <http://purl.org/dc/terms/title> "N-Edges"@en-US .`,
nq: NQuad{ nq: NQuad{
Subject: "http://www.w3.org/2001/sw/RDFCore/nedges/", Subject: "http://www.w3.org/2001/sw/RDFCore/nedges/",
Predicate: "http://purl.org/dc/terms/title", Predicate: "http://purl.org/dc/terms/title.en-US",
ObjectId: "", ObjectId: "",
ObjectValue: "N-Edges", ObjectValue: "N-Edges",
Language: "en-US",
}, },
}, },
{ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment