From d907674eddb325592e941815d8e91c21a38314ff Mon Sep 17 00:00:00 2001
From: Manish R Jain <manish@dgraph.io>
Date: Tue, 22 May 2018 17:09:49 -0700
Subject: [PATCH] Common sense changes

- Don't retry indefinitely when aborting transactions during a predicate
  move. Stop after a dozen tries. TODO: Need to ensure that this doesn't
  leave the move in a weird state. Needs more testing.
- Swap the order of waiting for the server to catch up. First, wait for Zero
  to send all data up to Txn.ReadTs, then get agreement among replicas to
  ensure that we're caught up.
- Add some more printfs.
- Delete the `_dummy_` data key instead of `dummy`, which is more likely to
  conflict with user data.
- Clarify some pieces of code.
---
 dgraph/cmd/zero/tablet.go | 9 ++++++---
 posting/lists.go          | 2 +-
 posting/mvcc.go           | 6 ++++--
 worker/export.go          | 5 ++++-
 worker/mutation.go        | 2 ++
 worker/predicate_move.go  | 9 ++++++++-
 worker/task.go            | 8 ++++----
 7 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/dgraph/cmd/zero/tablet.go b/dgraph/cmd/zero/tablet.go
index f69658c7..642a6504 100644
--- a/dgraph/cmd/zero/tablet.go
+++ b/dgraph/cmd/zero/tablet.go
@@ -215,8 +215,10 @@ func (s *Server) moveTablet(ctx context.Context, predicate string, srcGroup uint
 	dstGroup uint32) error {
 	err := s.movePredicateHelper(ctx, predicate, srcGroup, dstGroup)
 	if err == nil {
+		// If no error, then return immediately.
 		return nil
 	}
+	x.Printf("Got error during move: %v", err)
 	if !s.Node.AmLeader() {
 		s.runRecovery()
 		return err
@@ -231,8 +233,9 @@ func (s *Server) moveTablet(ctx context.Context, predicate string, srcGroup uint
 		Space: stab.Space,
 		Force: true,
 	}
-	if err := s.Node.proposeAndWait(context.Background(), p); err != nil {
-		x.Printf("Error while reverting group %d to RW: %+v\n", srcGroup, err)
+	if nerr := s.Node.proposeAndWait(context.Background(), p); nerr != nil {
+		x.Printf("Error while reverting group %d to RW: %+v\n", srcGroup, nerr)
+		return nerr
 	}
 	return err
 }
@@ -267,7 +270,7 @@ func (s *Server) movePredicateHelper(ctx context.Context, predicate string, srcG
 		DestGroupId: dstGroup,
 	}
 	if _, err := c.MovePredicate(ctx, in); err != nil {
-		return fmt.Errorf("While caling MovePredicate: %+v\n", err)
+		return fmt.Errorf("While calling MovePredicate: %+v\n", err)
 	}
 
 	// Propose that predicate is served by dstGroup in RW.
diff --git a/posting/lists.go b/posting/lists.go
index 7717d58d..4dae0917 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -305,7 +305,7 @@ func CommitLists(commit func(key []byte) bool) {
 	// use the Delete function.
 	txn := pstore.NewTransactionAt(1, true)
 	defer txn.Discard()
-	x.Check(txn.Delete(x.DataKey("dummy", 1)))
+	x.Check(txn.Delete(x.DataKey("_dummy_", 1)))
 	// Nothing is being read, so there can't be an ErrConflict. This should go to disk.
 	if err := txn.CommitAt(1, nil); err != nil {
 		x.Printf("Commit unexpectedly failed with error: %v", err)
diff --git a/posting/mvcc.go b/posting/mvcc.go
index 4cd0bf9e..20be0453 100644
--- a/posting/mvcc.go
+++ b/posting/mvcc.go
@@ -413,9 +413,11 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
 			return nil, err
 		}
 		l.minTs = item.Version()
-		it.Next()
+		// No need to do Next here. The outer loop can take care of skipping more versions of
+		// the same key.
 		break
-	} else if item.UserMeta()&bitDeltaPosting > 0 {
+	}
+	if item.UserMeta()&bitDeltaPosting > 0 {
 		var pl intern.PostingList
 		x.Check(pl.Unmarshal(val))
 		for _, mpost := range pl.Postings {
diff --git a/worker/export.go b/worker/export.go
index cf52e470..f869e88a 100644
--- a/worker/export.go
+++ b/worker/export.go
@@ -267,6 +267,9 @@ func export(bdir string, readTs uint64) error {
 	defer txn.Discard()
 	iterOpts := badger.DefaultIteratorOptions
 	iterOpts.PrefetchValues = false
+	// We don't ask for all the versions. So, this would only return the 1 version for each key, iff
+	// that version is valid. So, we don't need to check in the iteration loop if the item is
+	// deleted or expired.
 	it := txn.NewIterator(iterOpts)
 	defer it.Close()
 	prefix := new(bytes.Buffer)
@@ -297,7 +300,7 @@ func export(bdir string, readTs uint64) error {
 			continue
 		}
 
-		if pk.Attr == "_predicate_" || pk.Attr == "_dummy_" {
+		if pk.Attr == "_predicate_" {
 			// Skip the UID mappings.
 			it.Seek(pk.SkipPredicate())
 			continue
diff --git a/worker/mutation.go b/worker/mutation.go
index f0ddbbce..2beb25b6 100644
--- a/worker/mutation.go
+++ b/worker/mutation.go
@@ -608,6 +608,7 @@ func tryAbortTransactions(startTimestamps []uint64) {
 	req := &intern.TxnTimestamps{Ts: startTimestamps}
 	resp, err := zc.TryAbort(context.Background(), req)
 	for err != nil {
+		x.Printf("Error while trying to abort txns: %v\n", err)
 		resp, err = zc.TryAbort(context.Background(), req)
 	}
 	commitTimestamps := resp.Ts
@@ -621,6 +622,7 @@ func tryAbortTransactions(startTimestamps []uint64) {
 	// TODO - Make sure all other errors are transient, we don't want to be stuck in an infinite
 	// loop.
 	for err != nil && err != posting.ErrInvalidTxn {
+		x.Printf("Error while locally aborting txns: %v\n", err)
 		// This will fail only due to badger error.
 		_, err = commitOrAbort(context.Background(), tctx)
 	}
diff --git a/worker/predicate_move.go b/worker/predicate_move.go
index be6371cf..c2a5105f 100644
--- a/worker/predicate_move.go
+++ b/worker/predicate_move.go
@@ -31,6 +31,7 @@ import (
 var (
 	errEmptyPredicate = x.Errorf("Predicate not specified")
 	errNotLeader      = x.Errorf("Server is not leader of this group")
+	errUnableToAbort  = x.Errorf("Unable to abort pending transactions")
 	emptyPayload      = api.Payload{}
 )
 
@@ -304,17 +305,23 @@ func (w *grpcWorker) MovePredicate(ctx context.Context,
 	if err := n.proposeAndWait(ctx, &intern.Proposal{State: in.State}); err != nil {
 		return &emptyPayload, err
 	}
-	for i := 0; ; i++ {
+	aborted := false
+	for i := 0; i < 12; i++ {
+		// Try a dozen times, then give up.
 		x.Printf("Trying to abort pending mutations. Loop: %d", i)
 		tctxs := posting.Txns().Iterate(func(key []byte) bool {
 			pk := x.Parse(key)
 			return pk.Attr == in.Predicate
 		})
 		if len(tctxs) == 0 {
+			aborted = true
 			break
 		}
 		tryAbortTransactions(tctxs)
 	}
+	if !aborted {
+		return &emptyPayload, errUnableToAbort
+	}
 
 	// We iterate over badger, so need to flush and wait for sync watermark to catch up.
 	n.applyAllMarks(ctx)
diff --git a/worker/task.go b/worker/task.go
index 17708c7d..64ef61f1 100644
--- a/worker/task.go
+++ b/worker/task.go
@@ -594,17 +594,17 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti
 // processTask processes the query, accumulates and returns the result.
 func processTask(ctx context.Context, q *intern.Query, gid uint32) (*intern.Result, error) {
 	n := groups().Node
-	if err := n.WaitForMinProposal(ctx, q.LinRead); err != nil {
+	if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil {
 		return &emptyResult, err
 	}
 	if tr, ok := trace.FromContext(ctx); ok {
-		tr.LazyPrintf("Done waiting for applied watermark attr %q\n", q.Attr)
+		tr.LazyPrintf("Done waiting for maxPending to catch up for Attr %q, readTs: %d\n", q.Attr, q.ReadTs)
 	}
-	if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil {
+	if err := n.WaitForMinProposal(ctx, q.LinRead); err != nil {
 		return &emptyResult, err
 	}
 	if tr, ok := trace.FromContext(ctx); ok {
-		tr.LazyPrintf("Done waiting for maxPending to catch up for Attr %q, readTs: %d\n", q.Attr, q.ReadTs)
+		tr.LazyPrintf("Done waiting for applied watermark attr %q\n", q.Attr)
 	}
 	// If a group stops serving tablet and it gets partitioned away from group zero, then it
 	// wouldn't know that this group is no longer serving this predicate.
-- 
GitLab
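
The bounded abort loop added to MovePredicate follows a common shape: retry a fixed number of times, record success in a flag, and surface a sentinel error on exhaustion. Below is a minimal, self-contained Go sketch of that pattern; pendingTxnsFor and tryAbort are hypothetical stand-ins for posting.Txns().Iterate and tryAbortTransactions, not the actual Dgraph APIs.

package main

import (
	"errors"
	"fmt"
)

// Sentinel error, mirroring errUnableToAbort in worker/predicate_move.go.
var errUnableToAbort = errors.New("Unable to abort pending transactions")

// abortPending retries a bounded number of times instead of looping forever.
// pendingTxnsFor and tryAbort are hypothetical stand-ins for the real calls.
func abortPending(pendingTxnsFor func() []uint64, tryAbort func([]uint64)) error {
	aborted := false
	for i := 0; i < 12; i++ {
		// Try a dozen times, then give up, as in the patch.
		fmt.Printf("Trying to abort pending mutations. Loop: %d\n", i)
		tctxs := pendingTxnsFor()
		if len(tctxs) == 0 {
			aborted = true
			break
		}
		tryAbort(tctxs)
	}
	if !aborted {
		// Surface a sentinel error so the caller can fail the move
		// instead of blocking on a stuck transaction forever.
		return errUnableToAbort
	}
	return nil
}

func main() {
	attempts := 0
	// Simulate a pending set that drains after three abort attempts.
	pending := func() []uint64 {
		if attempts >= 3 {
			return nil
		}
		return []uint64{100, 101}
	}
	abort := func(ts []uint64) { attempts++ }
	fmt.Println("result:", abortPending(pending, abort))
}

Failing fast here is what the commit message's TODO is about: the caller must now handle errUnableToAbort rather than assume the loop always drains.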
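Similarly, the reordered waits in processTask read as a two-phase barrier: first ensure the local oracle has all commits up to the read timestamp, then ensure the applied watermark has caught up. A minimal sketch under those assumptions; waitForReadTs and waitForMinProposal are hypothetical stand-ins for posting.Oracle().WaitForTs and n.WaitForMinProposal.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForReadTs is a hypothetical stand-in for posting.Oracle().WaitForTs:
// it blocks until the local oracle has seen all commits up to readTs.
func waitForReadTs(ctx context.Context, readTs uint64) error {
	select {
	case <-time.After(10 * time.Millisecond): // simulated catch-up
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// waitForMinProposal is a hypothetical stand-in for n.WaitForMinProposal:
// it blocks until the applied watermark reaches the given proposal index.
func waitForMinProposal(ctx context.Context, minProposal uint64) error {
	select {
	case <-time.After(10 * time.Millisecond): // simulated catch-up
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// serveRead applies the ordering from the patch: wait for Zero's data up to
// the read timestamp first, then wait for agreement among replicas.
func serveRead(ctx context.Context, readTs, minProposal uint64) error {
	if err := waitForReadTs(ctx, readTs); err != nil {
		return err
	}
	return waitForMinProposal(ctx, minProposal)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println("serveRead:", serveRead(ctx, 42, 7))
}

With the old order, a replica could pass the proposal check while still missing commits below the read timestamp, which is the situation the commit message's swap addresses.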