Unverified Commit 47319765 authored by Pawan Rawal, committed by GitHub

Bunch of updates to logging and snapshot (#2235)

* Bunch of updates to logging.

* Update

* Try to abort 20% of all pending transactions.

* Add some more logs.

* Update comment.

* Update logging.

* Fix
parent 1552f491
@@ -33,10 +33,10 @@ import (
"github.com/dgraph-io/badger"
"github.com/dgraph-io/dgo"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgo/y"
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/dgraph/xidmap"
"github.com/dgraph-io/dgo/y"
)
var (
@@ -131,7 +131,7 @@ func handleError(err error) {
// Irrecoverable
if strings.Contains(errString, "x509") || grpc.Code(err) == codes.Internal {
x.Fatalf(errString)
- } else if errString != y.ErrAborted.Error() {
+ } else if errString != y.ErrAborted.Error() && errString != y.ErrConflict.Error() {
x.Printf("Error while mutating %v\n", errString)
}
}
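Both y.ErrAborted and y.ErrConflict now fall through silently: under write contention they are expected outcomes, and the right response is to retry the mutation in a fresh transaction rather than log it. A minimal sketch of that caller-side retry loop, assuming the context, dgo, api, and dgo/y imports from the hunk above; mutateWithRetry is an illustrative name, not a function in this commit:

// Retry transient abort/conflict errors; surface anything else to handleError.
func mutateWithRetry(ctx context.Context, dg *dgo.Dgraph, mu *api.Mutation, retries int) error {
	var err error
	for i := 0; i < retries; i++ {
		txn := dg.NewTxn()
		if _, err = txn.Mutate(ctx, mu); err == nil {
			if err = txn.Commit(ctx); err == nil {
				return nil
			}
		} else {
			_ = txn.Discard(ctx) // release the failed transaction
		}
		if s := err.Error(); s != y.ErrAborted.Error() && s != y.ErrConflict.Error() {
			return err // irrecoverable; let handleError decide whether to crash
		}
	}
	return err
}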
@@ -348,6 +348,8 @@ func (n *node) applyProposal(e raftpb.Entry) (uint32, error) {
delete(originalGroup.Tablets, p.Tablet.Predicate)
} else {
if tablet.GroupId != p.Tablet.GroupId {
x.Printf("Tablet for attr: [%s], gid: [%d] is already being served by group: [%d]\n",
tablet.Predicate, p.Tablet.GroupId, tablet.GroupId)
return p.Id, errTabletAlreadyServed
}
// This update can come from tablet size.
@@ -552,7 +554,7 @@ func (n *node) Run() {
} else if entry.Type == raftpb.EntryNormal {
pid, err := n.applyProposal(entry)
- if err != nil {
+ if err != nil && err != errTabletAlreadyServed {
x.Printf("While applying proposal: %v\n", err)
}
n.props.Done(pid, err)
@@ -18,6 +18,7 @@
package zero
import (
"fmt"
"sort"
"time"
@@ -271,7 +272,7 @@ func (s *Server) movePredicateHelper(ctx context.Context, predicate string, srcG
DestGroupId: dstGroup,
}
if _, err := c.MovePredicate(ctx, in); err != nil {
- return err
+ return fmt.Errorf("While calling MovePredicate: %+v\n", err)
}
// Propose that predicate is served by dstGroup in RW.
@@ -457,6 +457,7 @@ func (s *Server) ShouldServe(
return tablet, err
}
tab = s.ServingTablet(tablet.Predicate)
+ x.AssertTrue(tab != nil)
return tab, nil
}
@@ -102,16 +102,16 @@ func (t *transactions) MinTs() uint64 {
return minTs
}
- // Returns startTs of all pending transactions started upto 1000 raft log
- // entries after last snapshot.
- func (t *transactions) TxnsSinceSnapshot() []uint64 {
+ func (t *transactions) TxnsSinceSnapshot(pending uint64) []uint64 {
lastSnapshotIdx := TxnMarks().DoneUntil()
var timestamps []uint64
t.Lock()
defer t.Unlock()
for _, txn := range t.m {
index := txn.startIdx()
- if index-lastSnapshotIdx <= 1000 {
+ // We abort oldest 20% of the transactions.
+ var oldest float64 = 0.2 * float64(pending)
+ if index-lastSnapshotIdx <= uint64(oldest) {
timestamps = append(timestamps, txn.StartTs)
}
}
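The rule in isolation: a pending transaction is offered to Zero for abort when its raft start index lies within the oldest 20% of the entries accumulated since the last snapshot. A standalone sketch with simplified stand-in types (the real method walks the transactions map under t.Lock()):

type pendingTxn struct {
	StartTs  uint64 // start timestamp issued by Zero
	StartIdx uint64 // raft index at which the transaction started
}

// oldestFifth mirrors TxnsSinceSnapshot: collect start timestamps of
// transactions that began within the oldest 20% of pending entries.
func oldestFifth(txns []pendingTxn, lastSnapshotIdx, pending uint64) []uint64 {
	cutoff := uint64(0.2 * float64(pending))
	var timestamps []uint64
	for _, t := range txns {
		if t.StartIdx-lastSnapshotIdx <= cutoff {
			timestamps = append(timestamps, t.StartTs)
		}
	}
	return timestamps
}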
@@ -115,7 +115,6 @@ func (w *Wal) StoreSnapshot(gid uint32, s raftpb.Snapshot) error {
if err := txn.Set(w.snapshotKey(gid), data); err != nil {
return err
}
x.Printf("Writing snapshot to WAL, metadata: %+v, len(data): %d\n", s.Metadata, len(s.Data))
// Delete all entries before this snapshot to save disk space.
start := w.entryKey(gid, 0, 0)
@@ -30,14 +30,14 @@ import (
"golang.org/x/net/trace"
"github.com/dgraph-io/badger/y"
"github.com/dgraph-io/dgo/protos/api"
dy "github.com/dgraph-io/dgo/y"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/x"
dy "github.com/dgraph-io/dgo/y"
)
type proposalCtx struct {
@@ -539,14 +539,15 @@ func (n *node) Run() {
n.SaveSnapshot(rd.Snapshot)
}
- if len(rd.CommittedEntries) > 0 {
+ lc := len(rd.CommittedEntries)
+ if lc > 0 {
if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("Found %d committed entries", len(rd.CommittedEntries))
}
}
// Now schedule or apply committed entries.
- for _, entry := range rd.CommittedEntries {
+ for idx, entry := range rd.CommittedEntries {
// Need applied watermarks for schema mutation also for read linearizability.
// Applied watermarks need to be emitted as soon as possible sequentially.
// If we emit Mark{4, false} and Mark{4, true} before emitting Mark{3, false}
@@ -563,6 +564,18 @@
// Just queue up to be processed. Don't wait on them.
n.applyCh <- entry
}
+ // Move to debug log later.
+ // Sometimes after restart there are too many entries to replay, so log so that we
+ // know Run loop is replaying them.
+ if lc > 1e5 && idx%5000 == 0 {
+ x.Printf("In run loop applying committed entries, idx: [%v], pending: [%v]\n",
+ idx, lc-idx)
+ }
}
+ if lc > 1e5 {
+ x.Println("All committed entries sent to applyCh.")
+ }
if !leader {
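The idx counter and lc length exist only to throttle progress logging while a large backlog replays. The same pattern as a tiny self-contained helper; the name and the use of the standard log package are illustrative:

package main

import "log"

// logReplayProgress prints every step-th entry, and only when the backlog is
// large enough (over 1e5 entries, matching the diff) to be worth reporting.
func logReplayProgress(idx, total, step int) {
	if total > 100000 && idx%step == 0 {
		log.Printf("applying committed entries, idx: [%d], pending: [%d]", idx, total-idx)
	}
}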
@@ -637,6 +650,18 @@ func (n *node) snapshotPeriodically(closer *y.Closer) {
}
}
+ func (n *node) abortOldTransactions(pending uint64) {
+ pl := groups().Leader(0)
+ if pl == nil {
+ return
+ }
+ zc := intern.NewZeroClient(pl.Get())
+ // Aborts if not already committed.
+ startTimestamps := posting.Txns().TxnsSinceSnapshot(pending)
+ req := &intern.TxnTimestamps{Ts: startTimestamps}
+ zc.TryAbort(context.Background(), req)
+ }
func (n *node) snapshot(skip uint64) {
water := posting.TxnMarks()
le := water.DoneUntil()
@@ -646,12 +671,17 @@ func (n *node) snapshot(skip uint64) {
si := existing.Metadata.Index
if le <= si+skip {
- // If difference grows above 100*skip we try to abort old transactions, so it shouldn't
- // ideally go above 110*skip.
applied := n.Applied.DoneUntil()
- if applied-le > 110*skip {
- x.Printf("Couldn't take snapshot, txn watermark: [%d], applied watermark: [%d]\n",
- le, applied)
+ // If difference grows above 1.5 * ForceAbortDifference we try to abort old transactions
+ if applied-le > 1.5*x.ForceAbortDifference && skip != 0 {
+ // Print warning if difference grows above 3 * x.ForceAbortDifference. Shouldn't ideally
+ // happen as we abort oldest 20% when it grows above 1.5 times.
+ if applied-le > 3*x.ForceAbortDifference {
+ x.Printf("Couldn't take snapshot, txn watermark: [%d], applied watermark: [%d]\n",
+ le, applied)
+ }
+ // Try aborting pending transactions here.
+ n.abortOldTransactions(applied - le)
}
return
}
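Condensed, the new gating works like this: a snapshot cannot advance past the transaction watermark le, so when the applied index runs more than 1.5 * x.ForceAbortDifference ahead of le the node asks Zero to abort the oldest pending transactions, and it only warns once the gap passes 3 * the constant. A sketch with the watermarks reduced to plain integers; shouldAbortOldTxns is an illustrative name:

func shouldAbortOldTxns(applied, txnWatermark, skip uint64) (abort, warn bool) {
	const forceAbortDifference = 5000 // mirrors x.ForceAbortDifference in this commit
	pending := applied - txnWatermark
	abort = float64(pending) > 1.5*forceAbortDifference && skip != 0
	warn = pending > 3*forceAbortDifference
	return abort, warn
}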
@@ -26,9 +26,9 @@ import (
"golang.org/x/net/context"
"github.com/dgraph-io/badger"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/schema"
@@ -123,7 +123,6 @@ func StartRaftNodes(walStore *badger.ManagedDB, bindall bool) {
go gr.periodicMembershipUpdate() // Now set it to be run periodically.
go gr.cleanupTablets()
go gr.processOracleDeltaStream()
- go gr.periodicAbortOldTxns()
gr.proposeInitialSchema()
}
@@ -155,7 +154,7 @@ func (g *groupi) proposeInitialSchema() {
if err == nil {
break
}
- fmt.Println("Error while proposing initial schema: ", err)
+ x.Println("Error while proposing initial schema: ", err)
time.Sleep(100 * time.Millisecond)
}
}
@@ -166,6 +165,8 @@ func (g *groupi) groupId() uint32 {
return atomic.LoadUint32(&g.gid)
}
+ // calculateTabletSizes iterates through badger and gets the size of the space occupied by each
+ // predicate (including data and indexes). All data for a predicate forms a Tablet.
func (g *groupi) calculateTabletSizes() map[string]*intern.Tablet {
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = false
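The newly documented calculateTabletSizes depends on a values-skipped badger scan: with PrefetchValues disabled the iterator reads keys and size metadata without pulling values off disk. A rough sketch of that pattern under stated assumptions: db is an open *badger.DB (the real store may require a managed transaction) and predicateFromKey is a hypothetical stand-in for dgraph's key parsing:

opt := badger.DefaultIteratorOptions
opt.PrefetchValues = false // keys and size estimates only; never fetch values
txn := db.NewTransaction(false)
defer txn.Discard()
it := txn.NewIterator(opt)
defer it.Close()

sizes := make(map[string]int64)
for it.Rewind(); it.Valid(); it.Next() {
	item := it.Item()
	pred := predicateFromKey(item.Key()) // hypothetical key parser
	sizes[pred] += item.EstimatedSize()
}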
@@ -313,7 +314,6 @@ func (g *groupi) Tablet(key string) *intern.Tablet {
return tablet
}
x.Printf("Asking if I can serve tablet for: %v\n", key)
// We don't know about this tablet.
// Check with dgraphzero if we can serve it.
pl := g.AnyServer(0)
@@ -331,6 +331,10 @@ func (g *groupi) Tablet(key string) *intern.Tablet {
g.Lock()
g.tablets[key] = out
g.Unlock()
+ if out.GroupId == groups().groupId() {
+ x.Printf("Serving tablet for: %v\n", key)
+ }
return out
}
@@ -701,27 +705,3 @@ START:
time.Sleep(time.Second)
goto START
}
- func (g *groupi) periodicAbortOldTxns() {
- ticker := time.NewTicker(time.Second * 30)
- for range ticker.C {
- lastSnapshotIdx := posting.TxnMarks().DoneUntil()
- water := groups().Node.Applied.DoneUntil()
- // We try to abort transactions if difference between applied and last snapshottted is more
- // than 10000. Ideally a snapshot should happen every 100 entries so this suggests that some
- // transaction was left in a dangling state(not aborted or committed).
- if water-lastSnapshotIdx < 10000 {
- continue
- }
- pl := groups().Leader(0)
- if pl == nil {
- return
- }
- zc := intern.NewZeroClient(pl.Get())
- // Aborts if not already committed.
- startTimestamps := posting.Txns().TxnsSinceSnapshot()
- req := &intern.TxnTimestamps{Ts: startTimestamps}
- zc.TryAbort(context.Background(), req)
- }
- }
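The net effect of this deletion: the standalone 30-second ticker goroutine is gone, and its TryAbort call moves into abortOldTransactions, which the snapshot path now invokes only when the applied/snapshot gap actually blocks a snapshot. Aborts are driven by the condition they exist to fix rather than by wall-clock polling, and the magic 10000-entry threshold is replaced by the named x.ForceAbortDifference constant.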
@@ -91,7 +91,7 @@ func movePredicateHelper(ctx context.Context, predicate string, gid uint32) erro
c := intern.NewWorkerClient(pl.Get())
stream, err := c.ReceivePredicate(ctx)
if err != nil {
- return err
+ return fmt.Errorf("While calling ReceivePredicate: %+v", err)
}
count := 0
@@ -278,6 +278,8 @@ func (w *grpcWorker) MovePredicate(ctx context.Context,
return &emptyPayload, errNotLeader
}
x.Printf("Move predicate request for pred: [%v], src: [%v], dst: [%v]\n", in.Predicate,
in.SourceGroupId, in.DestGroupId)
// Ensures that all future mutations beyond this point are rejected.
if err := n.proposeAndWait(ctx, &intern.Proposal{State: in.State}); err != nil {
return &emptyPayload, err
@@ -63,6 +63,9 @@ const (
PortInternal = 7080
PortHTTP = 8080
PortGrpc = 9080
+ // If the difference between AppliedUntil - TxnMarks.DoneUntil() is greater than this, we
+ // start aborting old transactions.
+ ForceAbortDifference = 5000
)
var (
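Worked through with the value above: once AppliedUntil - TxnMarks.DoneUntil() exceeds 1.5 * 5000 = 7500, the node sends the start timestamps of the oldest 20% of pending transactions to Zero's TryAbort; only beyond 3 * 5000 = 15000 does it also log the "Couldn't take snapshot" warning.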