Skip to content
Snippets Groups Projects
Commit d69b6847 authored by Manish R Jain's avatar Manish R Jain
Browse files

Merge pull request #60 from dgraph-io/dmuts

Performance improvements and bug fixes
parents 22299861 e52c71a2
Branches
No related tags found
No related merge requests found
......@@ -42,11 +42,7 @@ func (c *ClientCodec) ReadResponseHeader(r *rpc.Response) error {
func (c *ClientCodec) ReadResponseBody(body interface{}) error {
buf := make([]byte, c.payloadLen)
n, err := c.Rwc.Read(buf)
if n != int(c.payloadLen) {
return fmt.Errorf("ClientCodec expected: %d. Got: %d\n", c.payloadLen, n)
}
_, err := io.ReadFull(c.Rwc, buf)
reply := body.(*Reply)
reply.Data = buf
return err
......
package conn
import (
"errors"
"io"
"log"
"net/rpc"
......@@ -18,12 +17,9 @@ func (c *ServerCodec) ReadRequestHeader(r *rpc.Request) error {
func (c *ServerCodec) ReadRequestBody(data interface{}) error {
b := make([]byte, c.payloadLen)
n, err := c.Rwc.Read(b)
_, err := io.ReadFull(c.Rwc, b)
if err != nil {
log.Fatal("server", err)
}
if n != int(c.payloadLen) {
return errors.New("ServerCodec unable to read request.")
return err
}
if data == nil {
......
......@@ -157,6 +157,12 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
if s.Error() != nil {
return
}
// Only handle this edge if the attribute satisfies the modulo rule
if farm.Fingerprint64([]byte(nq.Predicate))%s.numInstances != s.instanceIdx {
atomic.AddUint64(&s.ctr.ignored, 1)
continue
}
edge, err := nq.ToEdge()
for err != nil {
// Just put in a retry loop to tackle temporary errors.
......@@ -172,16 +178,10 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
edge, err = nq.ToEdge()
}
// Only handle this edge if the attribute satisfies the modulo rule
if farm.Fingerprint64([]byte(edge.Attribute))%s.numInstances ==
s.instanceIdx {
key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.GetOrCreate(key, dataStore)
plist.AddMutation(edge, posting.Set)
atomic.AddUint64(&s.ctr.processed, 1)
} else {
atomic.AddUint64(&s.ctr.ignored, 1)
}
key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.GetOrCreate(key, dataStore)
plist.AddMutation(edge, posting.Set)
atomic.AddUint64(&s.ctr.processed, 1)
}
}
......
......@@ -128,8 +128,8 @@ func gentlyMerge(mr *mergeRoutines) {
ctr := NewCounters()
defer ctr.ticker.Stop()
// Pick 5% of the dirty map or 400 keys, whichever is higher.
pick := int(float64(dirtymap.Size()) * 0.05)
// Pick 7% of the dirty map or 400 keys, whichever is higher.
pick := int(float64(dirtymap.Size()) * 0.07)
if pick < 400 {
pick = 400
}
......
......@@ -33,7 +33,6 @@ type NQuad struct {
ObjectId string
ObjectValue interface{}
Label string
Language string
}
func getUid(xid string) (uint64, error) {
......@@ -63,11 +62,7 @@ func (nq NQuad) ToEdge() (result x.DirectedEdge, rerr error) {
} else {
result.Value = nq.ObjectValue
}
if len(nq.Language) > 0 {
result.Attribute = nq.Predicate + "." + nq.Language
} else {
result.Attribute = nq.Predicate
}
result.Attribute = nq.Predicate
result.Source = nq.Label
result.Timestamp = time.Now()
return result, nil
......@@ -102,11 +97,7 @@ func (nq NQuad) ToEdgeUsing(
}
result.ValueId = uid
}
if len(nq.Language) > 0 {
result.Attribute = nq.Predicate + "." + nq.Language
} else {
result.Attribute = nq.Predicate
}
result.Attribute = nq.Predicate
result.Source = nq.Label
result.Timestamp = time.Now()
return result, nil
......@@ -143,7 +134,7 @@ func Parse(line string) (rnq NQuad, rerr error) {
oval = item.Val
}
if item.Typ == itemLanguage {
rnq.Language = item.Val
rnq.Predicate += "." + item.Val
}
if item.Typ == itemObjectType {
// TODO: Strictly parse common types like integers, floats etc.
......@@ -176,6 +167,7 @@ func Parse(line string) (rnq NQuad, rerr error) {
if len(rnq.ObjectId) == 0 && rnq.ObjectValue == nil {
return rnq, fmt.Errorf("No Object in NQuad")
}
return rnq, nil
}
......
......@@ -66,10 +66,9 @@ var testNQuads = []struct {
input: `_:alice <name> "Alice In Wonderland"@en-0 .`,
nq: NQuad{
Subject: "_:alice",
Predicate: "name",
Predicate: "name.en-0",
ObjectId: "",
ObjectValue: "Alice In Wonderland",
Language: "en-0",
},
},
{
......@@ -85,10 +84,9 @@ var testNQuads = []struct {
input: `<http://www.w3.org/2001/sw/RDFCore/nedges/> <http://purl.org/dc/terms/title> "N-Edges"@en-US .`,
nq: NQuad{
Subject: "http://www.w3.org/2001/sw/RDFCore/nedges/",
Predicate: "http://purl.org/dc/terms/title",
Predicate: "http://purl.org/dc/terms/title.en-US",
ObjectId: "",
ObjectValue: "N-Edges",
Language: "en-US",
},
},
{
......
go build . && ./server --instanceIdx 0 --mutations ~/dgraph/m0 --port "8080" --postings ~/dgraph/p0 --workers ":12345,:12346,:12347" --uids ~/dgraph/u0 --workerport ":12345" &
go build . && ./server --instanceIdx 1 --mutations ~/dgraph/m1 --port "8081" --postings ~/dgraph/p1 --workers ":12345,:12346,:12347" --uids ~/dgraph/u1 --workerport ":12346" &
go build . && ./server --instanceIdx 2 --mutations ~/dgraph/m2 --port "8082" --postings ~/dgraph/p2 --workers ":12345,:12346,:12347" --uids ~/dgraph/u2 --workerport ":12347" &
go build . && ./server --instanceIdx 0 --mutations ~/dgraph/m0 --port "8080" --postings ~/dgraph/p0 --workers ":12345,:12346,:12347" --uids ~/dgraph/uasync.final --workerport ":12345" &
go build . && ./server --instanceIdx 1 --mutations ~/dgraph/m1 --port "8081" --postings ~/dgraph/p1 --workers ":12345,:12346,:12347" --workerport ":12346" &
go build . && ./server --instanceIdx 2 --mutations ~/dgraph/m2 --port "8082" --postings ~/dgraph/p2 --workers ":12345,:12346,:12347" --workerport ":12347" &
......@@ -40,7 +40,7 @@ func (s *Store) Init(filepath string) {
s.ropt = rocksdb.NewReadOptions()
s.wopt = rocksdb.NewWriteOptions()
s.wopt.SetSync(true)
s.wopt.SetSync(false) // We don't need to do synchronous writes.
var err error
s.db, err = rocksdb.Open(filepath, s.opt)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment