diff --git a/cluster/predicates.go b/cluster/predicates.go index 2290774262b2c68cff135a96153ef5ba16c56bf3..d50d4e9709ac838fddb89b3c44d0342a56620798 100644 --- a/cluster/predicates.go +++ b/cluster/predicates.go @@ -19,18 +19,16 @@ package cluster import ( "bytes" + "log" "github.com/dgraph-io/dgraph/store" - "github.com/dgraph-io/dgraph/x" ) -var glog = x.Log("cluster") - func getPredicate(b []byte) string { buf := bytes.NewBuffer(b) a, err := buf.ReadString('|') if err != nil { - glog.WithField("byte", b).Fatal("error retreiving predicate") + log.Fatalf("Error retrieving predicates. Byte: %v", b) } str := string(a[:len(a)-1]) // omit the trailing '|' return str diff --git a/cmd/dgraph/main.go b/cmd/dgraph/main.go index ec6a5a6fc8def6b1b2ebcc142f650841ab8bcefd..03a06ffe85066f8e47c18234885b80cd0df24d7b 100644 --- a/cmd/dgraph/main.go +++ b/cmd/dgraph/main.go @@ -21,6 +21,8 @@ import ( "flag" "fmt" "io/ioutil" + "log" + "math/rand" "net" "net/http" "runtime" @@ -28,10 +30,10 @@ import ( "time" "golang.org/x/net/context" + "golang.org/x/net/trace" "google.golang.org/grpc" - "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/posting" @@ -44,8 +46,6 @@ import ( "github.com/dgraph-io/dgraph/x" ) -var glog = x.Log("server") - var postingDir = flag.String("postings", "", "Directory to store posting lists") var uidDir = flag.String("uids", "", "XID UID posting lists directory") var mutationDir = flag.String("mutations", "", "Directory to store mutations") @@ -57,6 +57,7 @@ var instanceIdx = flag.Uint64("instanceIdx", 0, var workers = flag.String("workers", "", "Comma separated list of IP addresses of workers") var nomutations = flag.Bool("nomutations", false, "Don't allow mutations on this server.") +var tracing = flag.Float64("trace", 0.5, "The ratio of queries to trace.") func addCorsHeaders(w http.ResponseWriter) { w.Header().Set("Access-Control-Allow-Origin", "*") @@ -68,7 +69,7 @@ func addCorsHeaders(w http.ResponseWriter) { w.Header().Set("Connection", "close") } -func mutationHandler(mu *gql.Mutation) error { +func mutationHandler(ctx context.Context, mu *gql.Mutation) error { if *nomutations { return fmt.Errorf("Mutations are forbidden on this server.") } @@ -83,7 +84,7 @@ func mutationHandler(mu *gql.Mutation) error { } nq, err := rdf.Parse(ln) if err != nil { - glog.WithError(err).Error("While parsing RDF.") + x.Trace(ctx, "Error while parsing RDF: %v", err) return err } nquads = append(nquads, nq) @@ -99,8 +100,8 @@ func mutationHandler(mu *gql.Mutation) error { } } if len(xidToUid) > 0 { - if err := worker.GetOrAssignUidsOverNetwork(&xidToUid); err != nil { - glog.WithError(err).Error("GetOrAssignUidsOverNetwork") + if err := worker.GetOrAssignUidsOverNetwork(ctx, &xidToUid); err != nil { + x.Trace(ctx, "Error while GetOrAssignUidsOverNetwork: %v", err) return err } } @@ -109,21 +110,21 @@ func mutationHandler(mu *gql.Mutation) error { for _, nq := range nquads { edge, err := nq.ToEdgeUsing(xidToUid) if err != nil { - glog.WithField("nquad", nq).WithError(err). - Error("While converting to edge") + x.Trace(ctx, "Error while converting to edge: %v %v", nq, err) return err } edges = append(edges, edge) } - left, err := worker.MutateOverNetwork(edges) + left, err := worker.MutateOverNetwork(ctx, edges) if err != nil { + x.Trace(ctx, "Error while MutateOverNetwork: %v", err) return err } if len(left) > 0 { - glog.WithField("left", len(left)).Error("Some edges couldn't be applied") + x.Trace(ctx, "%d edges couldn't be applied", len(left)) for _, e := range left { - glog.WithField("edge", e).Debug("Unable to apply mutation") + x.Trace(ctx, "Unable to apply mutation for edge: %v", e) } return fmt.Errorf("Unapplied mutations") } @@ -140,28 +141,37 @@ func queryHandler(w http.ResponseWriter, r *http.Request) { return } + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + if rand.Float64() < *tracing { + tr := trace.New("Dgraph", "Query") + defer tr.Finish() + ctx = trace.NewContext(ctx, tr) + } + var l query.Latency l.Start = time.Now() defer r.Body.Close() q, err := ioutil.ReadAll(r.Body) if err != nil || len(q) == 0 { - x.Err(glog, err).Error("While reading query") + x.Trace(ctx, "Error while reading query: %v", err) x.SetStatus(w, x.E_INVALID_REQUEST, "Invalid request encountered.") return } - glog.WithField("q", string(q)).Debug("Query received.") + x.Trace(ctx, "Query received: %v", string(q)) gq, mu, err := gql.Parse(string(q)) if err != nil { - x.Err(glog, err).Error("While parsing query") + x.Trace(ctx, "Error while parsing query: %v", err) x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) return } // If we have mutations, run them first. if mu != nil && len(mu.Set) > 0 { - if err = mutationHandler(mu); err != nil { - glog.WithError(err).Error("While handling mutations.") + if err = mutationHandler(ctx, mu); err != nil { + x.Trace(ctx, "Error while handling mutations: %v", err) x.SetStatus(w, x.E_ERROR, err.Error()) return } @@ -172,37 +182,33 @@ func queryHandler(w http.ResponseWriter, r *http.Request) { return } - sg, err := query.ToSubGraph(gq) + sg, err := query.ToSubGraph(ctx, gq) if err != nil { - x.Err(glog, err).Error("While conversion to internal format") + x.Trace(ctx, "Error while conversion to internal format: %v", err) x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) return } l.Parsing = time.Since(l.Start) - glog.WithField("q", string(q)).Debug("Query parsed.") + x.Trace(ctx, "Query parsed") rch := make(chan error) - go query.ProcessGraph(sg, rch, time.Minute) + go query.ProcessGraph(ctx, sg, rch) err = <-rch if err != nil { - x.Err(glog, err).Error("While executing query") + x.Trace(ctx, "Error while executing query: %v", err) x.SetStatus(w, x.E_ERROR, err.Error()) return } l.Processing = time.Since(l.Start) - l.Parsing - glog.WithField("q", string(q)).Debug("Graph processed.") + x.Trace(ctx, "Graph processed") js, err := sg.ToJson(&l) if err != nil { - x.Err(glog, err).Error("While converting to Json.") + x.Trace(ctx, "Error while converting to Json: %v", err) x.SetStatus(w, x.E_ERROR, err.Error()) return } - glog.WithFields(logrus.Fields{ - "total": time.Since(l.Start), - "parsing": l.Parsing, - "process": l.Processing, - "json": l.Json, - }).Info("Query Latencies") + x.Trace(ctx, "Latencies: Total: %v Parsing: %v Process: %v Json: %v", + time.Since(l.Start), l.Parsing, l.Processing, l.Json) w.Header().Set("Content-Type", "application/json") fmt.Fprint(w, string(js)) @@ -215,9 +221,16 @@ type server struct{} // client as a protocol buffer message. func (s *server) Query(ctx context.Context, req *graph.Request) (*graph.Response, error) { + + if rand.Float64() < *tracing { + tr := trace.New("Dgraph", "GrpcQuery") + defer tr.Finish() + ctx = trace.NewContext(ctx, tr) + } + resp := new(graph.Response) if len(req.Query) == 0 { - glog.Error("While reading query") + x.Trace(ctx, "Empty query") return resp, fmt.Errorf("Empty query") } @@ -225,17 +238,17 @@ func (s *server) Query(ctx context.Context, l.Start = time.Now() // TODO(pawan): Refactor query parsing and graph processing code to a common // function used by Query and queryHandler - glog.WithField("q", req.Query).Debug("Query received.") + x.Trace(ctx, "Query received: %v", req.Query) gq, mu, err := gql.Parse(req.Query) if err != nil { - x.Err(glog, err).Error("While parsing query") + x.Trace(ctx, "Error while parsing query: %v", err) return resp, err } // If we have mutations, run them first. if mu != nil && len(mu.Set) > 0 { - if err = mutationHandler(mu); err != nil { - glog.WithError(err).Error("While handling mutations.") + if err = mutationHandler(ctx, mu); err != nil { + x.Trace(ctx, "Error while handling mutations: %v", err) return resp, err } } @@ -244,27 +257,27 @@ func (s *server) Query(ctx context.Context, return resp, err } - sg, err := query.ToSubGraph(gq) + sg, err := query.ToSubGraph(ctx, gq) if err != nil { - x.Err(glog, err).Error("While conversion to internal format") + x.Trace(ctx, "Error while conversion to internal format: %v", err) return resp, err } l.Parsing = time.Since(l.Start) - glog.WithField("q", req.Query).Debug("Query parsed.") + x.Trace(ctx, "Query parsed") rch := make(chan error) - go query.ProcessGraph(sg, rch, time.Minute) + go query.ProcessGraph(ctx, sg, rch) err = <-rch if err != nil { - x.Err(glog, err).Error("While executing query") + x.Trace(ctx, "Error while executing query: %v", err) return resp, err } l.Processing = time.Since(l.Start) - l.Parsing - glog.WithField("q", req.Query).Debug("Graph processed.") + x.Trace(ctx, "Graph processed") node, err := sg.ToProtocolBuffer(&l) if err != nil { - x.Err(glog, err).Error("While converting to protocol buffer.") + x.Trace(ctx, "Error while converting to ProtocolBuffer: %v", err) return resp, err } resp.N = node @@ -282,15 +295,15 @@ func (s *server) Query(ctx context.Context, func runGrpcServer(address string) { ln, err := net.Listen("tcp", address) if err != nil { - glog.Fatalf("While running server for client: %v", err) + log.Fatalf("While running server for client: %v", err) return } - glog.WithField("address", ln.Addr()).Info("Client Worker listening") + log.Printf("Client worker listening: %v", ln.Addr()) s := grpc.NewServer() graph.RegisterDGraphServer(s, &server{}) if err = s.Serve(ln); err != nil { - glog.Fatalf("While serving gRpc requests", err) + log.Fatalf("While serving gRpc requests", err) } return } @@ -298,15 +311,13 @@ func runGrpcServer(address string) { func main() { flag.Parse() if !flag.Parsed() { - glog.Fatal("Unable to parse flags") + log.Fatal("Unable to parse flags") } - logrus.SetLevel(logrus.InfoLevel) numCpus := *numcpu prev := runtime.GOMAXPROCS(numCpus) - glog.WithField("num_cpu", numCpus).WithField("prev_maxprocs", prev). - Info("Set max procs to num cpus") + log.Printf("num_cpu: %v. prev_maxprocs: %v. Set max procs to num cpus", numCpus, prev) if *port%2 != 0 { - glog.Fatalf("Port should be an even number: %v", *port) + log.Fatalf("Port should be an even number: %v", *port) } ps := new(store.Store) @@ -343,8 +354,8 @@ func main() { go runGrpcServer(fmt.Sprintf(":%d", *port+1)) http.HandleFunc("/query", queryHandler) - glog.WithField("port", *port).Info("Listening for requests...") + log.Printf("Listening for requests at port: %v", *port) if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil); err != nil { - x.Err(glog, err).Fatal("ListenAndServe") + log.Fatalf("ListenAndServe: %v", err) } } diff --git a/cmd/dgraph/main_test.go b/cmd/dgraph/main_test.go index 6a9fc921814452593450a063bf1e946e86aaa18b..6e561e19dca32bdfbffff79321886c53069360e0 100644 --- a/cmd/dgraph/main_test.go +++ b/cmd/dgraph/main_test.go @@ -21,7 +21,8 @@ import ( "io/ioutil" "os" "testing" - "time" + + "golang.org/x/net/context" "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/gql" @@ -115,7 +116,8 @@ func TestQuery(t *testing.T) { t.Error(err) return } - g, err := query.ToSubGraph(gq) + ctx := context.Background() + g, err := query.ToSubGraph(ctx, gq) if err != nil { t.Error(err) return @@ -159,7 +161,7 @@ func TestQuery(t *testing.T) { } ch := make(chan error) - go query.ProcessGraph(g, ch, time.Minute) + go query.ProcessGraph(ctx, g, ch) if err := <-ch; err != nil { t.Error(err) return @@ -211,14 +213,15 @@ func BenchmarkQuery(b *testing.B) { b.Error(err) return } - g, err := query.ToSubGraph(gq) + ctx := context.Background() + g, err := query.ToSubGraph(ctx, gq) if err != nil { b.Error(err) return } ch := make(chan error) - go query.ProcessGraph(g, ch, time.Minute) + go query.ProcessGraph(ctx, g, ch) if err := <-ch; err != nil { b.Error(err) return diff --git a/commit/log.go b/commit/log.go index 6fec71e6ff5fa6b5b34f606c416890b2d65a37f7..e408647ea438d559dec9c7ac413e59fec0d9b0ed 100644 --- a/commit/log.go +++ b/commit/log.go @@ -27,6 +27,7 @@ import ( "encoding/binary" "fmt" "io" + "log" "os" "path/filepath" "runtime/debug" @@ -38,13 +39,11 @@ import ( "time" "unsafe" - "github.com/Sirupsen/logrus" - "github.com/dgraph-io/dgraph/x" + "golang.org/x/net/trace" + "github.com/willf/bloom" ) -var glog = x.Log("commitlog") - type logFile struct { sync.RWMutex endTs int64 // never modified after creation. @@ -69,8 +68,7 @@ func (lf *logFile) FillIfEmpty(wg *sync.WaitGroup) { } cache := new(Cache) if err := FillCache(cache, lf.path); err != nil { - glog.WithError(err).WithField("path", lf.path). - Fatal("Unable to fill cache.") + log.Fatalf("Unable to fill cache for path: %v. Err: %v", lf.path, err) } // No need to acquire lock on cache, because it just // got created. @@ -85,7 +83,7 @@ func createAndUpdateBloomFilter(cache *Cache) { if err := streamEntries(cache, 0, 0, func(hdr Header, record []byte) { hashes = append(hashes, hdr.hash) }); err != nil { - glog.WithError(err).Fatal("Unable to create bloom filters.") + log.Fatalf("Unable to create bloom filters: %v", err) } n := 100000 @@ -156,6 +154,8 @@ type Logger struct { cf *CurFile lastLogTs int64 // handled via atomics. ticker *time.Ticker + + events trace.EventLog } func (l *Logger) curFile() *CurFile { @@ -190,11 +190,11 @@ func (l *Logger) DeleteCacheOlderThan(v time.Duration) { } func (l *Logger) periodicSync() { - glog.WithField("dur", l.SyncDur).Debug("Periodic sync.") if l.SyncDur == 0 { - glog.Debug("No Periodic Sync for commit log.") + l.events.Printf("No Periodic Sync for commit log.") return } + l.events.Printf("Periodic Sync at duration: %v", l.SyncDur) l.ticker = time.NewTicker(l.SyncDur) for _ = range l.ticker.C { @@ -207,13 +207,13 @@ func (l *Logger) periodicSync() { cf.Lock() if cf.dirtyLogs > 0 { if err := cf.f.Sync(); err != nil { - glog.WithError(err).Error("While periodically syncing.") + l.events.Errorf("While periodically syncing: %v", err) } else { cf.dirtyLogs = 0 - glog.Debug("Successful periodic sync.") + l.events.Printf("Successful periodic sync.") } } else { - glog.Debug("Skipping periodic sync.") + l.events.Printf("Skipping periodic sync.") } cf.Unlock() } @@ -229,10 +229,11 @@ func (l *Logger) Close() { } if l.cf != nil { if err := l.cf.f.Close(); err != nil { - glog.WithError(err).Error("While closing current file.") + l.events.Errorf("Error while closing current file: %v", err) } l.cf = nil } + l.events.Finish() } func NewLogger(dir string, fileprefix string, maxSize int64) *Logger { @@ -240,6 +241,7 @@ func NewLogger(dir string, fileprefix string, maxSize int64) *Logger { l.dir = dir l.filePrefix = fileprefix l.maxSize = maxSize + l.events = trace.NewEventLog("commit", "Logger") return l } @@ -256,7 +258,7 @@ func (l *Logger) handleFile(path string, info os.FileInfo, err error) error { } lidx := strings.LastIndex(info.Name(), ".log") tstring := info.Name()[len(l.filePrefix)+1 : lidx] - glog.WithField("log_ts", tstring).Debug("Found log.") + l.events.Printf("Found log with ts: %v", tstring) // Handle if we find the current log file. if tstring == "current" { @@ -280,12 +282,12 @@ func (l *Logger) Init() { l.Lock() defer l.Unlock() - glog.Debug("Logger init started.") + l.events.Printf("Logger init started") { // Checking if the directory exists. if _, err := os.Stat(l.dir); err != nil { if os.IsNotExist(err) { - glog.WithError(err).Fatal("Unable to find dir.") + log.Fatalf("Unable to find dir: %v", err) } } // First check if we have a current file. @@ -298,13 +300,13 @@ func (l *Logger) Init() { l.cf.dirtyLogs = 0 cache := new(Cache) if ferr := FillCache(cache, path); ferr != nil { - glog.WithError(ferr).Fatal("Unable to write to cache.") + log.Fatalf("Unable to write to cache: %v", ferr) } createAndUpdateBloomFilter(cache) atomic.StorePointer(&l.cf.cch, unsafe.Pointer(cache)) lastTs, err := lastTimestamp(cache) if err != nil { - glog.WithError(err).Fatal("Unable to read last log timestamp.") + log.Fatalf("Unable to read last log ts: %v", err) } l.updateLastLogTs(lastTs) @@ -312,20 +314,20 @@ func (l *Logger) Init() { l.cf.f, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY, os.FileMode(0644)) if err != nil { - glog.WithError(err).Fatal("Unable to open current file in append mode.") + log.Fatalf("Unable to open current file in append mode: %v", err) } } } if err := filepath.Walk(l.dir, l.handleFile); err != nil { - glog.WithError(err).Fatal("While walking over directory") + log.Fatal("While walking over directory: %v", err) } sort.Sort(ByTimestamp(l.list)) if l.cf == nil { l.createNew() } go l.periodicSync() - glog.Debug("Logger init finished.") + l.events.Printf("Logger init finished.") } func (l *Logger) filepath(ts int64) string { @@ -346,7 +348,6 @@ func parseHeader(hdr []byte) (Header, error) { setError(&err, binary.Read(buf, binary.LittleEndian, &h.hash)) setError(&err, binary.Read(buf, binary.LittleEndian, &h.size)) if err != nil { - glog.WithError(err).Error("While parsing header.") return h, err } return h, nil @@ -363,10 +364,9 @@ func lastTimestamp(c *Cache) (int64, error) { break } if n < len(header) { - glog.WithField("n", n).Fatal("Unable to read the full 16 byte header.") + log.Fatalf("Unable to read full 16 byte header. Read %v", n) } if err != nil { - glog.WithError(err).Error("While reading header.") return 0, err } count += 1 @@ -379,11 +379,8 @@ func lastTimestamp(c *Cache) (int64, error) { maxTs = h.ts } else if h.ts < maxTs { - glog.WithFields(logrus.Fields{ - "ts": h.ts, - "maxts": maxTs, - "numrecords": count, - }).Fatal("Log file doesn't have monotonically increasing records.") + log.Fatalf("Log file doesn't have monotonically increasing records."+ + " ts: %v. maxts: %v. numrecords: %v", h.ts, maxTs, count) } reader.Discard(int(h.size)) } @@ -411,8 +408,7 @@ func (l *Logger) rotateCurrent() error { return err } if err := os.Rename(cf.f.Name(), newpath); err != nil { - glog.WithError(err).WithField("curfile", l.cf.f.Name()). - WithField("newfile", newpath).Error("While renaming.") + l.events.Errorf("Error while renaming: %v", err) return err } @@ -432,12 +428,12 @@ func (l *Logger) rotateCurrent() error { func (l *Logger) createNew() { path := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix)) if err := os.MkdirAll(l.dir, 0744); err != nil { - glog.WithError(err).Fatal("Unable to create directory.") + log.Fatalf("Unable to create directory: %v", err) } f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(0644)) if err != nil { - glog.WithError(err).Fatal("Unable to create a new file.") + log.Fatalf("Unable to create a new file: %v", err) } l.cf = new(CurFile) l.cf.f = f @@ -457,14 +453,14 @@ func (l *Logger) AddLog(hash uint32, value []byte) (int64, error) { lbuf := int64(len(value)) + 16 if l.curFile().Size()+lbuf > l.maxSize { if err := l.rotateCurrent(); err != nil { - glog.WithError(err).Error("While rotating current file out.") + l.events.Errorf("Error while rotating current file out: %v", err) return 0, err } } cf := l.curFile() if cf == nil { - glog.Fatalf("Current file isn't initialized.") + log.Fatalf("Current file isn't initialized.") } cf.Lock() @@ -487,14 +483,13 @@ func (l *Logger) AddLog(hash uint32, value []byte) (int64, error) { if err != nil { return ts, err } - glog.WithField("bytes", buf.Len()).Debug("Log entry buffer.") if _, err = cf.f.Write(buf.Bytes()); err != nil { - glog.WithError(err).Error("While writing to current file.") + l.events.Errorf("Error while writing to current file: %v", err) return ts, err } if _, err = cf.cache().Write(hash, buf.Bytes()); err != nil { - glog.WithError(err).Error("While writing to current cache.") + l.events.Errorf("Error while writing to current cache: %v", err) return ts, err } cf.dirtyLogs += 1 @@ -502,7 +497,7 @@ func (l *Logger) AddLog(hash uint32, value []byte) (int64, error) { l.updateLastLogTs(ts) if l.SyncEvery <= 0 || cf.dirtyLogs >= l.SyncEvery { cf.dirtyLogs = 0 - glog.Debug("Syncing file") + l.events.Printf("Syncing file") return ts, cf.f.Sync() } return ts, nil @@ -513,23 +508,20 @@ func (l *Logger) AddLog(hash uint32, value []byte) (int64, error) { func streamEntries(cache *Cache, afterTs int64, hash uint32, iter LogIterator) error { - flog := glog reader := NewReader(cache) header := make([]byte, 16) for { _, err := reader.Read(header) if err == io.EOF { - flog.Debug("Cache read complete.") break } if err != nil { - flog.WithError(err).Fatal("While reading header.") + log.Fatalf("While reading header: %v", err) return err } hdr, err := parseHeader(header) if err != nil { - flog.WithError(err).Error("While parsing header.") return err } @@ -539,7 +531,7 @@ func streamEntries(cache *Cache, data := make([]byte, hdr.size) _, err := reader.Read(data) if err != nil { - flog.WithError(err).Fatal("While reading data.") + log.Fatalf("While reading data: %v", err) return err } iter(hdr, data) @@ -586,7 +578,7 @@ func (l *Logger) StreamEntries(afterTs int64, hash uint32, } for _, cache := range caches { if cache == nil { - glog.Error("Cache is nil") + l.events.Errorf("Cache is nil") continue } if err := streamEntries(cache, afterTs, hash, iter); err != nil { diff --git a/commit/log_test.go b/commit/log_test.go index 532d53b1bcc941302619f03bdedb89a298ca2ab6..2eeded917982b5fe3b5e3fe6e53fe28edf2fe599 100644 --- a/commit/log_test.go +++ b/commit/log_test.go @@ -75,7 +75,6 @@ func TestAddLog(t *testing.T) { time.Sleep(500 * time.Microsecond) } - glog.Debugf("Test curfile path: %v", l.cf.f.Name()) _, err = lastTimestamp(l.cf.cache()) if err != nil { t.Error(err) diff --git a/gql/parser.go b/gql/parser.go index 1e9d86438614b99197fa218bb308a38b376696c0..87ce7580dd05e14bb9939677ed53db962dc7a498 100644 --- a/gql/parser.go +++ b/gql/parser.go @@ -19,14 +19,12 @@ package gql import ( "errors" "fmt" + "log" "strconv" "github.com/dgraph-io/dgraph/lex" - "github.com/dgraph-io/dgraph/x" ) -var glog = x.Log("gql") - // GraphQuery stores the parsed Query in a tree format. This gets // converted to internally used query.SubGraph before processing the query. type GraphQuery struct { @@ -80,7 +78,7 @@ func Parse(input string) (gq *GraphQuery, mu *Mutation, rerr error) { if gq == nil { gq, rerr = getRoot(l) if rerr != nil { - x.Err(glog, rerr).Error("While retrieving subgraph root") + log.Printf("Error while retrieving subgraph root: %v", rerr) return nil, nil, rerr } } else { diff --git a/gql/state.go b/gql/state.go index d469496706350478f7ce86411abe14745b284b34..1e0af8950eed0119b36a38f7b7cb016389d42a8b 100644 --- a/gql/state.go +++ b/gql/state.go @@ -16,7 +16,11 @@ package gql -import "github.com/dgraph-io/dgraph/lex" +import ( + "log" + + "github.com/dgraph-io/dgraph/lex" +) const ( leftCurl = '{' @@ -264,7 +268,7 @@ func lexArgVal(l *lex.Lexer) lex.StateFn { l.Input[l.Start:l.Pos]) } } - glog.Fatal("This shouldn't be reached.") + log.Fatal("This shouldn't be reached.") return nil } diff --git a/lex/lexer.go b/lex/lexer.go index cb02af83757a8a244213ec7a5c0ed74639d48bcb..dc6db4c5a63c5fbe56a1ee75ef523028a1faca94 100644 --- a/lex/lexer.go +++ b/lex/lexer.go @@ -19,12 +19,8 @@ package lex import ( "fmt" "unicode/utf8" - - "github.com/dgraph-io/dgraph/x" ) -var glog = x.Log("lexer") - const EOF = -1 // ItemType is used to set the type of a token. These constants can be defined diff --git a/loader/loader.go b/loader/loader.go index 270d57ed7f8c20f60e51a7ab8fed85a7058b2833..97fecd86f1153d0e9fba308ad7c1ddd8e56b61c8 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -27,6 +27,8 @@ import ( "sync/atomic" "time" + "golang.org/x/net/context" + "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/rdf" @@ -153,6 +155,7 @@ func (s *state) parseStream(wg *sync.WaitGroup) { func (s *state) handleNQuads(wg *sync.WaitGroup) { defer wg.Done() + ctx := context.Background() for nq := range s.cnq { if s.Error() != nil { return @@ -180,7 +183,7 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { key := posting.Key(edge.Entity, edge.Attribute) plist := posting.GetOrCreate(key, dataStore) - plist.AddMutation(edge, posting.Set) + plist.AddMutation(ctx, edge, posting.Set) atomic.AddUint64(&s.ctr.processed, 1) } } diff --git a/posting/list.go b/posting/list.go index 7b695a9e0554be5df2823a3602741fc12cbf1cf2..ff5f57b4b321f80ba8cdb33090e6c1dc38e75a11 100644 --- a/posting/list.go +++ b/posting/list.go @@ -20,13 +20,15 @@ import ( "bytes" "encoding/binary" "fmt" + "log" "math" "sync" "sync/atomic" "time" "unsafe" - "github.com/Sirupsen/logrus" + "golang.org/x/net/context" + "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/posting/types" "github.com/dgraph-io/dgraph/store" @@ -36,7 +38,6 @@ import ( "github.com/zond/gotomic" ) -var glog = x.Log("posting") var E_TMP_ERROR = fmt.Errorf("Temporary Error. Please retry.") const Set = 0x01 @@ -106,7 +107,7 @@ func Key(uid uint64, attr string) []byte { buf := bytes.NewBufferString(attr) buf.WriteRune('|') if err := binary.Write(buf, binary.LittleEndian, uid); err != nil { - glog.Fatalf("Error while creating key with attr: %v uid: %v\n", attr, uid) + log.Fatalf("Error while creating key with attr: %v uid: %v\n", attr, uid) } return buf.Bytes() } @@ -116,7 +117,7 @@ func newPosting(t x.DirectedEdge, op byte) []byte { var bo flatbuffers.UOffsetT if !bytes.Equal(t.Value, nil) { if t.ValueId != math.MaxUint64 { - glog.Fatal("This should have already been set by the caller.") + log.Fatal("This should have already been set by the caller.") } bo = b.CreateByteVector(t.Value) } @@ -141,7 +142,7 @@ func addEdgeToPosting(b *flatbuffers.Builder, var bo flatbuffers.UOffsetT if !bytes.Equal(t.Value, nil) { if t.ValueId != math.MaxUint64 { - glog.Fatal("This should have already been set by the caller.") + log.Fatal("This should have already been set by the caller.") } bo = b.CreateByteVector(t.Value) } @@ -197,9 +198,6 @@ func init() { b.Finish(of) emptyPosting = b.Bytes[b.Head():] } - - glog.Infof("Empty size: [%d] EmptyPosting size: [%d]", - len(empty), len(emptyPosting)) } func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { @@ -208,7 +206,7 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { defer l.wg.Done() if len(empty) == 0 { - glog.Fatal("empty should have some bytes.") + log.Fatal("empty should have some bytes.") } l.key = key l.pstore = pstore @@ -233,15 +231,10 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { if m.Ts() > l.maxMutationTs { l.maxMutationTs = m.Ts() } - glog.WithFields(logrus.Fields{ - "uid": m.Uid(), - "source": string(m.Source()), - "ts": m.Ts(), - }).Debug("Got entry from log") l.mergeMutation(m) }) if err != nil { - glog.WithError(err).Error("While streaming entries.") + log.Fatalf("Error while streaming entries: %v", err) } } @@ -254,7 +247,6 @@ func (l *List) getPostingList() *types.PostingList { nbuf := new(buffer) var err error if nbuf.d, err = l.pstore.Get(l.key); err != nil { - // glog.Debugf("While retrieving posting list from db: %v\n", err) // Error. Just set to empty. nbuf.d = make([]byte, len(empty)) copy(nbuf.d, empty) @@ -280,7 +272,7 @@ func (l *List) lePostingIndex(maxUid uint64) (int, uint64) { for left <= right { pos := (left + right) / 2 if ok := posting.Postings(p, pos); !ok { - glog.WithField("idx", pos).Fatal("Unable to parse posting from list.") + log.Fatalf("Unable to parse posting from list idx: %v", pos) } val := p.Uid() if val > maxUid { @@ -297,7 +289,7 @@ func (l *List) lePostingIndex(maxUid uint64) (int, uint64) { return -1, 0 } if ok := posting.Postings(p, sofar); !ok { - glog.WithField("idx", sofar).Fatal("Unable to parse posting from list.") + log.Fatalf("Unable to parse posting from list idx: %v", sofar) } return sofar, p.Uid() } @@ -335,8 +327,6 @@ func (l *List) mindexInsertAt(mlink *MutationLink, mi int) { } func (l *List) mindexDeleteAt(mi int) { - glog.WithField("mi", mi).WithField("size", len(l.mindex)). - Debug("mindexDeleteAt") l.mindex = append(l.mindex[:mi], l.mindex[mi+1:]...) for i := mi; i < len(l.mindex); i++ { l.mindex[i].idx -= 1 @@ -469,7 +459,7 @@ func (l *List) mergeMutation(mp *types.Posting) { } } else { - glog.WithField("op", mp.Op()).Fatal("Invalid operation.") + log.Fatalf("Invalid operation: %v", mp.Op()) } } @@ -519,10 +509,6 @@ func (l *List) get(p *types.Posting, i int) bool { // Found an instruction. Check what is says. if mlink.posting.Op() == Set { // ADD - glog.WithField("idx", i). - WithField("uid", mlink.posting.Uid()). - WithField("source", string(mlink.posting.Source())). - Debug("Returning from mlink") *p = *mlink.posting return true @@ -532,18 +518,13 @@ func (l *List) get(p *types.Posting, i int) bool { // variable. } else { - glog.Fatal("Someone, I mean you, forgot to tackle" + + log.Fatal("Someone, I mean you, forgot to tackle" + " this operation. Stop drinking.") } } move += mlink.moveidx } newidx := i + move - glog.WithFields(logrus.Fields{ - "newidx": newidx, - "idx": i, - "move": move, - }).Debug("Final Indices") // Check if we have any replace instruction in mlayer. if val, ok := l.mlayer[newidx]; ok { @@ -577,9 +558,10 @@ func (l *List) SetForDeletion() { // BenchmarkAddMutations_SyncEvery100LogEntry-6 10000 298352 ns/op // BenchmarkAddMutations_SyncEvery1000LogEntry-6 30000 63544 ns/op // ok github.com/dgraph-io/dgraph/posting 10.291s -func (l *List) AddMutation(t x.DirectedEdge, op byte) error { +func (l *List) AddMutation(ctx context.Context, t x.DirectedEdge, op byte) error { l.wg.Wait() if atomic.LoadInt32(&l.deleteMe) == 1 { + x.Trace(ctx, "DELETEME set to true. Temporary error.") return E_TMP_ERROR } @@ -589,6 +571,7 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error { t.ValueId = math.MaxUint64 } if t.ValueId == 0 { + x.Trace(ctx, "ValueId cannot be zero") return fmt.Errorf("ValueId cannot be zero.") } mbuf := newPosting(t, op) @@ -597,6 +580,7 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error { if l.clog != nil { ts, err = l.clog.AddLog(l.hash, mbuf) if err != nil { + x.Trace(ctx, "Error while adding log: %v", err) return err } } @@ -606,19 +590,15 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error { // a) check if the entity exists in main posting list. // - If yes, store the mutation. // - If no, disregard this mutation. + x.Trace(ctx, "Acquiring lock") l.Lock() defer l.Unlock() + x.Trace(ctx, "Lock acquired") uo := flatbuffers.GetUOffsetT(mbuf) mpost := new(types.Posting) mpost.Init(mbuf, uo) - glog.WithFields(logrus.Fields{ - "uid": mpost.Uid(), - "source": string(mpost.Source()), - "ts": mpost.Ts(), - }).Debug("Add mutation") - l.mergeMutation(mpost) if len(l.mindex)+len(l.mlayer) > 0 { atomic.StoreInt64(&l.dirtyTs, time.Now().UnixNano()) @@ -627,15 +607,16 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error { } } l.maxMutationTs = ts + x.Trace(ctx, "Mutation done") return nil } -func (l *List) MergeIfDirty() (merged bool, err error) { +func (l *List) MergeIfDirty(ctx context.Context) (merged bool, err error) { if atomic.LoadInt64(&l.dirtyTs) == 0 { - glog.WithField("dirty", false).Debug("Not Committing") + x.Trace(ctx, "Not committing") return false, nil } else { - glog.WithField("dirty", true).Debug("Committing") + x.Trace(ctx, "Committing") } return l.merge() } @@ -656,7 +637,7 @@ func (l *List) merge() (merged bool, rerr error) { offsets := make([]flatbuffers.UOffsetT, sz) for i := 0; i < sz; i++ { if ok := l.get(&p, i); !ok { - glog.WithField("idx", i).Fatal("Unable to parse posting.") + log.Fatalf("Idx: %d. Unable to parse posting.", i) } offsets[i] = addPosting(b, p) } @@ -673,7 +654,7 @@ func (l *List) merge() (merged bool, rerr error) { b.Finish(end) if err := l.pstore.SetOne(l.key, b.Bytes[b.Head():]); err != nil { - glog.WithField("error", err).Fatal("While storing posting list") + log.Fatalf("Error while storing posting list: %v", err) return true, err } @@ -699,7 +680,7 @@ func (l *List) GetUids(offset, count int) []uint64 { defer l.RUnlock() if offset < 0 { - glog.WithField("offset", offset).Fatal("Unexpected offset") + log.Fatalf("Unexpected offset: %v", offset) return make([]uint64, 0) } diff --git a/posting/list_test.go b/posting/list_test.go index d17837663feade955bf98f711ee942b276b29be0..6901731bf0ff8d10e2599854ce221790b92c8031 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "io/ioutil" + "log" "math" "math/rand" "os" @@ -27,6 +28,8 @@ import ( "testing" "time" + "golang.org/x/net/context" + "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/posting/types" "github.com/dgraph-io/dgraph/store" @@ -76,7 +79,6 @@ func checkUids(t *testing.T, l *List, uids ...uint64) error { } func TestAddMutation(t *testing.T) { - // logrus.SetLevel(logrus.DebugLevel) l := NewList() key := Key(1, "name") dir, err := ioutil.TempDir("", "storetest_") @@ -100,7 +102,8 @@ func TestAddMutation(t *testing.T) { Source: "testing", Timestamp: time.Now(), } - if err := l.AddMutation(edge, Set); err != nil { + ctx := context.Background() + if err := l.AddMutation(ctx, edge, Set); err != nil { t.Error(err) } /* @@ -127,7 +130,7 @@ func TestAddMutation(t *testing.T) { // Add another edge now. edge.ValueId = 81 - l.AddMutation(edge, Set) + l.AddMutation(ctx, edge, Set) // l.CommitIfDirty() if l.Length() != 2 { t.Errorf("Length: %d", l.Length()) @@ -152,7 +155,7 @@ func TestAddMutation(t *testing.T) { 9, 49, 81, } edge.ValueId = 49 - if err := l.AddMutation(edge, Set); err != nil { + if err := l.AddMutation(ctx, edge, Set); err != nil { t.Error(err) } /* @@ -167,18 +170,18 @@ func TestAddMutation(t *testing.T) { // Delete an edge, add an edge, replace an edge edge.ValueId = 49 - if err := l.AddMutation(edge, Del); err != nil { + if err := l.AddMutation(ctx, edge, Del); err != nil { t.Error(err) } edge.ValueId = 69 - if err := l.AddMutation(edge, Set); err != nil { + if err := l.AddMutation(ctx, edge, Set); err != nil { t.Error(err) } edge.ValueId = 9 edge.Source = "anti-testing" - if err := l.AddMutation(edge, Set); err != nil { + if err := l.AddMutation(ctx, edge, Set); err != nil { t.Error(err) } /* @@ -209,7 +212,7 @@ func TestAddMutation(t *testing.T) { t.Error(err) } - if _, err := dl.MergeIfDirty(); err != nil { + if _, err := dl.MergeIfDirty(ctx); err != nil { t.Error(err) } if err := checkUids(t, dl, uids...); err != nil { @@ -218,8 +221,6 @@ func TestAddMutation(t *testing.T) { } func TestAddMutation_Value(t *testing.T) { - // logrus.SetLevel(logrus.DebugLevel) - glog.Debug("Running init...") ol := NewList() key := Key(10, "value") dir, err := ioutil.TempDir("", "storetest_") @@ -237,14 +238,15 @@ func TestAddMutation_Value(t *testing.T) { defer clog.Close() ol.init(key, ps, clog) - glog.Debug("Init successful.") + log.Println("Init successful.") edge := x.DirectedEdge{ Value: []byte("oh hey there"), Source: "new-testing", Timestamp: time.Now(), } - if err := ol.AddMutation(edge, Set); err != nil { + ctx := context.Background() + if err := ol.AddMutation(ctx, edge, Set); err != nil { t.Error(err) } var p types.Posting @@ -257,7 +259,7 @@ func TestAddMutation_Value(t *testing.T) { } // Run the same check after committing. - if _, err := ol.MergeIfDirty(); err != nil { + if _, err := ol.MergeIfDirty(ctx); err != nil { t.Error(err) } { @@ -272,7 +274,7 @@ func TestAddMutation_Value(t *testing.T) { // The value made it to the posting list. Changing it now. edge.Value = []byte(strconv.Itoa(119)) - if err := ol.AddMutation(edge, Set); err != nil { + if err := ol.AddMutation(ctx, edge, Set); err != nil { t.Error(err) } if ol.Length() != 1 { @@ -312,13 +314,14 @@ func benchmarkAddMutations(n int, b *testing.B) { b.ResetTimer() ts := time.Now() + ctx := context.Background() for i := 0; i < b.N; i++ { edge := x.DirectedEdge{ ValueId: uint64(rand.Intn(b.N) + 1), Source: "testing", Timestamp: ts.Add(time.Microsecond), } - if err := l.AddMutation(edge, Set); err != nil { + if err := l.AddMutation(ctx, edge, Set); err != nil { b.Error(err) } } diff --git a/posting/lists.go b/posting/lists.go index f034da29acb4e25c9a688490b5e8c62e6a06f014..32487ac21bdac376d4c01f44b8c99180e90a7921 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -18,6 +18,7 @@ package posting import ( "flag" + "log" "math/rand" "runtime" "runtime/debug" @@ -25,7 +26,8 @@ import ( "sync/atomic" "time" - "github.com/Sirupsen/logrus" + "golang.org/x/net/context" + "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/store" "github.com/dgryski/go-farm" @@ -81,14 +83,10 @@ func (c *counters) log() { pending = added - merged } - glog.WithFields(logrus.Fields{ - "added": added, - "merged": merged, - "clean": atomic.LoadUint64(&c.clean), - "pending": pending, - "mapsize": lhmap.Size(), - "dirtysize": dirtymap.Size(), - }).Info("List Merge counters") + log.Printf("List merge counters. added: %d merged: %d clean: %d"+ + " pending: %d mapsize: %d dirtysize: %d\n", + added, merged, atomic.LoadUint64(&c.clean), + pending, lhmap.Size(), dirtymap.Size()) } func NewCounters() *counters { @@ -106,21 +104,19 @@ func aggressivelyEvict(ms runtime.MemStats) { defer stopTheWorld.Unlock() megs := ms.Alloc / MIB - glog.WithField("allocated_MB", megs). - Info("Memory usage over threshold. STOPPED THE WORLD!") + log.Printf("Memory usage over threshold. STW. Allocated MB: %v\n", megs) - glog.Info("Calling merge on all lists.") + log.Println("Calling merge on all lists.") MergeLists(100 * runtime.GOMAXPROCS(-1)) - glog.Info("Merged lists. Calling GC.") + log.Println("Merged lists. Calling GC.") runtime.GC() // Call GC to do some cleanup. - glog.Info("Trying to free OS memory") + log.Println("Trying to free OS memory") debug.FreeOSMemory() runtime.ReadMemStats(&ms) megs = ms.Alloc / MIB - glog.WithField("allocated_MB", megs). - Info("Memory Usage after calling GC.") + log.Printf("Memory usage after calling GC. Allocated MB: %v", megs) } func gentlyMerge(mr *mergeRoutines) { @@ -193,7 +189,7 @@ func checkMemoryUsage() { // With a value of 18 and duration of 5 seconds, some goroutines are // taking over 1.5 mins to finish. if mr.Count() > 18 { - glog.Info("Skipping gentle merging.") + log.Println("Skipping gentle merging.") continue } mr.Add(1) @@ -240,8 +236,8 @@ func mergeAndUpdate(l *List, c *counters) { if l == nil { return } - if merged, err := l.MergeIfDirty(); err != nil { - glog.WithError(err).Error("While commiting dirty list.") + if merged, err := l.MergeIfDirty(context.Background()); err != nil { + log.Printf("Error while commiting dirty list: %v\n", err) } else if merged { atomic.AddUint64(&c.merged, 1) } else { diff --git a/posting/lmap.go b/posting/lmap.go deleted file mode 100644 index ea59178961c9e5e2f9a59e00462e93a7b1d4e70b..0000000000000000000000000000000000000000 --- a/posting/lmap.go +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright 2015 DGraph Labs, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package posting - -import ( - "math/rand" - "sync" - "sync/atomic" - "time" - - "github.com/Sirupsen/logrus" -) - -const NBUCKETS = 32 - -type latency struct { - n1 uint64 - u1 uint64 - u10 uint64 - u100 uint64 - m1 uint64 - s1 uint64 -} - -type bucket struct { - sync.RWMutex - m map[uint64]*List - lat *latency -} - -func (l *latency) update(s time.Time) { - e := time.Now().Sub(s) - micros := e.Nanoseconds() / 1000 - if micros > 1000000 { - atomic.AddUint64(&l.s1, 1) - } else if micros > 1000 { - atomic.AddUint64(&l.m1, 1) - } else if micros > 100 { - atomic.AddUint64(&l.u100, 1) - } else if micros > 10 { - atomic.AddUint64(&l.u10, 1) - } else if micros > 1 { - atomic.AddUint64(&l.u1, 1) - } else { - atomic.AddUint64(&l.n1, 1) - } -} - -func (l *latency) log() { - for _ = range time.Tick(5 * time.Second) { - glog.WithFields(logrus.Fields{ - "n1": atomic.LoadUint64(&l.n1), - "u1": atomic.LoadUint64(&l.u1), - "u10": atomic.LoadUint64(&l.u10), - "u100": atomic.LoadUint64(&l.u100), - "m1": atomic.LoadUint64(&l.m1), - "s1": atomic.LoadUint64(&l.s1), - }).Info("Lmap latency") - } -} - -func (b *bucket) get(key uint64) (*List, bool) { - if b.lat != nil { - n := time.Now() - defer b.lat.update(n) - } - - b.RLock() - if l, ok := b.m[key]; ok { - b.RUnlock() - return l, false - } - b.RUnlock() - - b.Lock() - defer b.Unlock() - if l, ok := b.m[key]; ok { - return l, false - } - - l := NewList() - b.m[key] = l - return l, true -} - -type Map struct { - buckets []*bucket -} - -func NewMap(withLog bool) *Map { - var lat *latency - if withLog { - lat = new(latency) - go lat.log() - } else { - lat = nil - } - - m := new(Map) - m.buckets = make([]*bucket, NBUCKETS) - for i := 0; i < NBUCKETS; i++ { - m.buckets[i] = new(bucket) - m.buckets[i].lat = lat - m.buckets[i].m = make(map[uint64]*List) - } - return m -} - -func (m *Map) Get(key uint64) (*List, bool) { - bi := key % NBUCKETS - return m.buckets[bi].get(key) -} - -func (m *Map) StreamUntilCap(ch chan uint64) { - bi := rand.Intn(NBUCKETS) - b := m.buckets[bi] - b.RLock() - defer b.RUnlock() - for u := range b.m { - if len(ch) >= cap(ch) { - break - } - ch <- u - } -} - -func (m *Map) StreamAllKeys(ch chan uint64) { - for i := 0; i < len(m.buckets); i++ { - b := m.buckets[i] - b.RLock() - for u := range b.m { - ch <- u - } - b.RUnlock() - } -} diff --git a/query/query.go b/query/query.go index 58d0db76186bc0d113c229525c4d1e32e00c3ed2..f1b0d97078cca9b3cfd77f892feaae01fa6b642d 100644 --- a/query/query.go +++ b/query/query.go @@ -21,10 +21,11 @@ import ( "container/heap" "encoding/json" "fmt" - "reflect" + "log" "time" - "github.com/Sirupsen/logrus" + "golang.org/x/net/context" + "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/query/graph" "github.com/dgraph-io/dgraph/task" @@ -86,8 +87,6 @@ import ( * Return errors, if any. */ -var glog = x.Log("query") - type Latency struct { Start time.Time `json:"-"` Parsing time.Duration `json:"query_parsing"` @@ -131,8 +130,6 @@ func mergeInterfaces(i1 interface{}, i2 interface{}) interface{} { } break } - glog.Debugf("Got type: %v %v", reflect.TypeOf(i1), reflect.TypeOf(i2)) - glog.Debugf("Got values: %v %v", i1, i2) return []interface{}{i1, i2} } @@ -149,7 +146,6 @@ func postTraverse(g *SubGraph) (result map[uint64]interface{}, rerr error) { for _, child := range g.Children { m, err := postTraverse(child) if err != nil { - x.Err(glog, err).Error("Error while traversal") return result, err } // Merge results from all children, one by one. @@ -172,11 +168,11 @@ func postTraverse(g *SubGraph) (result map[uint64]interface{}, rerr error) { r.Init(g.Result, ro) if q.UidsLength() != r.UidmatrixLength() { - glog.Fatalf("Result uidmatrixlength: %v. Query uidslength: %v", + log.Fatalf("Result uidmatrixlength: %v. Query uidslength: %v", r.UidmatrixLength(), q.UidsLength()) } if q.UidsLength() != r.ValuesLength() { - glog.Fatalf("Result valuelength: %v. Query uidslength: %v", + log.Fatalf("Result valuelength: %v. Query uidslength: %v", r.ValuesLength(), q.UidsLength()) } @@ -229,17 +225,12 @@ func postTraverse(g *SubGraph) (result map[uint64]interface{}, rerr error) { } if pval, present := result[q.Uids(i)]; present { - glog.WithField("prev", pval). - WithField("_uid_", q.Uids(i)). - WithField("new", val). - Fatal("Previous value detected. A uid -> list of uids / value. Not both.") + log.Fatalf("prev: %v _uid_: %v new: %v"+ + " Previous value detected. A uid -> list of uids / value. Not both", + pval, q.Uids(i), val) } m := make(map[string]interface{}) m["_uid_"] = fmt.Sprintf("%#x", q.Uids(i)) - glog.WithFields(logrus.Fields{ - "_uid_": q.Uids(i), - "val": val, - }).Debug("Got value") m[g.Attr] = string(val) result[q.Uids(i)] = m } @@ -249,25 +240,22 @@ func postTraverse(g *SubGraph) (result map[uint64]interface{}, rerr error) { func (g *SubGraph) ToJson(l *Latency) (js []byte, rerr error) { r, err := postTraverse(g) if err != nil { - x.Err(glog, err).Error("While doing traversal") return js, err } l.Json = time.Since(l.Start) - l.Parsing - l.Processing - if len(r) == 1 { - for _, ival := range r { - var m map[string]interface{} - if ival != nil { - m = ival.(map[string]interface{}) - } - m["server_latency"] = l.ToMap() - return json.Marshal(m) - } - } else { - glog.Fatal("We don't currently support more than 1 uid at root.") + if len(r) != 1 { + log.Fatal("We don't currently support more than 1 uid at root.") } - glog.Fatal("Shouldn't reach here.") - return json.Marshal(r) + ival := r[0] + var m map[string]interface{} + if ival != nil { + m = ival.(map[string]interface{}) + } else { + m = make(map[string]interface{}) + } + m["server_latency"] = l.ToMap() + return json.Marshal(m) } // This function performs a binary search on the uids slice and returns the @@ -305,11 +293,7 @@ func (g *SubGraph) preTraverse(uid uint64, dst *graph.Node) error { idx := indexOf(uid, q) if idx == -1 { - glog.WithFields(logrus.Fields{ - "uid": uid, - "attribute": g.Attr, - "childAttribute": pc.Attr, - }).Fatal("Attribute with uid not found in child Query uids") + log.Fatal("Attribute with uid not found in child Query uids.") return fmt.Errorf("Attribute with uid not found") } @@ -328,7 +312,7 @@ func (g *SubGraph) preTraverse(uid uint64, dst *graph.Node) error { uc.Attribute = pc.Attr uc.Uid = uid if rerr := pc.preTraverse(uid, uc); rerr != nil { - x.Err(glog, rerr).Error("Error while traversal") + log.Printf("Error while traversal: %v", rerr) return rerr } @@ -371,7 +355,6 @@ func (g *SubGraph) ToProtocolBuffer(l *Latency) (n *graph.Node, rerr error) { n.Uid = ul.Uids(0) if rerr = g.preTraverse(n.Uid, n); rerr != nil { - x.Err(glog, rerr).Error("Error while traversal") return n, rerr } @@ -393,8 +376,8 @@ func treeCopy(gq *gql.GraphQuery, sg *SubGraph) { } } -func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) { - sg, err := newGraph(gq.UID, gq.XID) +func ToSubGraph(ctx context.Context, gq *gql.GraphQuery) (*SubGraph, error) { + sg, err := newGraph(ctx, gq.UID, gq.XID) if err != nil { return nil, err } @@ -402,24 +385,24 @@ func ToSubGraph(gq *gql.GraphQuery) (*SubGraph, error) { return sg, nil } -func newGraph(euid uint64, exid string) (*SubGraph, error) { +func newGraph(ctx context.Context, euid uint64, exid string) (*SubGraph, error) { // This would set the Result field in SubGraph, // and populate the children for attributes. if len(exid) > 0 { xidToUid := make(map[string]uint64) xidToUid[exid] = 0 - if err := worker.GetOrAssignUidsOverNetwork(&xidToUid); err != nil { - glog.WithError(err).Error("While getting uids over network") + if err := worker.GetOrAssignUidsOverNetwork(ctx, &xidToUid); err != nil { + x.Trace(ctx, "Error while getting uids over network: %v", err) return nil, err } euid = xidToUid[exid] - glog.WithField("xid", exid).WithField("uid", euid).Debug("GetOrAssign") + x.Trace(ctx, "Xid: %v Uid: %v", exid, euid) } if euid == 0 { err := fmt.Errorf("Query internal id is zero") - x.Err(glog, err).Error("Invalid query") + x.Trace(ctx, "Invalid query: %v", err) return nil, err } @@ -535,14 +518,12 @@ func sortedUniqueUids(r *task.Result) (sorted []uint64, rerr error) { return sorted, nil } -func ProcessGraph(sg *SubGraph, rch chan error, td time.Duration) { - timeout := time.Now().Add(td) - +func ProcessGraph(ctx context.Context, sg *SubGraph, rch chan error) { var err error if len(sg.Query) > 0 && sg.Attr != "_root_" { - sg.Result, err = worker.ProcessTaskOverNetwork(sg.Query) + sg.Result, err = worker.ProcessTaskOverNetwork(ctx, sg.Query) if err != nil { - x.Err(glog, err).Error("While processing task.") + x.Trace(ctx, "Error while processing task: %v", err) rch <- err return } @@ -555,37 +536,24 @@ func ProcessGraph(sg *SubGraph, rch chan error, td time.Duration) { if r.ValuesLength() > 0 { var v task.Value if r.Values(&v, 0) { - glog.WithField("attr", sg.Attr).WithField("val", string(v.ValBytes())). - Info("Sample value") + x.Trace(ctx, "Sample value for attr: %v Val: %v", sg.Attr, string(v.ValBytes())) } } sorted, err := sortedUniqueUids(r) if err != nil { - x.Err(glog, err).Error("While processing task.") + x.Trace(ctx, "Error while processing task: %v", err) rch <- err return } if len(sorted) == 0 { // Looks like we're done here. - if len(sg.Children) > 0 { - glog.Debugf("Have some children but no results. Life got cut short early."+ - "Current attribute: %q", sg.Attr) - } else { - glog.Debugf("No more things to process for Attr: %v", sg.Attr) - } + x.Trace(ctx, "Zero uids. Num attr children: %v", len(sg.Children)) rch <- nil return } - timeleft := timeout.Sub(time.Now()) - if timeleft < 0 { - glog.WithField("attr", sg.Attr).Error("Query timeout before children") - rch <- fmt.Errorf("Query timeout before children") - return - } - // Let's execute it in a tree fashion. Each SubGraph would break off // as many goroutines as it's children; which would then recursively // do the same thing. @@ -594,28 +562,22 @@ func ProcessGraph(sg *SubGraph, rch chan error, td time.Duration) { for i := 0; i < len(sg.Children); i++ { child := sg.Children[i] child.Query = createTaskQuery(child, sorted) - go ProcessGraph(child, childchan, timeleft) + go ProcessGraph(ctx, child, childchan) } - tchan := time.After(timeleft) // Now get all the results back. for i := 0; i < len(sg.Children); i++ { select { case err = <-childchan: - glog.WithFields(logrus.Fields{ - "num_children": len(sg.Children), - "index": i, - "attr": sg.Children[i].Attr, - "err": err, - }).Debug("Reply from child") + x.Trace(ctx, "Reply from child. Index: %v Attr: %v", i, sg.Children[i].Attr) if err != nil { - x.Err(glog, err).Error("While processing child task.") + x.Trace(ctx, "Error while processing child task: %v", err) rch <- err return } - case <-tchan: - glog.WithField("attr", sg.Attr).Error("Query timeout after children") - rch <- fmt.Errorf("Query timeout after children") + case <-ctx.Done(): + x.Trace(ctx, "Context done before full execution: %v", ctx.Err()) + rch <- ctx.Err() return } } diff --git a/query/query_test.go b/query/query_test.go index 8276c52ba36b69048917a89d10471c238e086457..2e3474d1abf16a469800770be97d53652170788c 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "golang.org/x/net/context" + "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/posting" @@ -44,7 +46,7 @@ func setErr(err *error, nerr error) { } func addEdge(t *testing.T, edge x.DirectedEdge, l *posting.List) { - if err := l.AddMutation(edge, posting.Set); err != nil { + if err := l.AddMutation(context.Background(), edge, posting.Set); err != nil { t.Error(err) } } @@ -97,7 +99,8 @@ func TestNewGraph(t *testing.T) { ps := new(store.Store) ps.Init(dir) - sg, err := newGraph(ex, "") + ctx := context.Background() + sg, err := newGraph(ctx, ex, "") if err != nil { t.Error(err) } @@ -210,13 +213,14 @@ func TestProcessGraph(t *testing.T) { if err != nil { t.Error(err) } - sg, err := ToSubGraph(gq) + ctx := context.Background() + sg, err := ToSubGraph(ctx, gq) if err != nil { t.Error(err) } ch := make(chan error) - go ProcessGraph(sg, ch, time.Minute) + go ProcessGraph(ctx, sg, ch) err = <-ch if err != nil { t.Error(err) @@ -302,13 +306,14 @@ func TestToJson(t *testing.T) { if err != nil { t.Error(err) } - sg, err := ToSubGraph(gq) + ctx := context.Background() + sg, err := ToSubGraph(ctx, gq) if err != nil { t.Error(err) } ch := make(chan error) - go ProcessGraph(sg, ch, time.Minute) + go ProcessGraph(ctx, sg, ch) err = <-ch if err != nil { t.Error(err) @@ -355,13 +360,14 @@ func TestToPB(t *testing.T) { if err != nil { t.Error(err) } - sg, err := ToSubGraph(gq) + ctx := context.Background() + sg, err := ToSubGraph(ctx, gq) if err != nil { t.Error(err) } ch := make(chan error) - go ProcessGraph(sg, ch, time.Minute) + go ProcessGraph(ctx, sg, ch) err = <-ch if err != nil { t.Error(err) diff --git a/rdf/parse.go b/rdf/parse.go index 402025a6227c0877f262064b13d52e5590052191..32ab1d8132c619531bfac4c03d6b46ace62c5847 100644 --- a/rdf/parse.go +++ b/rdf/parse.go @@ -18,6 +18,7 @@ package rdf import ( "fmt" + "log" "strconv" "strings" "time" @@ -138,7 +139,7 @@ func Parse(line string) (rnq NQuad, rerr error) { if item.Typ == itemObjectType { // TODO: Strictly parse common types like integers, floats etc. if len(oval) == 0 { - glog.Fatalf( + log.Fatalf( "itemObject should be emitted before itemObjectType. Input: [%s]", line) } diff --git a/rdf/state.go b/rdf/state.go index 737d901871496a906004f2244ef9feecdd3a6515..44e58557a446352cf198bb1b347c0c77b5d67bf6 100644 --- a/rdf/state.go +++ b/rdf/state.go @@ -23,7 +23,6 @@ import ( "strings" "github.com/dgraph-io/dgraph/lex" - "github.com/dgraph-io/dgraph/x" ) const ( @@ -45,8 +44,6 @@ const ( AT_LABEL ) -var glog = x.Log("rdf") - func run(l *lex.Lexer) { for state := lexText; state != nil; { state = state(l) diff --git a/store/store.go b/store/store.go index 4b06b618b0d1d02e34bc870159e6d5305c156723..b046b12070faa37e9f154b1222d7b80fa818f64e 100644 --- a/store/store.go +++ b/store/store.go @@ -50,8 +50,7 @@ func (s *Store) Init(filepath string) { var err error s.db, err = rocksdb.OpenDb(s.opt, filepath) if err != nil { - x.Err(log, err).WithField("filepath", filepath). - Fatal("While opening store") + log.Fatalf("Error while opening filepath: %v", filepath) return } } @@ -62,8 +61,7 @@ func (s *Store) InitReadOnly(filepath string) { s.db, err = rocksdb.OpenDbForReadOnly(s.opt, filepath, false) // TODO(Ashwin):When will it be true if err != nil { - x.Err(log, err).WithField("filepath", filepath). - Fatal("While opening store") + log.Fatalf("Error while opening filepath: %v", filepath) return } } diff --git a/uid/assigner.go b/uid/assigner.go index 8bcb4cc90f8bfedf405613e48de061824a2a6c5e..0c0ebd8cf0da83a7559770b195f1466ee0e45e59 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -19,10 +19,13 @@ package uid import ( "errors" "fmt" + "log" "math" "sync" "time" + "golang.org/x/net/context" + "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/posting/types" "github.com/dgraph-io/dgraph/store" @@ -30,7 +33,6 @@ import ( "github.com/dgryski/go-farm" ) -var glog = x.Log("uid") var lmgr *lockManager var uidStore *store.Store var eidPool = sync.Pool{ @@ -89,7 +91,6 @@ func (lm *lockManager) clean() { lm.Unlock() // A minute is enough to avoid the race condition issue for // uid allocation to an xid. - glog.WithField("count", count).Info("Deleted old locks.") } } @@ -116,9 +117,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, uid1 := farm.Fingerprint64([]byte(txid)) // Generate from hash. uid = (uid1 % mod) + minIdx - glog.WithField("txid", txid).WithField("uid", uid).Debug("Generated") if uid == math.MaxUint64 { - glog.Debug("Hit uint64max while generating fingerprint. Ignoring...") continue } @@ -130,7 +129,6 @@ func allocateUniqueUid(xid string, instanceIdx uint64, // Something already present here. var p types.Posting pl.Get(&p, 0) - glog.Debug("Found existing xid: [%q]. Continuing...", string(p.ValueBytes())) continue } @@ -140,10 +138,7 @@ func allocateUniqueUid(xid string, instanceIdx uint64, Source: "_assigner_", Timestamp: time.Now(), } - rerr = pl.AddMutation(t, posting.Set) - if rerr != nil { - glog.WithError(rerr).Error("While adding mutation") - } + rerr = pl.AddMutation(context.Background(), t, posting.Set) return uid, rerr } return 0, errors.New("Some unhandled route lead me here." + @@ -159,7 +154,7 @@ func assignNew(pl *posting.List, xid string, instanceIdx uint64, defer entry.Unlock() if pl.Length() > 1 { - glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) + log.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) } else if pl.Length() > 0 { var p types.Posting @@ -180,7 +175,7 @@ func assignNew(pl *posting.List, xid string, instanceIdx uint64, Source: "_assigner_", Timestamp: time.Now(), } - rerr := pl.AddMutation(t, posting.Set) + rerr := pl.AddMutation(context.Background(), t, posting.Set) return uid, rerr } @@ -195,7 +190,7 @@ func Get(xid string) (uid uint64, rerr error) { return 0, fmt.Errorf("xid: %v doesn't have any uid assigned.", xid) } if pl.Length() > 1 { - glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) + log.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) } var p types.Posting if ok := pl.Get(&p, 0); !ok { @@ -213,7 +208,7 @@ func GetOrAssign(xid string, instanceIdx uint64, return assignNew(pl, xid, instanceIdx, numInstances) } else if pl.Length() > 1 { - glog.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) + log.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid) } else { // We found one posting. @@ -235,18 +230,17 @@ func ExternalId(uid uint64) (xid string, rerr error) { } if pl.Length() > 1 { - glog.WithField("uid", uid).Fatal("This shouldn't be happening.") + log.Fatalf("This shouldn't be happening. Uid: %v", uid) return "", errors.New("Multiple external ids for this uid.") } var p types.Posting if ok := pl.Get(&p, 0); !ok { - glog.WithField("uid", uid).Error("While retrieving posting") return "", errors.New("While retrieving posting") } if p.Uid() != math.MaxUint64 { - glog.WithField("uid", uid).Fatal("Value uid must be MaxUint64.") + log.Fatalf("Value uid must be MaxUint64. Uid: %v", p.Uid()) } return string(p.ValueBytes()), rerr } diff --git a/worker/assign.go b/worker/assign.go index 45ed5b5fb93afa06d8072f3c179cd49288e64970..70bcaa7921ebdb29ab3ab5d646474d9b469c07f9 100644 --- a/worker/assign.go +++ b/worker/assign.go @@ -18,12 +18,14 @@ package worker import ( "fmt" + "log" "sync" "golang.org/x/net/context" "github.com/dgraph-io/dgraph/task" "github.com/dgraph-io/dgraph/uid" + "github.com/dgraph-io/dgraph/x" "github.com/google/flatbuffers/go" ) @@ -48,7 +50,7 @@ func createXidListBuffer(xids map[string]uint64) []byte { return b.Bytes[b.Head():] } -func getOrAssignUids( +func getOrAssignUids(ctx context.Context, xidList *task.XidList) (uidList []byte, rerr error) { if xidList.XidsLength() == 0 { @@ -75,7 +77,7 @@ func getOrAssignUids( wg.Wait() close(che) for err := range che { - glog.WithError(err).Error("Encountered errors while getOrAssignUids") + x.Trace(ctx, "Error while getOrAssignUids: %v", err) return uidList, err } @@ -93,7 +95,7 @@ func getOrAssignUids( return b.Bytes[b.Head():], nil } -func GetOrAssignUidsOverNetwork(xidToUid *map[string]uint64) (rerr error) { +func GetOrAssignUidsOverNetwork(ctx context.Context, xidToUid *map[string]uint64) (rerr error) { query := new(Payload) query.Data = createXidListBuffer(*xidToUid) uo := flatbuffers.GetUOffsetT(query.Data) @@ -106,7 +108,7 @@ func GetOrAssignUidsOverNetwork(xidToUid *map[string]uint64) (rerr error) { xidList := new(task.XidList) xidList.Init(query.Data, uo) - reply.Data, rerr = getOrAssignUids(xidList) + reply.Data, rerr = getOrAssignUids(ctx, xidList) if rerr != nil { return rerr } @@ -115,15 +117,14 @@ func GetOrAssignUidsOverNetwork(xidToUid *map[string]uint64) (rerr error) { pool := pools[0] conn, err := pool.Get() if err != nil { - glog.WithError(err).Error("Unable to retrieve connection.") + x.Trace(ctx, "Error while retrieving connection: %v", err) return err } c := NewWorkerClient(conn) reply, rerr = c.GetOrAssign(context.Background(), query) if rerr != nil { - glog.WithField("method", "GetOrAssign").WithError(rerr). - Error("While getting uids") + x.Trace(ctx, "Error while getting uids: %v", rerr) return rerr } } @@ -133,9 +134,7 @@ func GetOrAssignUidsOverNetwork(xidToUid *map[string]uint64) (rerr error) { uidList.Init(reply.Data, uo) if xidList.XidsLength() != uidList.UidsLength() { - glog.WithField("num_xids", xidList.XidsLength()). - WithField("num_uids", uidList.UidsLength()). - Fatal("Num xids don't match num uids") + log.Fatal("Xids: %d != Uids: %d", xidList.XidsLength(), uidList.UidsLength()) } for i := 0; i < xidList.XidsLength(); i++ { xid := string(xidList.Xids(i)) diff --git a/worker/conn.go b/worker/conn.go index 482fecb9c52b469644b941e71880dfb5fe23719d..cbc3fee3d45b4bedd55c0c6f1dae9147cd9eaa1e 100644 --- a/worker/conn.go +++ b/worker/conn.go @@ -40,7 +40,7 @@ func NewPool(addr string, maxCap int) *Pool { p.conns = make(chan *grpc.ClientConn, maxCap) conn, err := p.dialNew() if err != nil { - glog.Fatal(err) + log.Fatal(err) return nil } p.conns <- conn diff --git a/worker/mutation.go b/worker/mutation.go index 78c2a6a5cad9a2e41c3ec06faa92dc5294cb5099..a9b2993f3d0ea85c19e6793f601c4b910ab36fc6 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -20,7 +20,7 @@ import ( "bytes" "encoding/gob" "fmt" - "sync" + "log" "golang.org/x/net/context" @@ -47,38 +47,32 @@ func (m *Mutations) Decode(data []byte) error { return dec.Decode(m) } -func mutate(m *Mutations, left *Mutations) error { +func mutate(ctx context.Context, m *Mutations, left *Mutations) error { // For now, assume it's all only Set instructions. for _, edge := range m.Set { if farm.Fingerprint64( []byte(edge.Attribute))%numInstances != instanceIdx { - glog.WithField("instanceIdx", instanceIdx). - WithField("attr", edge.Attribute). - Info("Predicate fingerprint doesn't match instanceIdx") return fmt.Errorf("predicate fingerprint doesn't match this instance.") } - glog.WithField("edge", edge).Debug("mutate") key := posting.Key(edge.Entity, edge.Attribute) plist := posting.GetOrCreate(key, dataStore) - if err := plist.AddMutation(edge, posting.Set); err != nil { + if err := plist.AddMutation(ctx, edge, posting.Set); err != nil { left.Set = append(left.Set, edge) - glog.WithError(err).WithField("edge", edge). - Error("While adding mutation.") + log.Printf("Error while adding mutation: %v %v", edge, err) continue } } return nil } -func runMutate(idx int, m *Mutations, wg *sync.WaitGroup, +func runMutate(ctx context.Context, idx int, m *Mutations, replies chan *Payload, che chan error) { - defer wg.Done() left := new(Mutations) if idx == int(instanceIdx) { - che <- mutate(m, left) + che <- mutate(ctx, m, left) return } @@ -99,18 +93,16 @@ func runMutate(idx int, m *Mutations, wg *sync.WaitGroup, defer pool.Put(conn) c := NewWorkerClient(conn) - reply, err := c.Mutate(context.Background(), query) + reply, err := c.Mutate(ctx, query) if err != nil { - glog.WithField("call", "Worker.Mutate"). - WithField("addr", pool.Addr). - WithError(err).Error("While calling mutate") che <- err return } replies <- reply + che <- nil } -func MutateOverNetwork( +func MutateOverNetwork(ctx context.Context, edges []x.DirectedEdge) (left []x.DirectedEdge, rerr error) { mutationArray := make([]*Mutations, numInstances) @@ -124,26 +116,32 @@ func MutateOverNetwork( mu.Set = append(mu.Set, edge) } - var wg sync.WaitGroup replies := make(chan *Payload, numInstances) errors := make(chan error, numInstances) + count := 0 for idx, mu := range mutationArray { if mu == nil || len(mu.Set) == 0 { continue } - wg.Add(1) - go runMutate(idx, mu, &wg, replies, errors) + count += 1 + go runMutate(ctx, idx, mu, replies, errors) } - wg.Wait() - close(replies) - close(errors) - for err := range errors { - if err != nil { - glog.WithError(err).Error("While running all mutations") - return left, err + // Wait for all the goroutines to reply back. + for i := 0; i < count; i++ { + select { + case err := <-errors: + if err != nil { + x.Trace(ctx, "Error while running all mutations: %v", err) + return left, err + } + case <-ctx.Done(): + return left, ctx.Err() } } + close(replies) + close(errors) + for reply := range replies { l := new(Mutations) if err := l.Decode(reply.Data); err != nil { diff --git a/worker/task.go b/worker/task.go index 039e00539c762f8027556e1a459003399d83a361..f29519ba57dc8dea412a4afaf767430fdfd972c0 100644 --- a/worker/task.go +++ b/worker/task.go @@ -25,7 +25,7 @@ import ( "golang.org/x/net/context" ) -func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { +func ProcessTaskOverNetwork(ctx context.Context, qu []byte) (result []byte, rerr error) { uo := flatbuffers.GetUOffsetT(qu) q := new(task.Query) q.Init(qu, uo) @@ -40,9 +40,8 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { } else { runHere = (instanceIdx == idx) } - glog.WithField("runHere", runHere).WithField("attr", attr). - WithField("instanceIdx", instanceIdx). - WithField("numInstances", numInstances).Info("ProcessTaskOverNetwork") + x.Trace(ctx, "runHere: %v attr: %v instanceIdx: %v numInstances: %v", + runHere, attr, instanceIdx, numInstances) if runHere { // No need for a network call, as this should be run from within @@ -63,12 +62,12 @@ func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) { c := NewWorkerClient(conn) reply, err := c.ServeTask(context.Background(), query) if err != nil { - glog.WithField("call", "Worker.ServeTask").Error(err) + x.Trace(ctx, "Error while calling Worker.ServeTask: %v", err) return []byte(""), err } - glog.WithField("reply_len", len(reply.Data)).WithField("addr", addr). - WithField("attr", attr).Info("Got reply from server") + x.Trace(ctx, "Reply from server. length: %v Addr: %v Attr: %v", + len(reply.Data), addr, attr) return reply.Data, nil } diff --git a/worker/worker.go b/worker/worker.go index 549fa5c74864a8e600cd5ba1d0e2ee8860436461..bca4b81d0306ca410ec73b1816b037ae2aa9b1af 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -18,6 +18,7 @@ package worker import ( "flag" + "log" "net" "google.golang.org/grpc" @@ -34,7 +35,6 @@ import ( var workerPort = flag.String("workerport", ":12345", "Port used by worker for internal communication.") -var glog = x.Log("worker") var dataStore, uidStore *store.Store var pools []*Pool var numInstances, instanceIdx uint64 @@ -82,14 +82,12 @@ func (w *worker) GetOrAssign(ctx context.Context, query *Payload) (*Payload, err xids.Init(query.Data, uo) if instanceIdx != 0 { - glog.WithField("instanceIdx", instanceIdx). - WithField("GetOrAssign", true). - Fatal("We shouldn't be receiving this request.") + log.Fatalf("instanceIdx: %v. GetOrAssign. We shouldn't be getting this req", instanceIdx) } reply := new(Payload) var rerr error - reply.Data, rerr = getOrAssignUids(xids) + reply.Data, rerr = getOrAssignUids(ctx, xids) return reply, rerr } @@ -100,7 +98,7 @@ func (w *worker) Mutate(ctx context.Context, query *Payload) (*Payload, error) { } left := new(Mutations) - if err := mutate(m, left); err != nil { + if err := mutate(ctx, m, left); err != nil { return nil, err } @@ -115,8 +113,8 @@ func (w *worker) ServeTask(ctx context.Context, query *Payload) (*Payload, error q := new(task.Query) q.Init(query.Data, uo) attr := string(q.Attr()) - glog.WithField("attr", attr).WithField("num_uids", q.UidsLength()). - WithField("instanceIdx", instanceIdx).Info("ServeTask") + x.Trace(ctx, "Attribute: %v NumUids: %v InstanceIdx: %v ServeTask", + attr, q.UidsLength(), instanceIdx) reply := new(Payload) var rerr error @@ -126,9 +124,7 @@ func (w *worker) ServeTask(ctx context.Context, query *Payload) (*Payload, error reply.Data, rerr = processTask(query.Data) } else { - glog.WithField("attribute", attr). - WithField("instanceIdx", instanceIdx). - Fatalf("Request sent to wrong server") + log.Fatalf("attr: %v instanceIdx: %v Request sent to wrong server.", attr, instanceIdx) } return reply, rerr } @@ -136,10 +132,10 @@ func (w *worker) ServeTask(ctx context.Context, query *Payload) (*Payload, error func runServer(port string) { ln, err := net.Listen("tcp", port) if err != nil { - glog.Fatalf("While running server: %v", err) + log.Fatalf("While running server: %v", err) return } - glog.WithField("address", ln.Addr()).Info("Worker listening") + log.Printf("Worker listening at address: %v", ln.Addr()) s := grpc.NewServer(grpc.CustomCodec(&PayloadCodec{})) RegisterWorkerServer(s, &worker{}) @@ -160,20 +156,18 @@ func Connect(workerList []string) { conn, err := pool.Get() if err != nil { - glog.WithError(err).Fatal("Unable to connect.") + log.Fatalf("Unable to connect: %v", err) } c := NewWorkerClient(conn) - reply, err := c.Hello(context.Background(), query) + _, err = c.Hello(context.Background(), query) if err != nil { - glog.WithError(err).Fatal("Unable to contact.") + log.Fatalf("Unable to connect: %v", err) } _ = pool.Put(conn) - glog.WithField("reply", string(reply.Data)).WithField("addr", addr). - Info("Got reply from server") pools = append(pools, pool) } - glog.Info("Server started. Clients connected.") + log.Print("Server started. Clients connected.") } diff --git a/worker/worker_test.go b/worker/worker_test.go index 763dfc89e19bcfbec7a9f2cbca6f733ad0703761..bd28c0490d25b8ebeb693e063563a533809721e0 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "golang.org/x/net/context" + "github.com/dgraph-io/dgraph/commit" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/store" @@ -33,7 +35,7 @@ import ( ) func addEdge(t *testing.T, edge x.DirectedEdge, l *posting.List) { - if err := l.AddMutation(edge, posting.Set); err != nil { + if err := l.AddMutation(context.Background(), edge, posting.Set); err != nil { t.Error(err) } } diff --git a/x/x.go b/x/x.go index 0a428be789cda788d2c99a089f2b9e6f94879766..7ddc022913c0174c1e0019b4f2c3fca5ac414a8d 100644 --- a/x/x.go +++ b/x/x.go @@ -22,6 +22,9 @@ import ( "net/http" "time" + "golang.org/x/net/context" + "golang.org/x/net/trace" + "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/task" "github.com/google/flatbuffers/go" @@ -114,3 +117,11 @@ func UidlistOffset(b *flatbuffers.Builder, } var Nilbyte []byte + +func Trace(ctx context.Context, format string, args ...interface{}) { + tr, ok := trace.FromContext(ctx) + if !ok { + return + } + tr.LazyPrintf(format, args...) +}