diff --git a/dgraph/cmd/server/admin.go b/dgraph/cmd/server/admin.go
index 7943b95d95070e47abcd1fca2923d2cedac784ad..8a14f0d0c7727959084c89359f2c8f04217ab882 100644
--- a/dgraph/cmd/server/admin.go
+++ b/dgraph/cmd/server/admin.go
@@ -14,7 +14,6 @@ import (
 	"io/ioutil"
 	"net"
 	"net/http"
-	"os"
 	"strconv"
 
 	"github.com/dgraph-io/dgraph/edgraph"
@@ -43,16 +42,11 @@ func shutDownHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	shutdownServer()
+	close(shutdownCh)
 	w.Header().Set("Content-Type", "application/json")
 	w.Write([]byte(`{"code": "Success", "message": "Server is shutting down"}`))
 }
 
-func shutdownServer() {
-	x.Printf("Got clean exit request")
-	sdCh <- os.Interrupt
-}
-
 func exportHandler(w http.ResponseWriter, r *http.Request) {
 	if !handlerInit(w, r) {
 		return
diff --git a/dgraph/cmd/server/run.go b/dgraph/cmd/server/run.go
index f98278938cdd7de2d4b277b71d882788e5d94e32..c583e6672d1a3a860907897c5cd919033bd32775 100644
--- a/dgraph/cmd/server/run.go
+++ b/dgraph/cmd/server/run.go
@@ -200,8 +200,8 @@ func serveGRPC(l net.Listener, wg *sync.WaitGroup) {
 		grpc.MaxConcurrentStreams(1000))
 	api.RegisterDgraphServer(s, &edgraph.Server{})
 	err := s.Serve(l)
-	log.Printf("gRpc server stopped : %s", err.Error())
-	s.GracefulStop()
+	log.Printf("GRPC listener canceled: %s\n", err.Error())
+	s.Stop()
 }
 
 func serveHTTP(l net.Listener, wg *sync.WaitGroup) {
@@ -215,9 +215,7 @@ func serveHTTP(l net.Listener, wg *sync.WaitGroup) {
 	log.Printf("Stopped taking more http(s) requests. Err: %s", err.Error())
 	ctx, cancel := context.WithTimeout(context.Background(), 630*time.Second)
 	defer cancel()
-	err = srv.Shutdown(ctx)
-	log.Printf("All http(s) requests finished.")
-	if err != nil {
+	if err := srv.Shutdown(ctx); err != nil {
 		log.Printf("Http(s) shutdown err: %v", err.Error())
 	}
 }
@@ -265,7 +263,7 @@ func setupServer() {
 
 	go func() {
 		defer wg.Done()
-		<-sdCh
+		<-shutdownCh
 		// Stops grpc/http servers; Already accepted connections are not closed.
 		grpcListener.Close()
 		httpListener.Close()
@@ -276,7 +274,7 @@ func setupServer() {
 	wg.Wait()
 }
 
-var sdCh chan os.Signal
+var shutdownCh chan struct{}
 
 func run() {
 	config := edgraph.Options{
@@ -332,7 +330,9 @@ func run() {
 	worker.Init(edgraph.State.Pstore)
 
 	// setup shutdown os signal handler
-	sdCh = make(chan os.Signal, 3)
+	sdCh := make(chan os.Signal, 3)
+	shutdownCh = make(chan struct{})
+	var numShutDownSig int
 
 	defer func() {
 		signal.Stop(sdCh)
@@ -347,11 +347,14 @@ func run() {
 			if !ok {
 				return
 			}
+			select {
+			case <-shutdownCh:
+			default:
+				close(shutdownCh)
+			}
 			numShutDownSig++
 			x.Println("Caught Ctrl-C. Terminating now (this may take a few seconds)...")
-			if numShutDownSig == 1 {
-				shutdownServer()
-			} else if numShutDownSig == 3 {
+			if numShutDownSig == 3 {
 				x.Println("Signaled thrice. Aborting!")
 				os.Exit(1)
 			}
@@ -363,5 +366,7 @@ func run() {
 	// Setup external communication.
 	go worker.StartRaftNodes(edgraph.State.WALstore, bindall)
 	setupServer()
+	log.Println("GRPC and HTTP stopped.")
 	worker.BlockingStop()
+	log.Println("Server shutdown. Bye!")
 }
diff --git a/worker/draft.go b/worker/draft.go
index 2205a4a7467c090d962d064665d6c45bf82bdd85..3f582ead9fbf2ee69334aa566144fc25c63ec305 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -12,6 +12,7 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"log"
 	"math"
 	"sync/atomic"
 	"time"
@@ -21,6 +22,7 @@ import (
 	"golang.org/x/net/context"
 	"golang.org/x/net/trace"
 
+	"github.com/dgraph-io/badger/y"
 	"github.com/dgraph-io/dgo/protos/api"
 	dy "github.com/dgraph-io/dgo/y"
 	"github.com/dgraph-io/dgraph/conn"
@@ -43,9 +45,8 @@ type node struct {
 	// Fields which are never changed after init.
 	applyCh chan raftpb.Entry
 	ctx     context.Context
-	stop    chan struct{} // to send the stop signal to Run
-	done    chan struct{} // to check whether node is running or not
 	gid     uint32
+	closer  *y.Closer
 
 	streaming int32 // Used to avoid calculating snapshot
 
@@ -87,9 +88,8 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *
 		// processConfChange etc are not throttled so some extra delta, so that we don't
 		// block tick when applyCh is full
 		applyCh: make(chan raftpb.Entry, Config.NumPendingProposals+1000),
-		stop:    make(chan struct{}),
-		done:    make(chan struct{}),
 		elog:    trace.NewEventLog("Dgraph", "ApplyCh"),
+		closer:  y.NewCloser(2), // Matches CLOSER:1
 	}
 	return n
 }
@@ -422,6 +422,7 @@ func (n *node) applyCommitted(proposal *intern.Proposal, index uint64) error {
 }
 
 func (n *node) processApplyCh() {
+	defer n.closer.Done() // CLOSER:1
 	for e := range n.applyCh {
 		proposal := &intern.Proposal{}
 		if err := proposal.Unmarshal(e.Data); err != nil {
@@ -556,6 +557,8 @@ func (n *node) proposeSnapshot() error {
 }
 
 func (n *node) Run() {
+	defer n.closer.Done() // CLOSER:1
+
 	firstRun := true
 	var leader bool
 	// See also our configuration of HeartbeatTick and ElectionTick.
@@ -565,8 +568,24 @@ func (n *node) Run() {
 	slowTicker := time.NewTicker(time.Minute)
 	defer slowTicker.Stop()
 
+	done := make(chan struct{})
+	go func() {
+		<-n.closer.HasBeenClosed()
+		if peerId, has := groups().MyPeer(); has && n.AmLeader() {
+			n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId)
+			time.Sleep(time.Second) // Let transfer happen.
+		}
+		n.Raft().Stop()
+		close(n.applyCh)
+		close(done)
+	}()
+
 	for {
 		select {
+		case <-done:
+			log.Println("Raft node done.")
+			return
+
 		case <-slowTicker.C:
 			n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
 			if leader {
@@ -697,44 +716,10 @@ func (n *node) Run() {
 				tr.LazyPrintf("Advanced Raft. Done.")
 				tr.Finish()
 			}
-
-		case <-n.stop:
-			if peerId, has := groups().MyPeer(); has && n.AmLeader() {
-				n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId)
-				go func() {
-					select {
-					case <-n.ctx.Done(): // time out
-						if tr, ok := trace.FromContext(n.ctx); ok {
-							tr.LazyPrintf("context timed out while transfering leadership")
-						}
-					case <-time.After(1 * time.Second):
-						if tr, ok := trace.FromContext(n.ctx); ok {
-							tr.LazyPrintf("Timed out transfering leadership")
-						}
-					}
-					n.Raft().Stop()
-					close(n.done)
-				}()
-			} else {
-				n.Raft().Stop()
-				close(n.done)
-			}
-		case <-n.done:
-			return
 		}
 	}
 }
 
-func (n *node) Stop() {
-	select {
-	case n.stop <- struct{}{}:
-	case <-n.done:
-		// already stopped.
-		return
-	}
-	<-n.done // wait for Run to respond.
-}
-
 var errConnection = errors.New("No connection exists")
 
 func (n *node) blockingAbort(req *intern.TxnTimestamps) error {
diff --git a/worker/groups.go b/worker/groups.go
index 334321b98f7e8d1651728e41aaea01c6ff51299b..36df0d7eab87ffea6f6c25cffc67c7213419e5e1 100644
--- a/worker/groups.go
+++ b/worker/groups.go
@@ -19,6 +19,7 @@ import (
 	"golang.org/x/net/trace"
 
 	"github.com/dgraph-io/badger"
+	"github.com/dgraph-io/badger/y"
 	"github.com/dgraph-io/dgo/protos/api"
 	"github.com/dgraph-io/dgraph/conn"
 	"github.com/dgraph-io/dgraph/posting"
@@ -39,6 +40,7 @@ type groupi struct {
 	tablets   map[string]*intern.Tablet
 	triggerCh chan struct{} // Used to trigger membership sync
 	delPred   chan struct{} // Ensures that predicate move doesn't happen when deletion is ongoing.
+	closer    *y.Closer
 }
 
 var gr *groupi
@@ -124,6 +126,7 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
 	gr.Node.InitAndStartNode()
 
 	x.UpdateHealthStatus(true)
+	gr.closer = y.NewCloser(3) // Match CLOSER:1 in this file.
 	go gr.periodicMembershipUpdate() // Now set it to be run periodically.
 	go gr.cleanupTablets()
 	go gr.processOracleDeltaStream()
@@ -479,6 +482,8 @@ func (g *groupi) triggerMembershipSync() {
 }
 
 func (g *groupi) periodicMembershipUpdate() {
+	defer g.closer.Done() // CLOSER:1
+
 	// Calculating tablet sizes is expensive, hence we do it only every 5 mins.
 	ticker := time.NewTicker(time.Minute * 5)
 	// Node might not be the leader when we are calculating size.
@@ -486,6 +491,12 @@ func (g *groupi) periodicMembershipUpdate() {
 	tablets := g.calculateTabletSizes()
 
 START:
+	select {
+	case <-g.closer.HasBeenClosed():
+		return
+	default:
+	}
+
 	pl := g.AnyServer(0)
 	// We should always have some connection to dgraphzero.
 	if pl == nil {
@@ -524,6 +535,13 @@ START:
 OUTER:
 	for {
 		select {
+		case <-g.closer.HasBeenClosed():
+			stream.CloseSend()
+			break OUTER
+		case <-ctx.Done():
+			stream.CloseSend()
+			break OUTER
+
 		case <-g.triggerCh:
 			if !g.Node.AmLeader() {
 				tablets = nil
@@ -568,9 +586,6 @@ OUTER:
 				stream.CloseSend()
 				break OUTER
 			}
-		case <-ctx.Done():
-			stream.CloseSend()
-			break OUTER
 		}
 	}
 	goto START
@@ -602,8 +617,14 @@ func (g *groupi) hasReadOnlyTablets() bool {
 }
 
 func (g *groupi) cleanupTablets() {
+	defer g.closer.Done() // CLOSER:1
+
 	ticker := time.NewTimer(time.Minute * 10)
+	defer ticker.Stop()
+
 	select {
+	case <-g.closer.HasBeenClosed():
+		return
 	case <-ticker.C:
 		func() {
 			opt := badger.DefaultIteratorOptions
@@ -673,6 +694,8 @@ func (g *groupi) sendMembership(tablets map[string]*intern.Tablet,
 // processOracleDeltaStream is used to process oracle delta stream from Zero.
 // Zero sends information about aborted/committed transactions and maxPending.
 func (g *groupi) processOracleDeltaStream() {
+	defer g.closer.Done() // CLOSER:1
+
 	blockingReceiveAndPropose := func() {
 		elog := trace.NewEventLog("Dgraph", "ProcessOracleStream")
 		defer elog.Finish()
@@ -739,6 +762,8 @@ func (g *groupi) processOracleDeltaStream() {
 			batch++
 		case <-ctx.Done():
 			return
+		case <-g.closer.HasBeenClosed():
+			return
 		}
 
 	SLURP:
@@ -776,7 +801,7 @@ func (g *groupi) processOracleDeltaStream() {
 	defer ticker.Stop()
 	for {
 		select {
-		case <-g.Node.stop:
+		case <-g.closer.HasBeenClosed():
 			return
 		case <-ticker.C:
 			// Only the leader needs to connect to Zero and get transaction
diff --git a/worker/worker.go b/worker/worker.go
index 71ceb58ef4fb10d31cfbea2e275ed3329fd6d4e7..e35c2f9ff17ab6beb5ea1f70f3f2ad5b6d13405c 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -15,9 +15,6 @@ import (
 	"math"
 	"net"
 	"sync"
-	"time"
-
-	"golang.org/x/net/context"
 
 	"github.com/dgraph-io/badger"
 	"github.com/dgraph-io/dgraph/conn"
@@ -100,12 +97,15 @@ func StoreStats() string {
 
 // BlockingStop stops all the nodes, server between other workers and syncs all marks.
 func BlockingStop() {
-	// Sleep for 5 seconds to ensure that commit/abort is proposed.
-	time.Sleep(5 * time.Second)
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
-	defer cancel()
-	groups().Node.Stop()        // blocking stop raft node.
-	workerServer.GracefulStop() // blocking stop server
-	groups().Node.applyAllMarks(ctx)
+	log.Println("Stopping group...")
+	groups().closer.SignalAndWait()
+
+	log.Println("Stopping node...")
+	groups().Node.closer.SignalAndWait()
+
+	log.Printf("Stopping worker server...")
+	workerServer.Stop()
+
+	// TODO: What is this for?
	posting.StopLRUEviction()
 }
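
Note: every long-running goroutine touched by this patch is now tied to a badger `y.Closer`, which pairs a `sync.WaitGroup` with a "has been closed" channel. The sketch below is a minimal, self-contained illustration of that pattern using the same API the patch relies on (`NewCloser`, `HasBeenClosed`, `Done`, `SignalAndWait`); the `worker` function, counts, and timings here are invented for the example and are not part of the Dgraph code.

```go
package main

import (
	"fmt"
	"time"

	"github.com/dgraph-io/badger/y"
)

// worker runs until the closer is signalled, then marks itself done.
func worker(id int, closer *y.Closer) {
	defer closer.Done() // One Done() per goroutine counted in NewCloser.

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-closer.HasBeenClosed():
			fmt.Printf("worker %d: shutting down\n", id)
			return
		case <-ticker.C:
			// Periodic work would happen here.
		}
	}
}

func main() {
	closer := y.NewCloser(2) // Expect exactly two Done() calls.
	go worker(1, closer)
	go worker(2, closer)

	time.Sleep(300 * time.Millisecond)
	closer.SignalAndWait() // Close the signal channel, then block until both workers return.
	fmt.Println("all workers stopped")
}
```

This is also what the `CLOSER:1` comments in the patch are for: each marks a `Done()` call that must be accounted for in the count passed to `NewCloser`.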