diff --git a/contrib/integration/acctupsert/main.go b/contrib/integration/acctupsert/main.go
index 2b000e1675de98f345e0758ac114648da0a606aa..fc9f09e309204359fc8a1f5672a253914f3874c3 100644
--- a/contrib/integration/acctupsert/main.go
+++ b/contrib/integration/acctupsert/main.go
@@ -12,6 +12,7 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
+	"math/rand"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -33,6 +34,7 @@ var (
 	firsts = []string{"Paul", "Eric", "Jack", "John", "Martin"}
 	lasts  = []string{"Brown", "Smith", "Robinson", "Waters", "Taylor"}
 	ages   = []int{20, 25, 30, 35}
+	types  = []string{"CEO", "COO", "CTO", "CFO"}
 )
 
 type account struct {
@@ -135,10 +137,12 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
 	txn := c.NewTxn()
 	defer txn.Discard(ctx)
 
+	// expand(_all_) {uid}
 	q := fmt.Sprintf(`
 	{
 		get(func: eq(first, %q)) @filter(eq(last, %q) AND eq(age, %d)) {
 			uid
+			expand(_all_) {uid}
 		}
 	}
 	`, acc.first, acc.last, acc.age)
@@ -153,6 +157,10 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
 	x.Check(json.Unmarshal(resp.GetJson(), &decode))
 	x.AssertTrue(len(decode.Get) <= 1)
 
+	s := rand.NewSource(time.Now().Unix())
+	r := rand.New(s) // initialize local pseudorandom generator
+	t := r.Intn(len(types))
+
 	var uid string
 	if len(decode.Get) == 1 {
 		x.AssertTrue(decode.Get[0].Uid != nil)
@@ -162,8 +170,9 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
 			_:acct <first> %q .
 			_:acct <last> %q .
 			_:acct <age> "%d"^^<xs:int> .
-		`,
-			acc.first, acc.last, acc.age,
+			_:acct <%s> "" .
+		`,
+			acc.first, acc.last, acc.age, types[t],
 		)
 		mu := &api.Mutation{SetNquads: []byte(nqs)}
 		assigned, err := txn.Mutate(ctx, mu)
@@ -172,7 +181,6 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
 		}
 		uid = assigned.GetUids()["acct"]
 		x.AssertTrue(uid != "")
