diff --git a/Dockerfile b/Dockerfile index 51c74a4a3a60444eb6994bfabc16ffa8050dd32d..78de52af01aa2b2079a134f8e6d25c00408e8c5a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # Dockerfile for DGraph -FROM golang:1.5.3 +FROM golang:1.6 MAINTAINER Manish Jain <manishrjain@gmail.com> # Get the necessary packages. @@ -14,7 +14,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # Install and set up RocksDB. RUN mkdir /installs && cd /installs && \ - git clone --branch v4.1 https://github.com/facebook/rocksdb.git + git clone --branch v4.2 https://github.com/facebook/rocksdb.git RUN cd /installs/rocksdb && make shared_lib && make install ENV LD_LIBRARY_PATH "/usr/local/lib" diff --git a/conn/pool.go b/conn/pool.go index 40e00e8bb458dd6a61068263a2cfcfabcfd7a93f..dc9289e7957b41e6ac10108f7dc88707cde04195 100644 --- a/conn/pool.go +++ b/conn/pool.go @@ -35,7 +35,8 @@ func (p *Pool) dialNew() (*rpc.Client, error) { } var nconn net.Conn var err error - for i := 0; i < 10; i++ { + // This loop will retry for 10 minutes before giving up. + for i := 0; i < 60; i++ { nconn, err = d.Dial("tcp", p.Addr) if err == nil { break diff --git a/posting/list.go b/posting/list.go index 73aa2d2f250fdeea0267d16176e6bbf7266abc82..2038c34755cab0b4dd98b89990f912365b47cb22 100644 --- a/posting/list.go +++ b/posting/list.go @@ -247,7 +247,6 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { if clog == nil { return } - glog.Debug("Starting stream entries...") err := clog.StreamEntries(posting.CommitTs()+1, l.hash, func(hdr commit.Header, buffer []byte) { @@ -268,7 +267,6 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { if err != nil { glog.WithError(err).Error("While streaming entries.") } - glog.Debug("Done streaming entries.") } // There's no need for lock acquisition for this. diff --git a/posting/lists.go b/posting/lists.go index fab45d584aaf9a9744dd5283a2e95360988cee85..f346141de23fcf45fa0671cec28b6ce09fcc003f 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -53,10 +53,11 @@ func (mr *mergeRoutines) Add(delta int) { } type counters struct { - ticker *time.Ticker - added uint64 - merged uint64 - clean uint64 + ticker *time.Ticker + added uint64 + merged uint64 + clean uint64 + lastVal uint64 } func (c *counters) periodicLog() { @@ -68,6 +69,13 @@ func (c *counters) periodicLog() { func (c *counters) log() { added := atomic.LoadUint64(&c.added) merged := atomic.LoadUint64(&c.merged) + lastVal := atomic.LoadUint64(&c.lastVal) + if merged == lastVal { + // Ignore. + return + } + atomic.StoreUint64(&c.lastVal, merged) + var pending uint64 if added > merged { pending = added - merged diff --git a/query/query.go b/query/query.go index 0c5c3d92aa84dba998fbfdb93a3aea396f3941ea..0e23c13804cde7ac067e18d3ea4841d36cc55902 100644 --- a/query/query.go +++ b/query/query.go @@ -118,7 +118,6 @@ type SubGraph struct { func mergeInterfaces(i1 interface{}, i2 interface{}) interface{} { switch i1.(type) { case map[string]interface{}: - glog.Debug("Got map[string] interface") m1 := i1.(map[string]interface{}) if m2, ok := i2.(map[string]interface{}); ok { for k1, v1 := range m1 { diff --git a/run.sh b/run.sh new file mode 100644 index 0000000000000000000000000000000000000000..6c658763091e6c82f0ec0802bf1e36c50f6bdcb5 --- /dev/null +++ b/run.sh @@ -0,0 +1,3 @@ +go build . && ./server --instanceIdx 0 --mutations ~/dgraph/m0 --port "8080" --postings ~/dgraph/p0 --workers ":12345,:12346,:12347" --uids ~/dgraph/u0 --workerport ":12345" & +go build . && ./server --instanceIdx 1 --mutations ~/dgraph/m1 --port "8081" --postings ~/dgraph/p1 --workers ":12345,:12346,:12347" --uids ~/dgraph/u1 --workerport ":12346" & +go build . && ./server --instanceIdx 2 --mutations ~/dgraph/m2 --port "8082" --postings ~/dgraph/p2 --workers ":12345,:12346,:12347" --uids ~/dgraph/u2 --workerport ":12347" & diff --git a/worker/task.go b/worker/task.go index bdcdcf4ba534ae0442d6594dacd9e3358750c1b5..f6ce950e141cd9b5e86b254a9070ab2c2f09c6b4 100644 --- a/worker/task.go +++ b/worker/task.go @@ -16,8 +16,6 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { attr := string(q.Attr()) idx := farm.Fingerprint64([]byte(attr)) % numInstances - glog.WithField("idx", idx).WithField("attr", attr). - WithField("numInstances", numInstances).Debug("ProcessTaskOverNetwork") var runHere bool if attr == "_xid_" || attr == "_uid_" { @@ -26,6 +24,9 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { } else { runHere = (instanceIdx == idx) } + glog.WithField("runHere", runHere).WithField("attr", attr). + WithField("instanceIdx", instanceIdx). + WithField("numInstances", numInstances).Info("ProcessTaskOverNetwork") if runHere { // No need for a network call, as this should be run from within @@ -42,7 +43,7 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { glog.WithField("call", "Worker.ServeTask").Fatal(err) } glog.WithField("reply_len", len(reply.Data)).WithField("addr", addr). - Debug("Got reply from server") + WithField("attr", attr).Info("Got reply from server") return reply.Data, nil } diff --git a/worker/worker.go b/worker/worker.go index 6a2871013a90345dada6261802bcf85c02494508..f572caf0092c54f84722ba40880956a45f98f4b2 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -126,9 +126,11 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) { q := new(task.Query) q.Init(query.Data, uo) attr := string(q.Attr()) - glog.WithField("attr", attr).Debug("ServeTask") + glog.WithField("attr", attr).WithField("instanceIdx", instanceIdx).Info("ServeTask") + + if attr == "_xid_" || + farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx { - if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx { reply.Data, rerr = processTask(query.Data) } else { glog.WithField("attribute", attr).