diff --git a/conn/client.go b/conn/client.go index 2b668dc05c65cd02f80f01529581d81369589696..27bdd904426bde046b767db6e653d7b29c957430 100644 --- a/conn/client.go +++ b/conn/client.go @@ -42,11 +42,7 @@ func (c *ClientCodec) ReadResponseHeader(r *rpc.Response) error { func (c *ClientCodec) ReadResponseBody(body interface{}) error { buf := make([]byte, c.payloadLen) - n, err := c.Rwc.Read(buf) - if n != int(c.payloadLen) { - return fmt.Errorf("ClientCodec expected: %d. Got: %d\n", c.payloadLen, n) - } - + _, err := io.ReadFull(c.Rwc, buf) reply := body.(*Reply) reply.Data = buf return err diff --git a/conn/server.go b/conn/server.go index e3e7c4dba08316abf6c9a933b21c8751fd5a9147..7085d35305d27dac7717f0c03a1aca6e40e4757a 100644 --- a/conn/server.go +++ b/conn/server.go @@ -1,7 +1,6 @@ package conn import ( - "errors" "io" "log" "net/rpc" @@ -18,12 +17,9 @@ func (c *ServerCodec) ReadRequestHeader(r *rpc.Request) error { func (c *ServerCodec) ReadRequestBody(data interface{}) error { b := make([]byte, c.payloadLen) - n, err := c.Rwc.Read(b) + _, err := io.ReadFull(c.Rwc, b) if err != nil { - log.Fatal("server", err) - } - if n != int(c.payloadLen) { - return errors.New("ServerCodec unable to read request.") + return err } if data == nil { diff --git a/loader/loader.go b/loader/loader.go index 0bf9bd3477e82f8072304813fa41a4403b2833a0..dc37c90a6675c6e774be4604baa2fa59ed170240 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -157,6 +157,12 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { if s.Error() != nil { return } + // Only handle this edge if the attribute satisfies the modulo rule + if farm.Fingerprint64([]byte(nq.Predicate))%s.numInstances != s.instanceIdx { + atomic.AddUint64(&s.ctr.ignored, 1) + continue + } + edge, err := nq.ToEdge() for err != nil { // Just put in a retry loop to tackle temporary errors. @@ -172,16 +178,10 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { edge, err = nq.ToEdge() } - // Only handle this edge if the attribute satisfies the modulo rule - if farm.Fingerprint64([]byte(edge.Attribute))%s.numInstances == - s.instanceIdx { - key := posting.Key(edge.Entity, edge.Attribute) - plist := posting.GetOrCreate(key, dataStore) - plist.AddMutation(edge, posting.Set) - atomic.AddUint64(&s.ctr.processed, 1) - } else { - atomic.AddUint64(&s.ctr.ignored, 1) - } + key := posting.Key(edge.Entity, edge.Attribute) + plist := posting.GetOrCreate(key, dataStore) + plist.AddMutation(edge, posting.Set) + atomic.AddUint64(&s.ctr.processed, 1) } } diff --git a/posting/lists.go b/posting/lists.go index f346141de23fcf45fa0671cec28b6ce09fcc003f..f4522c88d751a467d76f463889fd93ee9c821806 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -128,8 +128,8 @@ func gentlyMerge(mr *mergeRoutines) { ctr := NewCounters() defer ctr.ticker.Stop() - // Pick 5% of the dirty map or 400 keys, whichever is higher. - pick := int(float64(dirtymap.Size()) * 0.05) + // Pick 7% of the dirty map or 400 keys, whichever is higher. + pick := int(float64(dirtymap.Size()) * 0.07) if pick < 400 { pick = 400 } diff --git a/rdf/parse.go b/rdf/parse.go index 3afe48bc1c30d5fd78eadeb0dc4ff3dae0f6d667..485cd8f7e3b456e1f675385b84614afc343301ce 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -33,7 +33,6 @@ type NQuad struct { ObjectId string ObjectValue interface{} Label string - Language string } func getUid(xid string) (uint64, error) { @@ -63,11 +62,7 @@ func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) { } else { result.Value = nq.ObjectValue } - if len(nq.Language) > 0 { - result.Attribute = nq.Predicate + "." + nq.Language - } else { - result.Attribute = nq.Predicate - } + result.Attribute = nq.Predicate result.Source = nq.Label result.Timestamp = time.Now() return result, nil @@ -102,11 +97,7 @@ func (nq NQuad) ToEdgeUsing( } result.ValueId = uid } - if len(nq.Language) > 0 { - result.Attribute = nq.Predicate + "." + nq.Language - } else { - result.Attribute = nq.Predicate - } + result.Attribute = nq.Predicate result.Source = nq.Label result.Timestamp = time.Now() return result, nil @@ -143,7 +134,7 @@ func Parse(line string) (rnq NQuad, rerr error) { oval = item.Val } if item.Typ == itemLanguage { - rnq.Language = item.Val + rnq.Predicate += "." + item.Val } if item.Typ == itemObjectType { // TODO: Strictly parse common types like integers, floats etc. @@ -176,6 +167,7 @@ func Parse(line string) (rnq NQuad, rerr error) { if len(rnq.ObjectId) == 0 && rnq.ObjectValue == nil { return rnq, fmt.Errorf("No Object in NQuad") } + return rnq, nil } diff --git a/rdf/parse_test.go b/rdf/parse_test.go index 6c4e619291a56539fe9eb649ef15e8cbd8ede2b2..21a8f9f5631dd96e62442e12a3eef3e0c1e40df0 100644 --- a/rdf/parse_test.go +++ b/rdf/parse_test.go @@ -66,10 +66,9 @@ var testNQuads = []struct { input: `_:alice <name> "Alice In Wonderland"@en-0 .`, nq: NQuad{ Subject: "_:alice", - Predicate: "name", + Predicate: "name.en-0", ObjectId: "", ObjectValue: "Alice In Wonderland", - Language: "en-0", }, }, { @@ -85,10 +84,9 @@ var testNQuads = []struct { input: `<http://www.w3.org/2001/sw/RDFCore/nedges/> <http://purl.org/dc/terms/title> "N-Edges"@en-US .`, nq: NQuad{ Subject: "http://www.w3.org/2001/sw/RDFCore/nedges/", - Predicate: "http://purl.org/dc/terms/title", + Predicate: "http://purl.org/dc/terms/title.en-US", ObjectId: "", ObjectValue: "N-Edges", - Language: "en-US", }, }, { diff --git a/server/testrun.sh b/server/testrun.sh index 6c658763091e6c82f0ec0802bf1e36c50f6bdcb5..3259b1c81e9d56095089bad58e222232d99c0c92 100644 --- a/server/testrun.sh +++ b/server/testrun.sh @@ -1,3 +1,3 @@ -go build . && ./server --instanceIdx 0 --mutations ~/dgraph/m0 --port "8080" --postings ~/dgraph/p0 --workers ":12345,:12346,:12347" --uids ~/dgraph/u0 --workerport ":12345" & -go build . && ./server --instanceIdx 1 --mutations ~/dgraph/m1 --port "8081" --postings ~/dgraph/p1 --workers ":12345,:12346,:12347" --uids ~/dgraph/u1 --workerport ":12346" & -go build . && ./server --instanceIdx 2 --mutations ~/dgraph/m2 --port "8082" --postings ~/dgraph/p2 --workers ":12345,:12346,:12347" --uids ~/dgraph/u2 --workerport ":12347" & +go build . && ./server --instanceIdx 0 --mutations ~/dgraph/m0 --port "8080" --postings ~/dgraph/p0 --workers ":12345,:12346,:12347" --uids ~/dgraph/uasync.final --workerport ":12345" & +go build . && ./server --instanceIdx 1 --mutations ~/dgraph/m1 --port "8081" --postings ~/dgraph/p1 --workers ":12345,:12346,:12347" --workerport ":12346" & +go build . && ./server --instanceIdx 2 --mutations ~/dgraph/m2 --port "8082" --postings ~/dgraph/p2 --workers ":12345,:12346,:12347" --workerport ":12347" & diff --git a/store/store.go b/store/store.go index 01f2785f76523c80b912812df4b40aa2848fbd4e..ea44df05a7af289f2373dd941b738f8b2d0debe3 100644 --- a/store/store.go +++ b/store/store.go @@ -40,7 +40,7 @@ func (s *Store) Init(filepath string) { s.ropt = rocksdb.NewReadOptions() s.wopt = rocksdb.NewWriteOptions() - s.wopt.SetSync(true) + s.wopt.SetSync(false) // We don't need to do synchronous writes. var err error s.db, err = rocksdb.Open(filepath, s.opt)