From 85ac49351b4b2c6603ae0a7bb3161ef4e9ba589b Mon Sep 17 00:00:00 2001 From: Manish R Jain <manishrjain@gmail.com> Date: Fri, 4 Mar 2016 04:38:16 +0000 Subject: [PATCH] Small fixes and log level changes --- Dockerfile | 4 ++-- conn/pool.go | 3 ++- posting/list.go | 2 -- posting/lists.go | 16 ++++++++++++---- query/query.go | 1 - run.sh | 3 +++ worker/task.go | 7 ++++--- worker/worker.go | 6 ++++-- 8 files changed, 27 insertions(+), 15 deletions(-) create mode 100644 run.sh diff --git a/Dockerfile b/Dockerfile index 51c74a4a..78de52af 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # Dockerfile for DGraph -FROM golang:1.5.3 +FROM golang:1.6 MAINTAINER Manish Jain <manishrjain@gmail.com> # Get the necessary packages. @@ -14,7 +14,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # Install and set up RocksDB. RUN mkdir /installs && cd /installs && \ - git clone --branch v4.1 https://github.com/facebook/rocksdb.git + git clone --branch v4.2 https://github.com/facebook/rocksdb.git RUN cd /installs/rocksdb && make shared_lib && make install ENV LD_LIBRARY_PATH "/usr/local/lib" diff --git a/conn/pool.go b/conn/pool.go index 40e00e8b..dc9289e7 100644 --- a/conn/pool.go +++ b/conn/pool.go @@ -35,7 +35,8 @@ func (p *Pool) dialNew() (*rpc.Client, error) { } var nconn net.Conn var err error - for i := 0; i < 10; i++ { + // This loop will retry for 10 minutes before giving up. + for i := 0; i < 60; i++ { nconn, err = d.Dial("tcp", p.Addr) if err == nil { break diff --git a/posting/list.go b/posting/list.go index 73aa2d2f..2038c347 100644 --- a/posting/list.go +++ b/posting/list.go @@ -247,7 +247,6 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { if clog == nil { return } - glog.Debug("Starting stream entries...") err := clog.StreamEntries(posting.CommitTs()+1, l.hash, func(hdr commit.Header, buffer []byte) { @@ -268,7 +267,6 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { if err != nil { glog.WithError(err).Error("While streaming entries.") } - glog.Debug("Done streaming entries.") } // There's no need for lock acquisition for this. diff --git a/posting/lists.go b/posting/lists.go index fab45d58..f346141d 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -53,10 +53,11 @@ func (mr *mergeRoutines) Add(delta int) { } type counters struct { - ticker *time.Ticker - added uint64 - merged uint64 - clean uint64 + ticker *time.Ticker + added uint64 + merged uint64 + clean uint64 + lastVal uint64 } func (c *counters) periodicLog() { @@ -68,6 +69,13 @@ func (c *counters) periodicLog() { func (c *counters) log() { added := atomic.LoadUint64(&c.added) merged := atomic.LoadUint64(&c.merged) + lastVal := atomic.LoadUint64(&c.lastVal) + if merged == lastVal { + // Ignore. + return + } + atomic.StoreUint64(&c.lastVal, merged) + var pending uint64 if added > merged { pending = added - merged diff --git a/query/query.go b/query/query.go index 0c5c3d92..0e23c138 100644 --- a/query/query.go +++ b/query/query.go @@ -118,7 +118,6 @@ type SubGraph struct { func mergeInterfaces(i1 interface{}, i2 interface{}) interface{} { switch i1.(type) { case map[string]interface{}: - glog.Debug("Got map[string] interface") m1 := i1.(map[string]interface{}) if m2, ok := i2.(map[string]interface{}); ok { for k1, v1 := range m1 { diff --git a/run.sh b/run.sh new file mode 100644 index 00000000..6c658763 --- /dev/null +++ b/run.sh @@ -0,0 +1,3 @@ +go build . && ./server --instanceIdx 0 --mutations ~/dgraph/m0 --port "8080" --postings ~/dgraph/p0 --workers ":12345,:12346,:12347" --uids ~/dgraph/u0 --workerport ":12345" & +go build . && ./server --instanceIdx 1 --mutations ~/dgraph/m1 --port "8081" --postings ~/dgraph/p1 --workers ":12345,:12346,:12347" --uids ~/dgraph/u1 --workerport ":12346" & +go build . && ./server --instanceIdx 2 --mutations ~/dgraph/m2 --port "8082" --postings ~/dgraph/p2 --workers ":12345,:12346,:12347" --uids ~/dgraph/u2 --workerport ":12347" & diff --git a/worker/task.go b/worker/task.go index bdcdcf4b..f6ce950e 100644 --- a/worker/task.go +++ b/worker/task.go @@ -16,8 +16,6 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { attr := string(q.Attr()) idx := farm.Fingerprint64([]byte(attr)) % numInstances - glog.WithField("idx", idx).WithField("attr", attr). - WithField("numInstances", numInstances).Debug("ProcessTaskOverNetwork") var runHere bool if attr == "_xid_" || attr == "_uid_" { @@ -26,6 +24,9 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { } else { runHere = (instanceIdx == idx) } + glog.WithField("runHere", runHere).WithField("attr", attr). + WithField("instanceIdx", instanceIdx). + WithField("numInstances", numInstances).Info("ProcessTaskOverNetwork") if runHere { // No need for a network call, as this should be run from within @@ -42,7 +43,7 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { glog.WithField("call", "Worker.ServeTask").Fatal(err) } glog.WithField("reply_len", len(reply.Data)).WithField("addr", addr). - Debug("Got reply from server") + WithField("attr", attr).Info("Got reply from server") return reply.Data, nil } diff --git a/worker/worker.go b/worker/worker.go index 6a287101..f572caf0 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -126,9 +126,11 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) { q := new(task.Query) q.Init(query.Data, uo) attr := string(q.Attr()) - glog.WithField("attr", attr).Debug("ServeTask") + glog.WithField("attr", attr).WithField("instanceIdx", instanceIdx).Info("ServeTask") + + if attr == "_xid_" || + farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx { - if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx { reply.Data, rerr = processTask(query.Data) } else { glog.WithField("attribute", attr). -- GitLab