diff --git a/dgraph/cmd/zero/tablet.go b/dgraph/cmd/zero/tablet.go index f69658c72f433b9f6df63ca649e5f88216145d37..642a650405d81a4053094c82d587f6ce070062f4 100644 --- a/dgraph/cmd/zero/tablet.go +++ b/dgraph/cmd/zero/tablet.go @@ -215,8 +215,10 @@ func (s *Server) moveTablet(ctx context.Context, predicate string, srcGroup uint dstGroup uint32) error { err := s.movePredicateHelper(ctx, predicate, srcGroup, dstGroup) if err == nil { + // If no error, then return immediately. return nil } + x.Printf("Got error during move: %v", err) if !s.Node.AmLeader() { s.runRecovery() return err @@ -231,8 +233,9 @@ func (s *Server) moveTablet(ctx context.Context, predicate string, srcGroup uint Space: stab.Space, Force: true, } - if err := s.Node.proposeAndWait(context.Background(), p); err != nil { - x.Printf("Error while reverting group %d to RW: %+v\n", srcGroup, err) + if nerr := s.Node.proposeAndWait(context.Background(), p); nerr != nil { + x.Printf("Error while reverting group %d to RW: %+v\n", srcGroup, nerr) + return nerr } return err } @@ -267,7 +270,7 @@ func (s *Server) movePredicateHelper(ctx context.Context, predicate string, srcG DestGroupId: dstGroup, } if _, err := c.MovePredicate(ctx, in); err != nil { - return fmt.Errorf("While caling MovePredicate: %+v\n", err) + return fmt.Errorf("While calling MovePredicate: %+v\n", err) } // Propose that predicate is served by dstGroup in RW. diff --git a/posting/lists.go b/posting/lists.go index 7717d58d55ce96063db3aea36562312da1f9bd13..4dae0917041a0eb9760f175bd2b8d08e0aa1fe73 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -305,7 +305,7 @@ func CommitLists(commit func(key []byte) bool) { // use the Delete function. txn := pstore.NewTransactionAt(1, true) defer txn.Discard() - x.Check(txn.Delete(x.DataKey("dummy", 1))) + x.Check(txn.Delete(x.DataKey("_dummy_", 1))) // Nothing is being read, so there can't be an ErrConflict. This should go to disk. if err := txn.CommitAt(1, nil); err != nil { x.Printf("Commit unexpectedly failed with error: %v", err) diff --git a/posting/mvcc.go b/posting/mvcc.go index 4cd0bf9e49bc38874cfd5ff79572f8fa809cdb97..20be04532d10a195ceee2eca33360be93640bb66 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -413,9 +413,11 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { return nil, err } l.minTs = item.Version() - it.Next() + // No need to do Next here. The outer loop can take care of skipping more versions of + // the same key. break - } else if item.UserMeta()&bitDeltaPosting > 0 { + } + if item.UserMeta()&bitDeltaPosting > 0 { var pl intern.PostingList x.Check(pl.Unmarshal(val)) for _, mpost := range pl.Postings { diff --git a/worker/export.go b/worker/export.go index cf52e470d2a3110cbf0a074fa2d8abbd9a427c60..f869e88a30b6d555160a48aab99a5aea51046aab 100644 --- a/worker/export.go +++ b/worker/export.go @@ -267,6 +267,9 @@ func export(bdir string, readTs uint64) error { defer txn.Discard() iterOpts := badger.DefaultIteratorOptions iterOpts.PrefetchValues = false + // We don't ask for all the versions. So, this would only return the 1 version for each key, iff + // that version is valid. So, we don't need to check in the iteration loop if the item is + // deleted or expired. it := txn.NewIterator(iterOpts) defer it.Close() prefix := new(bytes.Buffer) @@ -297,7 +300,7 @@ func export(bdir string, readTs uint64) error { continue } - if pk.Attr == "_predicate_" || pk.Attr == "_dummy_" { + if pk.Attr == "_predicate_" { // Skip the UID mappings. it.Seek(pk.SkipPredicate()) continue diff --git a/worker/mutation.go b/worker/mutation.go index f0ddbbce287ac350499be9908a847981b1a45002..2beb25b635ef34d7565b9e10b7fcf0c0bab738cf 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -608,6 +608,7 @@ func tryAbortTransactions(startTimestamps []uint64) { req := &intern.TxnTimestamps{Ts: startTimestamps} resp, err := zc.TryAbort(context.Background(), req) for err != nil { + x.Printf("Error while trying to abort txns: %v\n", err) resp, err = zc.TryAbort(context.Background(), req) } commitTimestamps := resp.Ts @@ -621,6 +622,7 @@ func tryAbortTransactions(startTimestamps []uint64) { // TODO - Make sure all other errors are transient, we don't want to be stuck in an infinite // loop. for err != nil && err != posting.ErrInvalidTxn { + x.Printf("Error while locally aborting txns: %v\n", err) // This will fail only due to badger error. _, err = commitOrAbort(context.Background(), tctx) } diff --git a/worker/predicate_move.go b/worker/predicate_move.go index be6371cfe662194edacc5d96e588c61c1efecd50..c2a5105f999ce1997913af52e029b473726d9056 100644 --- a/worker/predicate_move.go +++ b/worker/predicate_move.go @@ -31,6 +31,7 @@ import ( var ( errEmptyPredicate = x.Errorf("Predicate not specified") errNotLeader = x.Errorf("Server is not leader of this group") + errUnableToAbort = x.Errorf("Unable to abort pending transactions") emptyPayload = api.Payload{} ) @@ -304,17 +305,23 @@ func (w *grpcWorker) MovePredicate(ctx context.Context, if err := n.proposeAndWait(ctx, &intern.Proposal{State: in.State}); err != nil { return &emptyPayload, err } - for i := 0; ; i++ { + aborted := false + for i := 0; i < 12; i++ { + // Try a dozen times, then give up. x.Printf("Trying to abort pending mutations. Loop: %d", i) tctxs := posting.Txns().Iterate(func(key []byte) bool { pk := x.Parse(key) return pk.Attr == in.Predicate }) if len(tctxs) == 0 { + aborted = true break } tryAbortTransactions(tctxs) } + if !aborted { + return &emptyPayload, errUnableToAbort + } // We iterate over badger, so need to flush and wait for sync watermark to catch up. n.applyAllMarks(ctx) diff --git a/worker/task.go b/worker/task.go index 17708c7d52d9e84b13c216b59d3cb5393e50670e..64ef61f1436e67691b996155960abe27c5c265d2 100644 --- a/worker/task.go +++ b/worker/task.go @@ -594,17 +594,17 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti // processTask processes the query, accumulates and returns the result. func processTask(ctx context.Context, q *intern.Query, gid uint32) (*intern.Result, error) { n := groups().Node - if err := n.WaitForMinProposal(ctx, q.LinRead); err != nil { + if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil { return &emptyResult, err } if tr, ok := trace.FromContext(ctx); ok { - tr.LazyPrintf("Done waiting for applied watermark attr %q\n", q.Attr) + tr.LazyPrintf("Done waiting for maxPending to catch up for Attr %q, readTs: %d\n", q.Attr, q.ReadTs) } - if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil { + if err := n.WaitForMinProposal(ctx, q.LinRead); err != nil { return &emptyResult, err } if tr, ok := trace.FromContext(ctx); ok { - tr.LazyPrintf("Done waiting for maxPending to catch up for Attr %q, readTs: %d\n", q.Attr, q.ReadTs) + tr.LazyPrintf("Done waiting for applied watermark attr %q\n", q.Attr) } // If a group stops serving tablet and it gets partitioned away from group zero, then it // wouldn't know that this group is no longer serving this predicate.