diff --git a/contrib/release.sh b/contrib/release.sh index 9fb7330146341262d9de0e6e45d9b0bc99757b91..0c6b54a98667300f2a8adf8821da2e780b7f59e4 100755 --- a/contrib/release.sh +++ b/contrib/release.sh @@ -63,10 +63,10 @@ pushd $basedir/dgraph git pull git checkout $TAG # HEAD here points to whatever is checked out. - lastCommitSHA1=$(git rev-parse --short HEAD); + lastCommitSHA1=$(git rev-parse --short HEAD) gitBranch=$(git rev-parse --abbrev-ref HEAD) lastCommitTime=$(git log -1 --format=%ci) - release_version=$(git describe --abbrev=0); + release_version=$TAG popd # Clone ratel repo. diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index 8a980f27ab485fe8e89bff38e09deb90f0af7691..652c52f0c51769a54e262077fbf04802cf71b369 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -29,9 +29,8 @@ type Oracle struct { x.SafeMutex commits map[uint64]uint64 // startTs -> commitTs // TODO: Check if we need LRU. - rowCommit map[string]uint64 // fp(key) -> commitTs. Used to detect conflict. - aborts map[uint64]struct{} // key is startTs - maxAssigned uint64 // max transaction assigned by us. + rowCommit map[string]uint64 // fp(key) -> commitTs. Used to detect conflict. + maxAssigned uint64 // max transaction assigned by us. // timestamp at the time of start of server or when it became leader. Used to detect conflicts. tmax uint64 @@ -46,7 +45,6 @@ type Oracle struct { func (o *Oracle) Init() { o.commits = make(map[uint64]uint64) o.rowCommit = make(map[string]uint64) - o.aborts = make(map[uint64]struct{}) o.subscribers = make(map[int]chan *intern.OracleDelta) o.updates = make(chan *intern.OracleDelta, 100000) // Keeping 1 second worth of updates. o.doneUntil.Init() @@ -74,9 +72,6 @@ func (o *Oracle) hasConflict(src *api.TxnContext) bool { } func (o *Oracle) purgeBelow(minTs uint64) { - x.Printf("purging below ts:%d, len(o.commits):%d, len(o.aborts):%d"+ - ", len(o.rowCommit):%d\n", - minTs, len(o.commits), len(o.aborts), len(o.rowCommit)) o.Lock() defer o.Unlock() @@ -86,11 +81,6 @@ func (o *Oracle) purgeBelow(minTs uint64) { delete(o.commits, ts) } } - for ts := range o.aborts { - if ts < minTs { - delete(o.aborts, ts) - } - } // There is no transaction running with startTs less than minTs // So we can delete everything from rowCommit whose commitTs < minTs for key, ts := range o.rowCommit { @@ -99,6 +89,9 @@ func (o *Oracle) purgeBelow(minTs uint64) { } } o.tmax = minTs + x.Printf("Purged below ts:%d, len(o.commits):%d"+ + ", len(o.rowCommit):%d\n", + minTs, len(o.commits), len(o.rowCommit)) } func (o *Oracle) commit(src *api.TxnContext) error { @@ -114,13 +107,6 @@ func (o *Oracle) commit(src *api.TxnContext) error { return nil } -func (o *Oracle) aborted(startTs uint64) bool { - o.Lock() - defer o.Unlock() - _, ok := o.aborts[startTs] - return ok -} - func sortTxns(delta *intern.OracleDelta) { sort.Slice(delta.Txns, func(i, j int) bool { return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs @@ -134,10 +120,6 @@ func (o *Oracle) currentState() *intern.OracleDelta { resp.Txns = append(resp.Txns, &intern.TxnStatus{StartTs: start, CommitTs: commit}) } - for abort := range o.aborts { - resp.Txns = append(resp.Txns, - &intern.TxnStatus{StartTs: abort, CommitTs: 0}) - } resp.MaxAssigned = o.maxAssigned return resp } @@ -205,11 +187,8 @@ func (o *Oracle) updateCommitStatusHelper(index uint64, src *api.TxnContext) boo if _, ok := o.commits[src.StartTs]; ok { return false } - if _, ok := o.aborts[src.StartTs]; ok { - return false - } if src.Aborted { - o.aborts[src.StartTs] = struct{}{} + o.commits[src.StartTs] = 0 } else { o.commits[src.StartTs] = src.CommitTs } @@ -220,13 +199,10 @@ func (o *Oracle) updateCommitStatusHelper(index uint64, src *api.TxnContext) boo func (o *Oracle) updateCommitStatus(index uint64, src *api.TxnContext) { if o.updateCommitStatusHelper(index, src) { delta := new(intern.OracleDelta) - if src.Aborted { - delta.Txns = append(delta.Txns, - &intern.TxnStatus{StartTs: src.StartTs, CommitTs: 0}) - } else { - delta.Txns = append(delta.Txns, - &intern.TxnStatus{StartTs: src.StartTs, CommitTs: src.CommitTs}) - } + delta.Txns = append(delta.Txns, &intern.TxnStatus{ + StartTs: src.StartTs, + CommitTs: o.commitTs(src.StartTs), + }) o.updates <- delta } } @@ -258,6 +234,8 @@ func (o *Oracle) MaxPending() uint64 { var errConflict = errors.New("Transaction conflict") +// proposeTxn proposes a txn update, and then updates src to reflect the state +// of the commit after proposal is run. func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error { var zp intern.ZeroProposal zp.Txn = &api.TxnContext{ @@ -265,11 +243,24 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error { CommitTs: src.CommitTs, Aborted: src.Aborted, } - return s.Node.proposeAndWait(ctx, &zp) + if err := s.Node.proposeAndWait(ctx, &zp); err != nil { + return err + } + + // There might be race between this proposal trying to commit and predicate + // move aborting it. A predicate move, triggered by Zero, would abort all + // pending transactions. At the same time, a client which has already done + // mutations, can proceed to commit it. A race condition can happen here, + // with both proposing their respective states, only one can succeed after + // the proposal is done. So, check again to see the fate of the transaction + // here. + src.CommitTs = s.orc.commitTs(src.StartTs) + if src.CommitTs == 0 { + src.Aborted = true + } + return nil } -// TODO: This function implies that src has been modified to represent the latest status of the -// transaction. I don't think that's happening properly. func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { if src.Aborted { return s.proposeTxn(ctx, src) @@ -322,23 +313,14 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { return err } src.CommitTs = assigned.StartId + // Mark the transaction as done, irrespective of whether the proposal succeeded or not. + defer s.orc.doneUntil.Done(src.CommitTs) if err := s.orc.commit(src); err != nil { src.Aborted = true } // Propose txn should be used to set watermark as done. - err = s.proposeTxn(ctx, src) - // There might be race between this proposal trying to commit and predicate - // move aborting it. A predicate move, triggered by Zero, would abort all pending transactions. - // At the same time, a client which has already done mutations, can proceed to commit it. A race - // condition can happen here, with both proposing their respective states, only one can succeed - // after the proposal is done. So, check again to see the fate of the transaction here. - if s.orc.aborted(src.StartTs) { - src.Aborted = true - } - // Mark the transaction as done, irrespective of whether the proposal succeeded or not. - s.orc.doneUntil.Done(src.CommitTs) - return err + return s.proposeTxn(ctx, src) } func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.TxnContext, error) { @@ -404,7 +386,7 @@ func (s *Server) SyncedUntil() uint64 { } func (s *Server) purgeOracle() { - ticker := time.NewTicker(time.Second * 10) + ticker := time.NewTicker(time.Second * 30) defer ticker.Stop() var lastPurgeTs uint64 diff --git a/edgraph/server.go b/edgraph/server.go index 18d103d5b6ef2ce275f2f30297e6ed2f665f592c..c8c0796c4c3cbab6ff45fb0f547c031a4d1b6463 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -355,9 +355,11 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign // ApplyMutations failed. We now want to abort the transaction, // ignoring any error that might occur during the abort (the user would // care more about the previous error). - ctxn := resp.Context - ctxn.Aborted = true - _, _ = worker.CommitOverNetwork(ctx, ctxn) + if resp.Context == nil { + resp.Context = &api.TxnContext{StartTs: mu.StartTs} + } + resp.Context.Aborted = true + _, _ = worker.CommitOverNetwork(ctx, resp.Context) if err == y.ErrConflict { // We have already aborted the transaction, so the error message should reflect that. diff --git a/posting/oracle.go b/posting/oracle.go index c685fe1d710c91fd78ce7e952a0a8e1a60a1104e..460b860ff0648853a3afc7d2ddab4e0c9ab69273 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -19,6 +19,8 @@ import ( var o *oracle +// TODO: Oracle should probably be located in worker package, instead of posting +// package now that we don't run inSnapshot anymore. func Oracle() *oracle { return o } @@ -81,8 +83,8 @@ func (o *oracle) RegisterStartTs(ts uint64) *Txn { return txn } -// minPendingStartTs returns the min start ts which is currently pending a commit or abort decision. -func (o *oracle) minPendingStartTs() uint64 { +// MinPendingStartTs returns the min start ts which is currently pending a commit or abort decision. +func (o *oracle) MinPendingStartTs() uint64 { o.RLock() defer o.RUnlock() min := uint64(math.MaxUint64) @@ -100,7 +102,7 @@ func (o *oracle) PurgeTs() uint64 { // o.MinPendingStartTs can be inf, but we don't want Zero to delete new // records that haven't yet reached us. So, we also consider MaxAssigned // that we have received so far, so only records below MaxAssigned are purged. - return x.Min(o.minPendingStartTs()-1, o.MaxAssigned()) + return x.Min(o.MinPendingStartTs()-1, o.MaxAssigned()) } func (o *oracle) TxnOlderThan(dur time.Duration) (res []uint64) { diff --git a/worker/draft.go b/worker/draft.go index b70e40c5a9369338810736a2c750ca1cdb568f3a..b11bfcc71fef55dc51ee8c4622fc21719c5e392f 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -761,7 +761,7 @@ func (n *node) abortOldTransactions() { // aborted. This way, we still keep all the mutations corresponding to this // start ts in the Raft logs. This is important, because we don't persist // pre-writes to disk in pstore. -// - Considering the above, find the maximum commit timestamp that we have seen. +// - Find the maximum commit timestamp that we have seen. // That would tell us about the maximum timestamp used to do any commits. This // ts is what we can use for future reads of this snapshot. // - Finally, this function would propose this snapshot index, so the entire @@ -807,22 +807,19 @@ func (n *node) calculateSnapshot(discardN int) (*intern.Snapshot, error) { return nil, err } - // We iterate over Raft entries, parsing them to Proposals. We do two - // things: - // 1. Create a start timestamp -> first raft index map. - // 2. We find the max commit timestamp present, as maxCommitTs (/ ReadTs). - // As we see transaction commits, we remove the entries from the startToIdx - // map, so that by the end of iteration, we only keep the entries in map, - // which correspond to mutations which haven't yet been committed, aka - // pending mutations. We pick the lowest start ts, and the index - // corresponding to that becomes our snapshotIdx. We keep all the Raft - // entries including this one, so that on a replay, we can pick all the - // mutations correctly. - - // This map holds a start ts as key, and Raft index as value. - startToIdx := make(map[uint64]uint64) - done := make(map[uint64]struct{}) - var maxCommitTs, snapshotIdx uint64 + // We can't rely upon the Raft entries to determine the minPendingStart, + // because there are many cases during mutations where we don't commit or + // abort the transaction. This might happen due to an early error thrown. + // Only the mutations which make it to Zero for a commit/abort decision have + // corresponding Delta entries. So, instead of replicating all that logic + // here, we just use the MinPendingStartTs tracked by the Oracle, and look + // for that in the logs. + // + // So, we iterate over logs. If we hit MinPendingStartTs, that generates our + // snapshotIdx. In any case, we continue picking up txn updates, to generate + // a maxCommitTs, which would become the readTs for the snapshot. + minPendingStart := posting.Oracle().MinPendingStartTs() + var maxCommitTs, snapshotIdx, maxCommitIdx uint64 for _, entry := range entries { if entry.Type != raftpb.EntryNormal { continue @@ -833,54 +830,37 @@ func (n *node) calculateSnapshot(discardN int) (*intern.Snapshot, error) { tr.SetError() return nil, err } - // We only track the mutations which contain edges. Not the mutations - // which have schema, or dropall, etc. - if proposal.Mutations != nil && len(proposal.Mutations.Edges) > 0 { - ts := proposal.Mutations.StartTs - if _, has := startToIdx[ts]; !has { - startToIdx[ts] = entry.Index - } - if _, has := done[ts]; has { - x.Errorf("Found a mutation after txn was done: %d. Ignoring.\n", ts) + if proposal.Mutations != nil { + start := proposal.Mutations.StartTs + if start >= minPendingStart && snapshotIdx == 0 { + snapshotIdx = entry.Index - 1 } - } else if proposal.Delta != nil { + } + if proposal.Delta != nil { for _, txn := range proposal.Delta.GetTxns() { - // This mutation is done. We use done map, instead of deleting from startToIdx map, - // so that if we have a mutation which came after an abort of a transaction, we can - // still mark it as done, and not get stuck with that mutation forever. - done[txn.StartTs] = struct{}{} maxCommitTs = x.Max(maxCommitTs, txn.CommitTs) } - snapshotIdx = entry.Index + maxCommitIdx = entry.Index } } if maxCommitTs == 0 { tr.LazyPrintf("maxCommitTs is zero") return nil, nil } - var minPendingTs uint64 = math.MaxUint64 - // It is possible that this loop doesn't execute, because all transactions have been committed. - // In that case, we'll default to snapshotIdx value from above. - for startTs, index := range startToIdx { - if _, ok := done[startTs]; ok { - continue - } - if minPendingTs < startTs { - continue - } - // Pick the lowest startTs, and the corresponding Raft index - 1. We - // deduct one so that the Raft entry can be picked fully during replay. - // This becomes our snapshotIdx. - minPendingTs, snapshotIdx = startTs, index-1 + if snapshotIdx <= 0 { + // It is possible that there are no pending transactions. In that case, + // snapshotIdx would be zero. + tr.LazyPrintf("Using maxCommitIdx as snapshotIdx: %d", maxCommitIdx) + snapshotIdx = maxCommitIdx } numDiscarding := snapshotIdx - first + 1 tr.LazyPrintf("Got snapshotIdx: %d. MaxCommitTs: %d. Discarding: %d. MinPendingStartTs: %d", - snapshotIdx, maxCommitTs, numDiscarding, minPendingTs) + snapshotIdx, maxCommitTs, numDiscarding, minPendingStart) if int(numDiscarding) < discardN { tr.LazyPrintf("Skipping snapshot because insufficient discard entries") x.Printf("Skipping snapshot at index: %d. Insufficient discard entries: %d."+ - " MinPendingStartTs: %d\n", snapshotIdx, numDiscarding, minPendingTs) + " MinPendingStartTs: %d\n", snapshotIdx, numDiscarding, minPendingStart) return nil, nil }