diff --git a/server/main.go b/server/main.go index 2c857348e0e99f9fffd9ab054ef6c7899f193a42..18c0bed29fa3146e067e46e5c809e92ae8ed58a0 100644 --- a/server/main.go +++ b/server/main.go @@ -88,6 +88,7 @@ func mutationHandler(mu *gql.Mutation) error { } } if err := worker.GetOrAssignUidsOverNetwork(&xidToUid); err != nil { + glog.WithError(err).Error("GetOrAssignUidsOverNetwork") return err } @@ -143,7 +144,20 @@ func queryHandler(w http.ResponseWriter, r *http.Request) { x.SetStatus(w, x.E_INVALID_REQUEST, err.Error()) return } - mutationHandler(mu) + + // If we have mutations, run them first. + if mu != nil && len(mu.Set) > 0 { + if err = mutationHandler(mu); err != nil { + glog.WithError(err).Error("While handling mutations.") + x.SetStatus(w, x.E_ERROR, err.Error()) + return + } + } + + if gq == nil || (gq.UID == 0 && len(gq.XID) == 0) { + x.SetStatus(w, x.E_OK, "Done") + return + } sg, err := query.ToSubGraph(gq) if err != nil { @@ -204,6 +218,10 @@ func main() { addrs := strings.Split(*workers, ",") lenAddr := uint64(len(addrs)) + if lenAddr == 0 { + // If no worker is specified, then we're it. + lenAddr = 1 + } posting.Init(clog) diff --git a/worker/assign.go b/worker/assign.go index 52c4a721fd87be768a1ff26ff8481a4294571938..d74fea052e2b86c97366e08b14fc585812bd2072 100644 --- a/worker/assign.go +++ b/worker/assign.go @@ -1,6 +1,7 @@ package worker import ( + "fmt" "sync" "github.com/dgraph-io/dgraph/conn" @@ -33,6 +34,10 @@ func createXidListBuffer(xids map[string]uint64) []byte { func getOrAssignUids( xidList *task.XidList) (uidList []byte, rerr error) { + if xidList.XidsLength() == 0 { + return uidList, fmt.Errorf("Empty xid list") + } + wg := new(sync.WaitGroup) uids := make([]uint64, xidList.XidsLength()) che := make(chan error, xidList.XidsLength()) @@ -40,15 +45,15 @@ func getOrAssignUids( wg.Add(1) xid := string(xidList.Xids(i)) - go func() { + go func(idx int) { defer wg.Done() u, err := uid.GetOrAssign(xid, 0, 1) if err != nil { che <- err return } - uids[i] = u - }() + uids[idx] = u + }(i) } wg.Wait() close(che) diff --git a/worker/mutation.go b/worker/mutation.go index 8b204b78a542e3c62167aa945e4e57a1d2e407c0..37d6a4d4a97348b360795d2f657165a68a8155f2 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -42,6 +42,7 @@ func mutate(m *Mutations, left *Mutations) error { return fmt.Errorf("predicate fingerprint doesn't match this instance.") } + glog.WithField("edge", edge).Debug("mutate") key := posting.Key(edge.Entity, edge.Attribute) plist := posting.GetOrCreate(key, dataStore) if err := plist.AddMutation(edge, posting.Set); err != nil { @@ -93,6 +94,7 @@ func MutateOverNetwork( mu := mutationArray[idx] if mu == nil { mu = new(Mutations) + mutationArray[idx] = mu } mu.Set = append(mu.Set, edge) }