-
 	}
 
 	nq := fmt.Sprintf(`
diff --git a/dgraph/cmd/root.go b/dgraph/cmd/root.go
index 1b6a4b9147dbd620c3257df4af51544279398ce1..ac22f1b29bec0a80d3c7d426bd0cfed9632fd789 100644
--- a/dgraph/cmd/root.go
+++ b/dgraph/cmd/root.go
@@ -8,6 +8,7 @@ package cmd
 
 import (
+	goflag "flag"
 	"fmt"
 	"os"
 
@@ -19,6 +20,7 @@ import (
 	"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/spf13/cobra"
+	flag "github.com/spf13/pflag"
 	"github.com/spf13/viper"
 )
 
@@ -40,6 +42,7 @@ cluster.
 // Execute adds all child commands to the root command and sets flags appropriately.
 // This is called by main.main(). It only needs to happen once to the rootCmd.
 func Execute() {
+	goflag.Parse()
 	if err := RootCmd.Execute(); err != nil {
 		fmt.Println(err)
 		os.Exit(1)
@@ -61,6 +64,7 @@ func init() {
 	RootCmd.PersistentFlags().Bool("expose_trace", false,
 		"Allow trace endpoint to be accessible from remote")
 	rootConf.BindPFlags(RootCmd.PersistentFlags())
+	flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
 
 	var subcommands = []*x.SubCommand{
 		&bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version, &debug.Debug,
diff --git a/posting/list.go b/posting/list.go
index e8e96cfede8809e9e23182c2593993b51c0d321d..60df6d71b0e60edf9c20d601225e733de84c12ab 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -914,6 +914,8 @@ func (l *List) Value(readTs uint64) (rval types.Val, rerr error) {
 // If list consists of one or more languages, first available value is returned; if no language
 // from list match the values, processing is the same as for empty list.
 func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr error) {
+	l.RLock()
+	defer l.RUnlock()
 	p, err := l.postingFor(readTs, langs)
 	if err != nil {
 		return rval, err
@@ -922,8 +924,7 @@ func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr err
 }
 
 func (l *List) postingFor(readTs uint64, langs []string) (p *intern.Posting, rerr error) {
-	l.RLock()
-	defer l.RUnlock()
+	l.AssertRLock()
 	return l.postingForLangs(readTs, langs)
 }
 
diff --git a/raftwal/storage.go b/raftwal/storage.go
index b3f5760b24dc695e1d98110039a19e6431888330..96751dd6b49155ef268304ecef1397fc14b67917 100644
--- a/raftwal/storage.go
+++ b/raftwal/storage.go
@@ -17,6 +17,7 @@ import (
 	"github.com/coreos/etcd/raft"
 	pb "github.com/coreos/etcd/raft/raftpb"
 	"github.com/dgraph-io/badger"
+	"github.com/golang/glog"
 	"golang.org/x/net/trace"
 
 	"github.com/dgraph-io/dgraph/x"
@@ -65,7 +66,20 @@ func (u *txnUnifier) Cancel() {
 
 type localCache struct {
 	sync.RWMutex
-	snap pb.Snapshot
+	firstIndex uint64
+	snap       pb.Snapshot
 }
+
+func (c *localCache) setFirst(first uint64) {
+	c.Lock()
+	defer c.Unlock()
+	c.firstIndex = first
+}
+
+func (c *localCache) first() uint64 {
+	c.RLock()
+	defer c.RUnlock()
+	return c.firstIndex
+}
 
 func (c *localCache) setSnapshot(s pb.Snapshot) {
@@ -238,9 +252,26 @@ func (w *DiskStorage) seekEntry(e *pb.Entry, seekTo uint64, reverse bool) (uint6
 
 func (w *DiskStorage) FirstIndex() (uint64, error) {
 	snap := w.cache.snapshot()
 	if !raft.IsEmptySnap(snap) {
+		if glog.V(3) {
+			glog.Infof("Cached. Snapshot index: %d", snap.Metadata.Index)
+		}
 		return snap.Metadata.Index + 1, nil
 	}
+	if first := w.cache.first(); first > 0 {
+		if glog.V(3) {
+			glog.Infof("Cached. First: %d", first)
+		}
+		return first, nil
+	}
 	index, err := w.seekEntry(nil, 0, false)
+	if err == nil {
+		if glog.V(2) {
+			glog.Infof("Setting first index: %d", index+1)
+		}
+		w.cache.setFirst(index + 1)
+	} else {
+		glog.Errorf("While seekEntry. Error: %v", err)
+	}
 	return index + 1, err
 }
@@ -549,11 +580,17 @@ func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error
 }
 
 func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) error {
+	if glog.V(2) {
+		glog.Infof("CreateSnapshot i=%d.", i)
+	}
 	first, err := w.FirstIndex()
 	if err != nil {
 		return err
 	}
 	if i < first {
+		if glog.V(2) {
+			glog.Errorf("i=%d<first=%d, ErrSnapOutOfDate", i, first)
+		}
 		return raft.ErrSnapOutOfDate
 	}
 
diff --git a/worker/draft.go b/worker/draft.go
index 3f582ead9fbf2ee69334aa566144fc25c63ec305..666f63d34dda1be32679c0c2535241831e6849c5 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -19,6 +19,7 @@ import (
 
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/golang/glog"
 	"golang.org/x/net/context"
 	"golang.org/x/net/trace"
@@ -365,10 +366,12 @@ func (n *node) applyMutations(proposal *intern.Proposal, index uint64) error {
 			return dy.ErrConflict
 		}
 		tr.LazyPrintf("Applying %d edges", len(m.Edges))
-		for _, edge := range m.Edges {
+		for idx, edge := range m.Edges {
 			err := posting.ErrRetry
 			for err == posting.ErrRetry {
+				tr.LazyPrintf("Applying edge=%+v\n", edge)
 				err = runMutation(ctx, edge, txn)
+				tr.LazyPrintf("Applying idx=%d . Err=%v", idx, err)
 			}
 			if err != nil {
 				tr.SetError()
@@ -411,7 +414,7 @@ func (n *node) applyCommitted(proposal *intern.Proposal, index uint64) error {
 		x.Printf("Creating snapshot at index: %d. ReadTs: %d.\n", snap.Index, snap.ReadTs)
 		data, err := snap.Marshal()
 		x.Check(err)
-		// We can now discard all invalid versions of keys below this ts. 
+		// We can now discard all invalid versions of keys below this ts.
 		pstore.SetDiscardTs(snap.ReadTs)
 		return n.Store.CreateSnapshot(snap.Index, n.ConfState(), data)
@@ -959,6 +962,9 @@ func (n *node) InitAndStartNode() {
 			}
 		}
 		n.SetRaft(raft.RestartNode(n.Cfg))
+		if glog.V(2) {
+			glog.Infof("Restart node complete")
+		}
 	} else {
 		x.Printf("New Node for group: %d\n", n.gid)
 		if _, hasPeer := groups().MyPeer(); hasPeer {
diff --git a/worker/task.go b/worker/task.go
index a578f4f49d6f4fe5e219e00d3e0058407bab7a5b..99f355d08aaabad79f0a2a250a875851d3674395 100644
--- a/worker/task.go
+++ b/worker/task.go
@@ -594,6 +594,9 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti
 // processTask processes the query, accumulates and returns the result.
 func processTask(ctx context.Context, q *intern.Query, gid uint32) (*intern.Result, error) {
 	n := groups().Node
+	if tr, ok := trace.FromContext(ctx); ok {
+		tr.LazyPrintf("Attr: %v. Waiting for Oracle waitForTs: %d", q.Attr, q.ReadTs)
+	}
 	if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil {
 		return &emptyResult, err
 	}
diff --git a/x/lock.go b/x/lock.go
index 3acc7f5f90d11cadd88d7f6e9d6ac09a612231b1..d6553ed526c7e3e133c35a654ebe27527e81b217 100644
--- a/x/lock.go
+++ b/x/lock.go
@@ -10,11 +10,14 @@ package x
 import (
 	"sync"
 	"sync/atomic"
+
+	deadlock "github.com/sasha-s/go-deadlock"
 )
 
 // SafeMutex can be used in place of sync.RWMutex
 type SafeMutex struct {
-	m sync.RWMutex
+	m deadlock.RWMutex
+	// m sync.RWMutex
 	wait    *SafeWait
 	writer  int32
 	readers int32