From b21f099d7abbcb53beb2e64089e60c1c75cb15c9 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manish@dgraph.io>
Date: Wed, 18 Jul 2018 14:50:38 -0700
Subject: [PATCH] Fix proposeTxn and calculateSnapshot (#2493)

- Fix up proposeTxn, so we are updating the CommitTs as per the status in Zero oracle.
- Fix #2474 by dealing with a nil resp.Context in server.go
- Fix calculateSnapshot. It is not a given that every StartTs in Mutation proposal would have a corresponding commit/abort decision. So, use Oracle's MinPendingStartTs to decide how to calculate snapshot.
- Remove o.aborts in Zero. The work can be done by o.commits map.
---
 contrib/release.sh        |  4 +-
 dgraph/cmd/zero/oracle.go | 82 +++++++++++++++------------------------
 edgraph/server.go         |  8 ++--
 posting/oracle.go         |  8 ++--
 worker/draft.go           | 76 +++++++++++++-----------------------
 5 files changed, 72 insertions(+), 106 deletions(-)

diff --git a/contrib/release.sh b/contrib/release.sh
index 9fb73301..0c6b54a9 100755
--- a/contrib/release.sh
+++ b/contrib/release.sh
@@ -63,10 +63,10 @@ pushd $basedir/dgraph
   git pull
   git checkout $TAG
   # HEAD here points to whatever is checked out.
-  lastCommitSHA1=$(git rev-parse --short HEAD);
+  lastCommitSHA1=$(git rev-parse --short HEAD)
   gitBranch=$(git rev-parse --abbrev-ref HEAD)
   lastCommitTime=$(git log -1 --format=%ci)
-  release_version=$(git describe --abbrev=0);
+  release_version=$TAG
 popd
 
 # Clone ratel repo.
diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go
index 8a980f27..652c52f0 100644
--- a/dgraph/cmd/zero/oracle.go
+++ b/dgraph/cmd/zero/oracle.go
@@ -29,9 +29,8 @@ type Oracle struct {
 	x.SafeMutex
 	commits map[uint64]uint64 // startTs -> commitTs
 	// TODO: Check if we need LRU.
-	rowCommit   map[string]uint64   // fp(key) -> commitTs. Used to detect conflict.
-	aborts      map[uint64]struct{} // key is startTs
-	maxAssigned uint64              // max transaction assigned by us.
+	rowCommit   map[string]uint64 // fp(key) -> commitTs. Used to detect conflict.
+	maxAssigned uint64            // max transaction assigned by us.
 
 	// timestamp at the time of start of server or when it became leader. Used to detect conflicts.
 	tmax uint64
@@ -46,7 +45,6 @@ type Oracle struct {
 func (o *Oracle) Init() {
 	o.commits = make(map[uint64]uint64)
 	o.rowCommit = make(map[string]uint64)
-	o.aborts = make(map[uint64]struct{})
 	o.subscribers = make(map[int]chan *intern.OracleDelta)
 	o.updates = make(chan *intern.OracleDelta, 100000) // Keeping 1 second worth of updates.
 	o.doneUntil.Init()
@@ -74,9 +72,6 @@ func (o *Oracle) hasConflict(src *api.TxnContext) bool {
 }
 
 func (o *Oracle) purgeBelow(minTs uint64) {
-	x.Printf("purging below ts:%d, len(o.commits):%d, len(o.aborts):%d"+
-		", len(o.rowCommit):%d\n",
-		minTs, len(o.commits), len(o.aborts), len(o.rowCommit))
 	o.Lock()
 	defer o.Unlock()
 
@@ -86,11 +81,6 @@ func (o *Oracle) purgeBelow(minTs uint64) {
 			delete(o.commits, ts)
 		}
 	}
-	for ts := range o.aborts {
-		if ts < minTs {
-			delete(o.aborts, ts)
-		}
-	}
 	// There is no transaction running with startTs less than minTs
 	// So we can delete everything from rowCommit whose commitTs < minTs
 	for key, ts := range o.rowCommit {
@@ -99,6 +89,9 @@ func (o *Oracle) purgeBelow(minTs uint64) {
 		}
 	}
 	o.tmax = minTs
+	x.Printf("Purged below ts:%d, len(o.commits):%d"+
+		", len(o.rowCommit):%d\n",
+		minTs, len(o.commits), len(o.rowCommit))
 }
 
 func (o *Oracle) commit(src *api.TxnContext) error {
@@ -114,13 +107,6 @@ func (o *Oracle) commit(src *api.TxnContext) error {
 	return nil
 }
 
-func (o *Oracle) aborted(startTs uint64) bool {
-	o.Lock()
-	defer o.Unlock()
-	_, ok := o.aborts[startTs]
-	return ok
-}
-
 func sortTxns(delta *intern.OracleDelta) {
 	sort.Slice(delta.Txns, func(i, j int) bool {
 		return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs
@@ -134,10 +120,6 @@ func (o *Oracle) currentState() *intern.OracleDelta {
 		resp.Txns = append(resp.Txns,
 			&intern.TxnStatus{StartTs: start, CommitTs: commit})
 	}
-	for abort := range o.aborts {
-		resp.Txns = append(resp.Txns,
-			&intern.TxnStatus{StartTs: abort, CommitTs: 0})
-	}
 	resp.MaxAssigned = o.maxAssigned
 	return resp
 }
@@ -205,11 +187,8 @@ func (o *Oracle) updateCommitStatusHelper(index uint64, src *api.TxnContext) boo
 	if _, ok := o.commits[src.StartTs]; ok {
 		return false
 	}
-	if _, ok := o.aborts[src.StartTs]; ok {
-		return false
-	}
 	if src.Aborted {
-		o.aborts[src.StartTs] = struct{}{}
+		o.commits[src.StartTs] = 0
 	} else {
 		o.commits[src.StartTs] = src.CommitTs
 	}
@@ -220,13 +199,10 @@ func (o *Oracle) updateCommitStatusHelper(index uint64, src *api.TxnContext) boo
 func (o *Oracle) updateCommitStatus(index uint64, src *api.TxnContext) {
 	if o.updateCommitStatusHelper(index, src) {
 		delta := new(intern.OracleDelta)
-		if src.Aborted {
-			delta.Txns = append(delta.Txns,
-				&intern.TxnStatus{StartTs: src.StartTs, CommitTs: 0})
-		} else {
-			delta.Txns = append(delta.Txns,
-				&intern.TxnStatus{StartTs: src.StartTs, CommitTs: src.CommitTs})
-		}
+		delta.Txns = append(delta.Txns, &intern.TxnStatus{
+			StartTs:  src.StartTs,
+			CommitTs: o.commitTs(src.StartTs),
+		})
 		o.updates <- delta
 	}
 }
@@ -258,6 +234,8 @@ func (o *Oracle) MaxPending() uint64 {
 
 var errConflict = errors.New("Transaction conflict")
 
+// proposeTxn proposes a txn update, and then updates src to reflect the state
+// of the commit after proposal is run.
 func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
 	var zp intern.ZeroProposal
 	zp.Txn = &api.TxnContext{
@@ -265,11 +243,24 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
 		CommitTs: src.CommitTs,
 		Aborted:  src.Aborted,
 	}
-	return s.Node.proposeAndWait(ctx, &zp)
+	if err := s.Node.proposeAndWait(ctx, &zp); err != nil {
+		return err
+	}
+
+	// There might be race between this proposal trying to commit and predicate
+	// move aborting it. A predicate move, triggered by Zero, would abort all
+	// pending transactions.  At the same time, a client which has already done
+	// mutations, can proceed to commit it. A race condition can happen here,
+	// with both proposing their respective states, only one can succeed after
+	// the proposal is done. So, check again to see the fate of the transaction
+	// here.
+	src.CommitTs = s.orc.commitTs(src.StartTs)
+	if src.CommitTs == 0 {
+		src.Aborted = true
+	}
+	return nil
 }
 
-// TODO: This function implies that src has been modified to represent the latest status of the
-// transaction. I don't think that's happening properly.
 func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
 	if src.Aborted {
 		return s.proposeTxn(ctx, src)
@@ -322,23 +313,14 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
 		return err
 	}
 	src.CommitTs = assigned.StartId
+	// Mark the transaction as done, irrespective of whether the proposal succeeded or not.
+	defer s.orc.doneUntil.Done(src.CommitTs)
 
 	if err := s.orc.commit(src); err != nil {
 		src.Aborted = true
 	}
 	// Propose txn should be used to set watermark as done.
-	err = s.proposeTxn(ctx, src)
-	// There might be race between this proposal trying to commit and predicate
-	// move aborting it. A predicate move, triggered by Zero, would abort all pending transactions.
-	// At the same time, a client which has already done mutations, can proceed to commit it. A race
-	// condition can happen here, with both proposing their respective states, only one can succeed
-	// after the proposal is done. So, check again to see the fate of the transaction here.
-	if s.orc.aborted(src.StartTs) {
-		src.Aborted = true
-	}
-	// Mark the transaction as done, irrespective of whether the proposal succeeded or not.
-	s.orc.doneUntil.Done(src.CommitTs)
-	return err
+	return s.proposeTxn(ctx, src)
 }
 
 func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.TxnContext, error) {
@@ -404,7 +386,7 @@ func (s *Server) SyncedUntil() uint64 {
 }
 
 func (s *Server) purgeOracle() {
-	ticker := time.NewTicker(time.Second * 10)
+	ticker := time.NewTicker(time.Second * 30)
 	defer ticker.Stop()
 
 	var lastPurgeTs uint64
diff --git a/edgraph/server.go b/edgraph/server.go
index 18d103d5..c8c0796c 100644
--- a/edgraph/server.go
+++ b/edgraph/server.go
@@ -355,9 +355,11 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
 		// ApplyMutations failed. We now want to abort the transaction,
 		// ignoring any error that might occur during the abort (the user would
 		// care more about the previous error).
-		ctxn := resp.Context
-		ctxn.Aborted = true
-		_, _ = worker.CommitOverNetwork(ctx, ctxn)
+		if resp.Context == nil {
+			resp.Context = &api.TxnContext{StartTs: mu.StartTs}
+		}
+		resp.Context.Aborted = true
+		_, _ = worker.CommitOverNetwork(ctx, resp.Context)
 
 		if err == y.ErrConflict {
 			// We have already aborted the transaction, so the error message should reflect that.
diff --git a/posting/oracle.go b/posting/oracle.go
index c685fe1d..460b860f 100644
--- a/posting/oracle.go
+++ b/posting/oracle.go
@@ -19,6 +19,8 @@ import (
 
 var o *oracle
 
+// TODO: Oracle should probably be located in worker package, instead of posting
+// package now that we don't run inSnapshot anymore.
 func Oracle() *oracle {
 	return o
 }
@@ -81,8 +83,8 @@ func (o *oracle) RegisterStartTs(ts uint64) *Txn {
 	return txn
 }
 
-// minPendingStartTs returns the min start ts which is currently pending a commit or abort decision.
-func (o *oracle) minPendingStartTs() uint64 {
+// MinPendingStartTs returns the min start ts which is currently pending a commit or abort decision.
+func (o *oracle) MinPendingStartTs() uint64 {
 	o.RLock()
 	defer o.RUnlock()
 	min := uint64(math.MaxUint64)
@@ -100,7 +102,7 @@ func (o *oracle) PurgeTs() uint64 {
 	// o.MinPendingStartTs can be inf, but we don't want Zero to delete new
 	// records that haven't yet reached us. So, we also consider MaxAssigned
 	// that we have received so far, so only records below MaxAssigned are purged.
-	return x.Min(o.minPendingStartTs()-1, o.MaxAssigned())
+	return x.Min(o.MinPendingStartTs()-1, o.MaxAssigned())
 }
 
 func (o *oracle) TxnOlderThan(dur time.Duration) (res []uint64) {
diff --git a/worker/draft.go b/worker/draft.go
index b70e40c5..b11bfcc7 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -761,7 +761,7 @@ func (n *node) abortOldTransactions() {
 // aborted. This way, we still keep all the mutations corresponding to this
 // start ts in the Raft logs. This is important, because we don't persist
 // pre-writes to disk in pstore.
-// - Considering the above, find the maximum commit timestamp that we have seen.
+// - Find the maximum commit timestamp that we have seen.
 // That would tell us about the maximum timestamp used to do any commits. This
 // ts is what we can use for future reads of this snapshot.
 // - Finally, this function would propose this snapshot index, so the entire
@@ -807,22 +807,19 @@ func (n *node) calculateSnapshot(discardN int) (*intern.Snapshot, error) {
 		return nil, err
 	}
 
-	// We iterate over Raft entries, parsing them to Proposals. We do two
-	// things:
-	// 1. Create a start timestamp -> first raft index map.
-	// 2. We find the max commit timestamp present, as maxCommitTs (/ ReadTs).
-	// As we see transaction commits, we remove the entries from the startToIdx
-	// map, so that by the end of iteration, we only keep the entries in map,
-	// which correspond to mutations which haven't yet been committed, aka
-	// pending mutations. We pick the lowest start ts, and the index
-	// corresponding to that becomes our snapshotIdx. We keep all the Raft
-	// entries including this one, so that on a replay, we can pick all the
-	// mutations correctly.
-
-	// This map holds a start ts as key, and Raft index as value.
-	startToIdx := make(map[uint64]uint64)
-	done := make(map[uint64]struct{})
-	var maxCommitTs, snapshotIdx uint64
+	// We can't rely upon the Raft entries to determine the minPendingStart,
+	// because there are many cases during mutations where we don't commit or
+	// abort the transaction. This might happen due to an early error thrown.
+	// Only the mutations which make it to Zero for a commit/abort decision have
+	// corresponding Delta entries. So, instead of replicating all that logic
+	// here, we just use the MinPendingStartTs tracked by the Oracle, and look
+	// for that in the logs.
+	//
+	// So, we iterate over logs. If we hit MinPendingStartTs, that generates our
+	// snapshotIdx. In any case, we continue picking up txn updates, to generate
+	// a maxCommitTs, which would become the readTs for the snapshot.
+	minPendingStart := posting.Oracle().MinPendingStartTs()
+	var maxCommitTs, snapshotIdx, maxCommitIdx uint64
 	for _, entry := range entries {
 		if entry.Type != raftpb.EntryNormal {
 			continue
@@ -833,54 +830,37 @@ func (n *node) calculateSnapshot(discardN int) (*intern.Snapshot, error) {
 			tr.SetError()
 			return nil, err
 		}
-		// We only track the mutations which contain edges. Not the mutations
-		// which have schema, or dropall, etc.
-		if proposal.Mutations != nil && len(proposal.Mutations.Edges) > 0 {
-			ts := proposal.Mutations.StartTs
-			if _, has := startToIdx[ts]; !has {
-				startToIdx[ts] = entry.Index
-			}
-			if _, has := done[ts]; has {
-				x.Errorf("Found a mutation after txn was done: %d. Ignoring.\n", ts)
+		if proposal.Mutations != nil {
+			start := proposal.Mutations.StartTs
+			if start >= minPendingStart && snapshotIdx == 0 {
+				snapshotIdx = entry.Index - 1
 			}
-		} else if proposal.Delta != nil {
+		}
+		if proposal.Delta != nil {
 			for _, txn := range proposal.Delta.GetTxns() {
-				// This mutation is done. We use done map, instead of deleting from startToIdx map,
-				// so that if we have a mutation which came after an abort of a transaction, we can
-				// still mark it as done, and not get stuck with that mutation forever.
-				done[txn.StartTs] = struct{}{}
 				maxCommitTs = x.Max(maxCommitTs, txn.CommitTs)
 			}
-			snapshotIdx = entry.Index
+			maxCommitIdx = entry.Index
 		}
 	}
 	if maxCommitTs == 0 {
 		tr.LazyPrintf("maxCommitTs is zero")
 		return nil, nil
 	}
-	var minPendingTs uint64 = math.MaxUint64
-	// It is possible that this loop doesn't execute, because all transactions have been committed.
-	// In that case, we'll default to snapshotIdx value from above.
-	for startTs, index := range startToIdx {
-		if _, ok := done[startTs]; ok {
-			continue
-		}
-		if minPendingTs < startTs {
-			continue
-		}
-		// Pick the lowest startTs, and the corresponding Raft index - 1. We
-		// deduct one so that the Raft entry can be picked fully during replay.
-		// This becomes our snapshotIdx.
-		minPendingTs, snapshotIdx = startTs, index-1
+	if snapshotIdx <= 0 {
+		// It is possible that there are no pending transactions. In that case,
+		// snapshotIdx would be zero.
+		tr.LazyPrintf("Using maxCommitIdx as snapshotIdx: %d", maxCommitIdx)
+		snapshotIdx = maxCommitIdx
 	}
 
 	numDiscarding := snapshotIdx - first + 1
 	tr.LazyPrintf("Got snapshotIdx: %d. MaxCommitTs: %d. Discarding: %d. MinPendingStartTs: %d",
-		snapshotIdx, maxCommitTs, numDiscarding, minPendingTs)
+		snapshotIdx, maxCommitTs, numDiscarding, minPendingStart)
 	if int(numDiscarding) < discardN {
 		tr.LazyPrintf("Skipping snapshot because insufficient discard entries")
 		x.Printf("Skipping snapshot at index: %d. Insufficient discard entries: %d."+
-			" MinPendingStartTs: %d\n", snapshotIdx, numDiscarding, minPendingTs)
+			" MinPendingStartTs: %d\n", snapshotIdx, numDiscarding, minPendingStart)
 		return nil, nil
 	}
 
-- 
GitLab