diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go
index 7f0b893ceff213ce94fa336142e428f20a2d278f..6d35e2b090e83ab5566d9a4a84215925ca860b5c 100644
--- a/dgraph/cmd/zero/raft.go
+++ b/dgraph/cmd/zero/raft.go
@@ -94,8 +94,8 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *intern.ZeroProposal
 		return ctx.Err()
 	}
 
-	propose := func() error {
-		cctx, cancel := context.WithTimeout(ctx, 15*time.Second)
+	propose := func(timeout time.Duration) error {
+		cctx, cancel := context.WithTimeout(ctx, timeout)
 		defer cancel()
 
 		che := make(chan error, 1)
@@ -140,8 +140,10 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *intern.ZeroProposal
 	// Having a timeout here prevents the mutation being stuck forever in case they don't have a
 	// timeout. We should always try with a timeout and optionally retry.
 	err := errInternalRetry
+	timeout := 4 * time.Second
 	for err == errInternalRetry {
-		err = propose()
+		err = propose(timeout)
+		timeout *= 2 // Exponential backoff
 	}
 	return err
 }
diff --git a/posting/list.go b/posting/list.go
index f53e8bb7988f506c6d6ab822c760b5e705f8e03f..e8e96cfede8809e9e23182c2593993b51c0d321d 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -222,8 +222,11 @@ func (l *List) EstimatedSize() int32 {
 
 // SetForDeletion will mark this List to be deleted, so no more mutations can be applied to this.
 func (l *List) SetForDeletion() bool {
-	l.Lock()
-	defer l.Unlock()
+	if l.AlreadyLocked() {
+		return false
+	}
+	l.RLock()
+	defer l.RUnlock()
 	for _, plist := range l.mutationMap {
 		if plist.Commit == 0 {
 			return false
@@ -400,8 +403,8 @@ func (l *List) commitMutation(startTs, commitTs uint64) error {
 		l.commitTs = commitTs
 	}
 
-	// Calculate 5% of immutable layer
-	numUids := (bp128.NumIntegers(l.plist.Uids) * 5) / 100
+	// Calculate 10% of immutable layer
+	numUids := (bp128.NumIntegers(l.plist.Uids) * 10) / 100
 	if numUids < 1000 {
 		numUids = 1000
 	}
diff --git a/posting/mvcc.go b/posting/mvcc.go
index 9544c3da27227136cd704ef1d4c9f352f57b868d..336794e0a3fc9c0f16e8938f5cb11c4b47d7b07a 100644
--- a/posting/mvcc.go
+++ b/posting/mvcc.go
@@ -68,15 +68,19 @@ func (tx *Txn) CommitToDisk(writer *x.TxnWriter, commitTs uint64) error {
 	if commitTs == 0 {
 		return nil
 	}
+	var keys []string
 	tx.Lock()
-	defer tx.Unlock()
+	for key := range tx.deltas {
+		keys = append(keys, key)
+	}
+	tx.Unlock()
 
 	// TODO: Simplify this. All we need to do is to get the PL for the key, and if it has the
 	// postings for the startTs, we commit them. Otherwise, we skip.
 	// Also, if the snapshot read ts is above the commit ts, then we just delete the postings from
 	// memory, instead of writing them back again.
-	for key := range tx.deltas {
+	for _, key := range keys {
 		plist, err := Get([]byte(key))
 		if err != nil {
 			return err
diff --git a/worker/draft.go b/worker/draft.go
index 7bc8a95348938aa03959cf488f7eb73c24d7d44a..2205a4a7467c090d962d064665d6c45bf82bdd85 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -159,8 +159,8 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *intern.Proposal) er
 		}
 	}
 
-	propose := func() error {
-		cctx, cancel := context.WithTimeout(ctx, 15*time.Second)
+	propose := func(timeout time.Duration) error {
+		cctx, cancel := context.WithTimeout(ctx, timeout)
 		defer cancel()
 
 		che := make(chan error, 1)
@@ -174,7 +174,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *intern.Proposal) er
 		proposal.Key = key
 
 		if tr, ok := trace.FromContext(ctx); ok {
-			tr.LazyPrintf("Proposing data with key: %s", key)
+			tr.LazyPrintf("Proposing data with key: %s. Timeout: %v", key, timeout)
 		}
 
 		data, err := proposal.Marshal()
@@ -214,8 +214,10 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *intern.Proposal) er
 	// Having a timeout here prevents the mutation being stuck forever in case they don't have a
 	// timeout. We should always try with a timeout and optionally retry.
 	err := errInternalRetry
+	timeout := 4 * time.Second
 	for err == errInternalRetry {
-		err = propose()
+		err = propose(timeout)
+		timeout *= 2 // Exponential backoff
 	}
 	return err
 }
@@ -245,24 +247,30 @@ func (n *node) applyConfChange(e raftpb.Entry) {
 	n.DoneConfChange(cc.ID, nil)
 }
 
-func waitForConflictResolution(attr string) error {
-	for i := 0; i < 10; i++ {
-		tctxs := posting.Oracle().IterateTxns(func(key []byte) bool {
-			pk := x.Parse(key)
-			return pk.Attr == attr
-		})
-		if len(tctxs) == 0 {
-			return nil
-		}
-		tryAbortTransactions(tctxs)
+var errHasPendingTxns = errors.New("Pending transactions found. Please retry operation.")
+
+// We must not wait here. Previously, we used to block until we have aborted the
+// transactions. We're now applying all updates serially, so blocking for one
+// operation is not an option.
+func detectPendingTxns(attr string) error {
+	tctxs := posting.Oracle().IterateTxns(func(key []byte) bool {
+		pk := x.Parse(key)
+		return pk.Attr == attr
+	})
+	if len(tctxs) == 0 {
+		return nil
 	}
-	return errors.New("Unable to abort transactions")
+	go tryAbortTransactions(tctxs)
+	return errHasPendingTxns
 }
 
 // We don't support schema mutations across nodes in a transaction.
 // Wait for all transactions to either abort or complete and all write transactions
 // involving the predicate are aborted until schema mutations are done.
 func (n *node) applyMutations(proposal *intern.Proposal, index uint64) error {
+	tr := trace.New("Dgraph.Node", "ApplyMutations")
+	defer tr.Finish()
+
 	if proposal.Mutations.DropAll {
 		// Ensures nothing get written to disk due to commit proposals.
 		posting.Oracle().ResetTxns()
@@ -278,6 +286,7 @@ func (n *node) applyMutations(proposal *intern.Proposal, index uint64) error {
 	ctx := n.Ctx(proposal.Key)
 
 	if len(proposal.Mutations.Schema) > 0 {
+		tr.LazyPrintf("Applying Schema")
 		for _, supdate := range proposal.Mutations.Schema {
 			// This is neceassry to ensure that there is no race between when we start reading
 			// from badger and new mutation getting commited via raft and getting applied.
@@ -289,7 +298,7 @@ func (n *node) applyMutations(proposal *intern.Proposal, index uint64) error {
 			if tablet := groups().Tablet(supdate.Predicate); tablet != nil && tablet.ReadOnly {
 				return errPredicateMoving
 			}
-			if err := waitForConflictResolution(supdate.Predicate); err != nil {
+			if err := detectPendingTxns(supdate.Predicate); err != nil {
 				return err
 			}
 			if err := runSchemaMutation(ctx, supdate, startTs); err != nil {
@@ -310,13 +319,19 @@ func (n *node) applyMutations(proposal *intern.Proposal, index uint64) error {
 	schemaMap := make(map[string]types.TypeID)
 	for _, edge := range proposal.Mutations.Edges {
 		if tablet := groups().Tablet(edge.Attr); tablet != nil && tablet.ReadOnly {
+			tr.LazyPrintf("Predicate Moving")
+			tr.SetError()
 			return errPredicateMoving
 		}
 		if edge.Entity == 0 && bytes.Equal(edge.Value, []byte(x.Star)) {
-			// We should only have one edge drop in one mutation call.
-			if err := waitForConflictResolution(edge.Attr); err != nil {
+			// We should only drop the predicate if there is no pending
+			// transaction.
+			if err := detectPendingTxns(edge.Attr); err != nil {
+				tr.LazyPrintf("Found pending transactions which obstruct operation.")
+				tr.SetError()
 				return err
 			}
+			tr.LazyPrintf("Deleting predicate")
 			return posting.DeletePredicate(ctx, edge.Attr)
 		}
 		// Dont derive schema when doing deletion.
@@ -345,17 +360,22 @@ func (n *node) applyMutations(proposal *intern.Proposal, index uint64) error {
 	m := proposal.Mutations
 	txn := posting.Oracle().RegisterStartTs(m.StartTs)
 	if txn.ShouldAbort() {
+		tr.LazyPrintf("Should Abort")
+		tr.SetError()
 		return dy.ErrConflict
 	}
+	tr.LazyPrintf("Applying %d edges", len(m.Edges))
 	for _, edge := range m.Edges {
 		err := posting.ErrRetry
 		for err == posting.ErrRetry {
 			err = runMutation(ctx, edge, txn)
 		}
 		if err != nil {
+			tr.SetError()
 			return err
 		}
 	}
+	tr.LazyPrintf("Done applying %d edges", len(m.Edges))
 	return nil
 }
 
@@ -566,7 +586,7 @@ func (n *node) Run() {
 			var tr trace.Trace
 			if len(rd.Entries) > 0 || !raft.IsEmptySnap(rd.Snapshot) || !raft.IsEmptyHardState(rd.HardState) {
 				// Optionally, trace this run.
-				tr = trace.New("Dgraph", "RunLoop")
+				tr = trace.New("Dgraph.Raft", "RunLoop")
 			}
 
 			if rd.SoftState != nil {
@@ -727,7 +747,7 @@ func (n *node) blockingAbort(req *intern.TxnTimestamps) error {
 	defer cancel()
 
 	delta, err := zc.TryAbort(ctx, req)
-	x.Printf("Aborted txns with start ts: %v. Error: %v\n", req.Ts, err)
+	x.Printf("TryAbort %d txns with start ts. Error: %v\n", len(req.Ts), err)
 	if err != nil || len(delta.Txns) == 0 {
 		return err
 	}
diff --git a/x/lock.go b/x/lock.go
index fae63c77a8e35472bdd62f94d83957a4ea0d463e..3acc7f5f90d11cadd88d7f6e9d6ac09a612231b1 100644
--- a/x/lock.go
+++ b/x/lock.go
@@ -20,6 +20,10 @@ type SafeMutex struct {
 	readers int32
 }
 
+func (s *SafeMutex) AlreadyLocked() bool {
+	return atomic.LoadInt32(&s.writer) > 0
+}
+
 func (s *SafeMutex) Lock() {
 	s.m.Lock()
 	AssertTrue(atomic.AddInt32(&s.writer, 1) == 1)
@@ -31,7 +35,7 @@ func (s *SafeMutex) Unlock() {
 }
 
 func (s *SafeMutex) AssertLock() {
-	AssertTrue(atomic.LoadInt32(&s.writer) == 1)
+	AssertTrue(s.AlreadyLocked())
 }
 
 func (s *SafeMutex) RLock() {
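
Note on the proposeAndWait changes (mirrored in dgraph/cmd/zero/raft.go and worker/draft.go): the fixed 15-second proposal timeout becomes a per-attempt timeout that starts at 4 seconds and doubles on every internal retry. Below is a minimal standalone sketch of that pattern; the errInternalRetry sentinel and the propose callback signature are stand-ins for the real ones, not the actual Dgraph types.

package sketch

import (
	"context"
	"errors"
	"time"
)

// errInternalRetry stands in for the sentinel the patched code uses to
// signal "retry this proposal with a larger timeout".
var errInternalRetry = errors.New("retry proposal with higher timeout")

// proposeAndWait retries propose with a doubling per-attempt timeout
// until it returns anything other than the retry sentinel.
func proposeAndWait(ctx context.Context, propose func(context.Context) error) error {
	err := errInternalRetry
	timeout := 4 * time.Second
	for err == errInternalRetry {
		cctx, cancel := context.WithTimeout(ctx, timeout)
		err = propose(cctx)
		cancel()
		timeout *= 2 // Exponential backoff: 4s, 8s, 16s, ...
	}
	return err
}

The doubling is uncapped, but context.WithTimeout never extends a parent deadline, so a deadline on the caller's ctx still bounds the loop as a whole.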
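
Note on the posting/mvcc.go change: CommitToDisk used to hold the Txn lock for the whole function (defer tx.Unlock()), including all the per-key work; now it snapshots the keys of tx.deltas under the lock and iterates the copy lock-free. A generic sketch of that pattern, with a hypothetical store type standing in for Txn:

package sketch

import "sync"

// store is a hypothetical stand-in for posting.Txn: a mutex-guarded
// delta map whose keys must be visited while slow per-key work (such
// as disk writes) happens.
type store struct {
	sync.Mutex
	deltas map[string][]byte
}

// snapshotKeys copies the map keys out under the lock so the caller
// can iterate and do I/O without holding the mutex.
func (s *store) snapshotKeys() []string {
	s.Lock()
	defer s.Unlock()
	keys := make([]string, 0, len(s.deltas))
	for k := range s.deltas {
		keys = append(keys, k)
	}
	return keys
}

The trade-off: keys added to the map after the snapshot are not seen by this pass, which the patch evidently accepts in exchange for not holding the transaction lock across writes.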
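
Note on the x/lock.go and posting/list.go changes: SafeMutex gains AlreadyLocked, which reads the atomic writer count without acquiring the mutex, and SetForDeletion uses it to bail out early (then takes only a read lock) rather than blocking behind an active writer. A condensed sketch of the type follows; the diff does not show the body of Unlock, so the decrement there is an assumption inferred from Lock.

package sketch

import (
	"sync"
	"sync/atomic"
)

// SafeMutex mirrors the x/lock.go type: an RWMutex plus an atomic
// writer count, so other goroutines can ask "is a writer active?"
// without touching the mutex itself. (The real type also tracks
// readers; omitted here.)
type SafeMutex struct {
	m      sync.RWMutex
	writer int32
}

func (s *SafeMutex) Lock() {
	s.m.Lock()
	atomic.AddInt32(&s.writer, 1)
}

// Assumed counterpart to Lock: decrement before releasing.
func (s *SafeMutex) Unlock() {
	atomic.AddInt32(&s.writer, -1)
	s.m.Unlock()
}

// AlreadyLocked reports whether a writer currently holds the mutex.
func (s *SafeMutex) AlreadyLocked() bool {
	return atomic.LoadInt32(&s.writer) > 0
}

AlreadyLocked is inherently racy (a writer may arrive or leave right after the check); SetForDeletion uses it only as a cheap early-out, so a stale answer just means one wasted retry by the caller.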