Skip to content
Snippets Groups Projects
Unverified Commit c9c97952 authored by Manish R Jain's avatar Manish R Jain
Browse files

RAFT: Membership information

Keep track of membership information for all nodes of all groups in the
cluster, routing all information via raft group zero. Use this
information to figure out the peers a new node should connect to, or if
it should start as a single-node raft group.

The peer flag is now only used to fill in the membership information of
the cluster, with no assumptions being made for this peer to be part of
any particular RAFT group (this was the behavior before).
parent 831e1ca9
Branches
No related tags found
No related merge requests found
...@@ -28,13 +28,13 @@ func NewUIDList(data []uint64) *UIDList { ...@@ -28,13 +28,13 @@ func NewUIDList(data []uint64) *UIDList {
// FromUints initialize UIDList from []uint64. // FromUints initialize UIDList from []uint64.
func (u *UIDList) FromUints(data []uint64) { func (u *UIDList) FromUints(data []uint64) {
x.Assert(u != nil && u.uints == nil && u.list == nil) x.AssertTrue(u != nil && u.uints == nil && u.list == nil)
u.uints = data u.uints = data
} }
// FromUints initialize UIDList from task.UidList. // FromUints initialize UIDList from task.UidList.
func (u *UIDList) FromTask(data *task.UidList) { func (u *UIDList) FromTask(data *task.UidList) {
x.Assert(u != nil && u.uints == nil && u.list == nil) x.AssertTrue(u != nil && u.uints == nil && u.list == nil)
u.list = data u.list = data
} }
...@@ -43,7 +43,7 @@ func FromTaskResult(r *task.Result) []*UIDList { ...@@ -43,7 +43,7 @@ func FromTaskResult(r *task.Result) []*UIDList {
out := make([]*UIDList, r.UidmatrixLength()) out := make([]*UIDList, r.UidmatrixLength())
for i := 0; i < r.UidmatrixLength(); i++ { for i := 0; i < r.UidmatrixLength(); i++ {
tl := new(task.UidList) tl := new(task.UidList)
x.Assert(r.Uidmatrix(tl, i)) x.AssertTrue(r.Uidmatrix(tl, i))
ul := new(UIDList) ul := new(UIDList)
ul.FromTask(tl) ul.FromTask(tl)
out[i] = ul out[i] = ul
...@@ -56,7 +56,7 @@ func FromSortResult(r *task.SortResult) []*UIDList { ...@@ -56,7 +56,7 @@ func FromSortResult(r *task.SortResult) []*UIDList {
out := make([]*UIDList, r.UidmatrixLength()) out := make([]*UIDList, r.UidmatrixLength())
for i := 0; i < r.UidmatrixLength(); i++ { for i := 0; i < r.UidmatrixLength(); i++ {
tl := new(task.UidList) tl := new(task.UidList)
x.Assert(r.Uidmatrix(tl, i)) x.AssertTrue(r.Uidmatrix(tl, i))
ul := new(UIDList) ul := new(UIDList)
ul.FromTask(tl) ul.FromTask(tl)
out[i] = ul out[i] = ul
...@@ -66,13 +66,13 @@ func FromSortResult(r *task.SortResult) []*UIDList { ...@@ -66,13 +66,13 @@ func FromSortResult(r *task.SortResult) []*UIDList {
// AddSlice adds a list of uint64s to UIDList. // AddSlice adds a list of uint64s to UIDList.
func (u *UIDList) AddSlice(e []uint64) { func (u *UIDList) AddSlice(e []uint64) {
x.Assert(u.uints != nil) x.AssertTrue(u.uints != nil)
u.uints = append(u.uints, e...) u.uints = append(u.uints, e...)
} }
// Add adds a single uint64 to UIDList. // Add adds a single uint64 to UIDList.
func (u *UIDList) Add(e uint64) { func (u *UIDList) Add(e uint64) {
x.Assert(u.uints != nil) x.AssertTrue(u.uints != nil)
u.uints = append(u.uints, e) u.uints = append(u.uints, e)
} }
...@@ -100,7 +100,7 @@ func (u *UIDList) Size() int { ...@@ -100,7 +100,7 @@ func (u *UIDList) Size() int {
// Reslice selects a slice of the data. // Reslice selects a slice of the data.
func (u *UIDList) ApplyFilter(f func(uint64, int) bool) { func (u *UIDList) ApplyFilter(f func(uint64, int) bool) {
x.Assert(u != nil && (u.uints != nil || u.list != nil)) x.AssertTrue(u != nil && (u.uints != nil || u.list != nil))
var out []uint64 var out []uint64
if u.uints != nil { if u.uints != nil {
out = u.uints[:0] out = u.uints[:0]
...@@ -120,15 +120,15 @@ func (u *UIDList) ApplyFilter(f func(uint64, int) bool) { ...@@ -120,15 +120,15 @@ func (u *UIDList) ApplyFilter(f func(uint64, int) bool) {
// Slice selects a slice of the data. // Slice selects a slice of the data.
func (u *UIDList) Slice(start, end int) { func (u *UIDList) Slice(start, end int) {
x.Assert(u != nil && (u.uints != nil || u.list != nil)) x.AssertTrue(u != nil && (u.uints != nil || u.list != nil))
if u.uints != nil { if u.uints != nil {
u.uints = u.uints[start:end] u.uints = u.uints[start:end]
return return
} }
// This is a task list. Let's copy what we want and convert to a []uint64. // This is a task list. Let's copy what we want and convert to a []uint64.
x.Assert(start >= 0) x.AssertTrue(start >= 0)
x.Assert(end <= u.Size()) x.AssertTrue(end <= u.Size())
x.Assert(start <= end) x.AssertTrue(start <= end)
output := make([]uint64, 0, end-start) output := make([]uint64, 0, end-start)
for i := start; i < end; i++ { for i := start; i < end; i++ {
output = append(output, u.list.Uids(i)) output = append(output, u.list.Uids(i))
...@@ -139,7 +139,7 @@ func (u *UIDList) Slice(start, end int) { ...@@ -139,7 +139,7 @@ func (u *UIDList) Slice(start, end int) {
// Intersect intersects with another list and updates this list. // Intersect intersects with another list and updates this list.
func (u *UIDList) Intersect(v *UIDList) { func (u *UIDList) Intersect(v *UIDList) {
x.Assert(u != nil && (u.uints != nil || u.list != nil)) x.AssertTrue(u != nil && (u.uints != nil || u.list != nil))
var out []uint64 var out []uint64
if u.uints != nil { if u.uints != nil {
out = u.uints[:0] out = u.uints[:0]
...@@ -205,7 +205,7 @@ func IntersectLists(lists []*UIDList) *UIDList { ...@@ -205,7 +205,7 @@ func IntersectLists(lists []*UIDList) *UIDList {
for i := 0; i < shortList.Size(); i++ { for i := 0; i < shortList.Size(); i++ {
val := shortList.Get(i) val := shortList.Get(i)
if i > 0 && val == shortList.Get(i-1) { if i > 0 && val == shortList.Get(i-1) {
x.Assertf(false, "We shouldn't have duplicates in UIDLists") x.AssertTruef(false, "We shouldn't have duplicates in UIDLists")
} }
var skip bool // Should we skip val in output? var skip bool // Should we skip val in output?
...@@ -285,7 +285,7 @@ func MergeLists(lists []*UIDList) *UIDList { ...@@ -285,7 +285,7 @@ func MergeLists(lists []*UIDList) *UIDList {
// IndexOf performs a binary search on the uids slice and returns the index at // IndexOf performs a binary search on the uids slice and returns the index at
// which it finds the uid, else returns -1 // which it finds the uid, else returns -1
func (u *UIDList) IndexOf(uid uint64) int { func (u *UIDList) IndexOf(uid uint64) int {
x.Assert(u != nil && (u.uints != nil || u.list != nil)) x.AssertTrue(u != nil && (u.uints != nil || u.list != nil))
i := sort.Search(u.Size(), func(i int) bool { return u.Get(i) >= uid }) i := sort.Search(u.Size(), func(i int) bool { return u.Get(i) >= uid })
if i < u.Size() && u.Get(i) == uid { if i < u.Size() && u.Get(i) == uid {
return i return i
...@@ -295,7 +295,7 @@ func (u *UIDList) IndexOf(uid uint64) int { ...@@ -295,7 +295,7 @@ func (u *UIDList) IndexOf(uid uint64) int {
// UidlistOffset adds a UidList to buffer and returns the offset. // UidlistOffset adds a UidList to buffer and returns the offset.
func (u *UIDList) AddTo(b *flatbuffers.Builder) flatbuffers.UOffsetT { func (u *UIDList) AddTo(b *flatbuffers.Builder) flatbuffers.UOffsetT {
x.Assert(u != nil && (u.uints != nil || u.list != nil)) x.AssertTrue(u != nil && (u.uints != nil || u.list != nil))
n := u.Size() n := u.Size()
task.UidListStartUidsVector(b, n) task.UidListStartUidsVector(b, n)
for i := n - 1; i >= 0; i-- { for i := n - 1; i >= 0; i-- {
...@@ -338,6 +338,6 @@ func ToUintsListForTest(ul []*UIDList) [][]uint64 { ...@@ -338,6 +338,6 @@ func ToUintsListForTest(ul []*UIDList) [][]uint64 {
// Swap swaps two elements. Logs fatal if UIDList is not stored as []uint64. // Swap swaps two elements. Logs fatal if UIDList is not stored as []uint64.
func (u *UIDList) Swap(i, j int) { func (u *UIDList) Swap(i, j int) {
x.Assert(u.uints != nil) x.AssertTrue(u.uints != nil)
u.uints[i], u.uints[j] = u.uints[j], u.uints[i] u.uints[i], u.uints[j] = u.uints[j], u.uints[i]
} }
...@@ -79,7 +79,7 @@ func QueryTokens(f *Filter) ([]string, *QueryData, error) { ...@@ -79,7 +79,7 @@ func QueryTokens(f *Filter) ([]string, *QueryData, error) {
return nil, nil, x.Errorf("Cannot query using a geometry of type %T", v) return nil, nil, x.Errorf("Cannot query using a geometry of type %T", v)
} }
x.Assertf(l != nil || pt != nil, "We should have a point or a loop.") x.AssertTruef(l != nil || pt != nil, "We should have a point or a loop.")
parents, cover, err := indexCells(g) parents, cover, err := indexCells(g)
if err != nil { if err != nil {
...@@ -152,7 +152,7 @@ func (q QueryData) MatchesFilter(g types.Geo) bool { ...@@ -152,7 +152,7 @@ func (q QueryData) MatchesFilter(g types.Geo) bool {
// returns true if the geometry represented by g is within the given loop or cap // returns true if the geometry represented by g is within the given loop or cap
func (q QueryData) isWithin(g types.Geo) bool { func (q QueryData) isWithin(g types.Geo) bool {
x.Assertf(q.pt != nil || q.loop != nil || q.cap != nil, "At least a point, loop or cap should be defined.") x.AssertTruef(q.pt != nil || q.loop != nil || q.cap != nil, "At least a point, loop or cap should be defined.")
gpt, ok := g.T.(*geom.Point) gpt, ok := g.T.(*geom.Point)
if !ok { if !ok {
// We will only consider points for within queries. // We will only consider points for within queries.
...@@ -172,7 +172,7 @@ func (q QueryData) isWithin(g types.Geo) bool { ...@@ -172,7 +172,7 @@ func (q QueryData) isWithin(g types.Geo) bool {
// returns true if the geometry represented by uid/attr contains the given point // returns true if the geometry represented by uid/attr contains the given point
func (q QueryData) contains(g types.Geo) bool { func (q QueryData) contains(g types.Geo) bool {
x.Assertf(q.pt != nil || q.loop != nil, "At least a point or loop should be defined.") x.AssertTruef(q.pt != nil || q.loop != nil, "At least a point or loop should be defined.")
if q.loop != nil { if q.loop != nil {
// We don't support polygons containing polygons yet. // We don't support polygons containing polygons yet.
return false return false
...@@ -193,7 +193,7 @@ func (q QueryData) contains(g types.Geo) bool { ...@@ -193,7 +193,7 @@ func (q QueryData) contains(g types.Geo) bool {
// returns true if the geometry represented by uid/attr intersects the given loop or point // returns true if the geometry represented by uid/attr intersects the given loop or point
func (q QueryData) intersects(g types.Geo) bool { func (q QueryData) intersects(g types.Geo) bool {
x.Assertf(q.pt != nil || q.loop != nil, "At least a point or loop should be defined.") x.AssertTruef(q.pt != nil || q.loop != nil, "At least a point or loop should be defined.")
switch v := g.T.(type) { switch v := g.T.(type) {
case *geom.Point: case *geom.Point:
p := pointFromPoint(v) p := pointFromPoint(v)
......
...@@ -33,7 +33,7 @@ func parentCoverTokens(parents s2.CellUnion, cover s2.CellUnion) []string { ...@@ -33,7 +33,7 @@ func parentCoverTokens(parents s2.CellUnion, cover s2.CellUnion) []string {
tokens := make([]string, 0, len(parents)+len(cover)) tokens := make([]string, 0, len(parents)+len(cover))
tokens = appendTokens(tokens, parents, parentPrefix) tokens = appendTokens(tokens, parents, parentPrefix)
tokens = appendTokens(tokens, cover, coverPrefix) tokens = appendTokens(tokens, cover, coverPrefix)
x.Assertf(len(tokens) == len(parents)+len(cover), "%d %d %d", x.AssertTruef(len(tokens) == len(parents)+len(cover), "%d %d %d",
len(tokens), len(parents), len(cover)) len(tokens), len(parents), len(cover))
return tokens return tokens
} }
......
...@@ -598,7 +598,7 @@ func (t *FilterTree) debugString() string { ...@@ -598,7 +598,7 @@ func (t *FilterTree) debugString() string {
// stringHelper does simple DFS to convert FilterTree to string. // stringHelper does simple DFS to convert FilterTree to string.
func (t *FilterTree) stringHelper(buf *bytes.Buffer) { func (t *FilterTree) stringHelper(buf *bytes.Buffer) {
x.Assert(t != nil) x.AssertTrue(t != nil)
if len(t.FuncName) > 0 { if len(t.FuncName) > 0 {
// Leaf node. // Leaf node.
_, err := buf.WriteRune('(') _, err := buf.WriteRune('(')
...@@ -647,14 +647,14 @@ func (s *filterTreeStack) size() int { return len(s.a) } ...@@ -647,14 +647,14 @@ func (s *filterTreeStack) size() int { return len(s.a) }
func (s *filterTreeStack) push(t *FilterTree) { s.a = append(s.a, t) } func (s *filterTreeStack) push(t *FilterTree) { s.a = append(s.a, t) }
func (s *filterTreeStack) pop() *FilterTree { func (s *filterTreeStack) pop() *FilterTree {
x.Assertf(!s.empty(), "Trying to pop empty stack") x.AssertTruef(!s.empty(), "Trying to pop empty stack")
last := s.a[len(s.a)-1] last := s.a[len(s.a)-1]
s.a = s.a[:len(s.a)-1] s.a = s.a[:len(s.a)-1]
return last return last
} }
func (s *filterTreeStack) peek() *FilterTree { func (s *filterTreeStack) peek() *FilterTree {
x.Assertf(!s.empty(), "Trying to peek empty stack") x.AssertTruef(!s.empty(), "Trying to peek empty stack")
return s.a[len(s.a)-1] return s.a[len(s.a)-1]
} }
...@@ -723,7 +723,7 @@ func parseFilter(l *lex.Lexer) (*FilterTree, error) { ...@@ -723,7 +723,7 @@ func parseFilter(l *lex.Lexer) (*FilterTree, error) {
op = "|" op = "|"
} }
opPred := filterOpPrecedence[op] opPred := filterOpPrecedence[op]
x.Assertf(opPred > 0, "Expected opPred > 0: %d", opPred) x.AssertTruef(opPred > 0, "Expected opPred > 0: %d", opPred)
// Evaluate the stack until we see an operator with strictly lower pred. // Evaluate the stack until we see an operator with strictly lower pred.
for !opStack.empty() { for !opStack.empty() {
topOp := opStack.peek() topOp := opStack.peek()
...@@ -741,7 +741,7 @@ func parseFilter(l *lex.Lexer) (*FilterTree, error) { ...@@ -741,7 +741,7 @@ func parseFilter(l *lex.Lexer) (*FilterTree, error) {
// For other applications, typically after all items are // For other applications, typically after all items are
// consumed, we will run a loop like "while opStack is nonempty, evalStack". // consumed, we will run a loop like "while opStack is nonempty, evalStack".
// This is not needed here. // This is not needed here.
x.Assertf(opStack.empty(), "Op stack should be empty when we exit") x.AssertTruef(opStack.empty(), "Op stack should be empty when we exit")
if valueStack.empty() { if valueStack.empty() {
// This happens when we have @filter(). We can either return an error or // This happens when we have @filter(). We can either return an error or
......
...@@ -47,7 +47,7 @@ func init() { ...@@ -47,7 +47,7 @@ func init() {
// initIndex initializes the index with the given data store. // initIndex initializes the index with the given data store.
func initIndex() { func initIndex() {
x.Assert(pstore != nil) x.AssertTrue(pstore != nil)
// Initialize TokensTables. // Initialize TokensTables.
indexedFields := schema.IndexedFields() indexedFields := schema.IndexedFields()
...@@ -112,7 +112,7 @@ func indexTokens(attr string, p types.Value) ([]string, error) { ...@@ -112,7 +112,7 @@ func indexTokens(attr string, p types.Value) ([]string, error) {
// addIndexMutations adds mutation(s) for a single term, to maintain index. // addIndexMutations adds mutation(s) for a single term, to maintain index.
func addIndexMutations(ctx context.Context, attr string, uid uint64, func addIndexMutations(ctx context.Context, attr string, uid uint64,
p types.Value, del bool) { p types.Value, del bool) {
x.Assert(uid != 0) x.AssertTrue(uid != 0)
tokens, err := indexTokens(attr, p) tokens, err := indexTokens(attr, p)
if err != nil { if err != nil {
// This data is not indexable // This data is not indexable
...@@ -126,7 +126,7 @@ func addIndexMutations(ctx context.Context, attr string, uid uint64, ...@@ -126,7 +126,7 @@ func addIndexMutations(ctx context.Context, attr string, uid uint64,
} }
tokensTable := GetTokensTable(attr) tokensTable := GetTokensTable(attr)
x.Assertf(tokensTable != nil, "TokensTable missing for attr %s", attr) x.AssertTruef(tokensTable != nil, "TokensTable missing for attr %s", attr)
for _, token := range tokens { for _, token := range tokens {
addIndexMutation(ctx, attr, token, tokensTable, &edge, del) addIndexMutation(ctx, attr, token, tokensTable, &edge, del)
...@@ -138,7 +138,7 @@ func addIndexMutation(ctx context.Context, attr, token string, ...@@ -138,7 +138,7 @@ func addIndexMutation(ctx context.Context, attr, token string,
plist, decr := GetOrCreate(types.IndexKey(attr, token)) plist, decr := GetOrCreate(types.IndexKey(attr, token))
defer decr() defer decr()
x.Assertf(plist != nil, "plist is nil [%s] %d %s", x.AssertTruef(plist != nil, "plist is nil [%s] %d %s",
token, edge.ValueId, edge.Attribute) token, edge.ValueId, edge.Attribute)
if del { if del {
_, err := plist.AddMutation(ctx, *edge, Del) _, err := plist.AddMutation(ctx, *edge, Del)
...@@ -166,7 +166,7 @@ func addIndexMutation(ctx context.Context, attr, token string, ...@@ -166,7 +166,7 @@ func addIndexMutation(ctx context.Context, attr, token string,
// AddMutationWithIndex is AddMutation with support for indexing. // AddMutationWithIndex is AddMutation with support for indexing.
func (l *List) AddMutationWithIndex(ctx context.Context, t x.DirectedEdge, op byte) error { func (l *List) AddMutationWithIndex(ctx context.Context, t x.DirectedEdge, op byte) error {
x.Assertf(len(t.Attribute) > 0 && t.Attribute[0] != ':', x.AssertTruef(len(t.Attribute) > 0 && t.Attribute[0] != ':',
"[%s] [%d] [%v] %d %d\n", t.Attribute, t.Entity, t.Value, t.ValueId, op) "[%s] [%d] [%v] %d %d\n", t.Attribute, t.Entity, t.Value, t.ValueId, op)
var vbytes []byte var vbytes []byte
...@@ -209,7 +209,7 @@ func (l *List) AddMutationWithIndex(ctx context.Context, t x.DirectedEdge, op by ...@@ -209,7 +209,7 @@ func (l *List) AddMutationWithIndex(ctx context.Context, t x.DirectedEdge, op by
// GetTokensTable returns TokensTable for an indexed attribute. // GetTokensTable returns TokensTable for an indexed attribute.
func GetTokensTable(attr string) *TokensTable { func GetTokensTable(attr string) *TokensTable {
x.Assertf(tables != nil, x.AssertTruef(tables != nil,
"TokensTable uninitialized. You need to call InitIndex.") "TokensTable uninitialized. You need to call InitIndex.")
return tables[attr] return tables[attr]
} }
......
...@@ -76,7 +76,7 @@ func (l *List) refCount() int32 { return atomic.LoadInt32(&l.refcount) } ...@@ -76,7 +76,7 @@ func (l *List) refCount() int32 { return atomic.LoadInt32(&l.refcount) }
func (l *List) incr() int32 { return atomic.AddInt32(&l.refcount, 1) } func (l *List) incr() int32 { return atomic.AddInt32(&l.refcount, 1) }
func (l *List) decr() { func (l *List) decr() {
val := atomic.AddInt32(&l.refcount, -1) val := atomic.AddInt32(&l.refcount, -1)
x.Assertf(val >= 0, "List reference should never be less than zero: %v", val) x.AssertTruef(val >= 0, "List reference should never be less than zero: %v", val)
if val > 0 { if val > 0 {
return return
} }
...@@ -93,7 +93,7 @@ func getNew() *List { ...@@ -93,7 +93,7 @@ func getNew() *List {
l := listPool.Get().(*List) l := listPool.Get().(*List)
*l = List{} *l = List{}
l.wg.Add(1) l.wg.Add(1)
x.Assert(len(l.key) == 0) x.AssertTrue(len(l.key) == 0)
l.refcount = 1 l.refcount = 1
return l return l
} }
...@@ -279,7 +279,7 @@ func (l *List) getPostingList() *types.PostingList { ...@@ -279,7 +279,7 @@ func (l *List) getPostingList() *types.PostingList {
if buf == nil || len(buf.d) == 0 { if buf == nil || len(buf.d) == 0 {
nbuf := new(buffer) nbuf := new(buffer)
var err error var err error
x.Assert(l.pstore != nil) x.AssertTrue(l.pstore != nil)
if nbuf.d, err = l.pstore.Get(l.key); err != nil || nbuf.d == nil { if nbuf.d, err = l.pstore.Get(l.key); err != nil || nbuf.d == nil {
// Error. Just set to empty. // Error. Just set to empty.
nbuf.d = make([]byte, len(empty)) nbuf.d = make([]byte, len(empty))
...@@ -301,7 +301,7 @@ func (l *List) SetForDeletion() { ...@@ -301,7 +301,7 @@ func (l *List) SetForDeletion() {
} }
func (l *List) updateMutationLayer(mpost *types.Posting) bool { func (l *List) updateMutationLayer(mpost *types.Posting) bool {
x.Assert(mpost.Op() == Set || mpost.Op() == Del) x.AssertTrue(mpost.Op() == Set || mpost.Op() == Del)
findUid := mpost.Uid() findUid := mpost.Uid()
// First check the mutable layer. // First check the mutable layer.
...@@ -347,7 +347,7 @@ func (l *List) updateMutationLayer(mpost *types.Posting) bool { ...@@ -347,7 +347,7 @@ func (l *List) updateMutationLayer(mpost *types.Posting) bool {
return true return true
} }
// Add followed by Set is considered an Add. Hence, mutate mpost.Op. // Add followed by Set is considered an Add. Hence, mutate mpost.Op.
x.Assert(mpost.MutateOp(Add)) x.AssertTrue(mpost.MutateOp(Add))
} }
l.mlayer[midx] = mpost l.mlayer[midx] = mpost
return true return true
...@@ -357,14 +357,14 @@ func (l *List) updateMutationLayer(mpost *types.Posting) bool { ...@@ -357,14 +357,14 @@ func (l *List) updateMutationLayer(mpost *types.Posting) bool {
pl := l.getPostingList() pl := l.getPostingList()
pidx := sort.Search(pl.PostingsLength(), func(idx int) bool { pidx := sort.Search(pl.PostingsLength(), func(idx int) bool {
var p types.Posting var p types.Posting
x.Assert(pl.Postings(&p, idx)) x.AssertTrue(pl.Postings(&p, idx))
return findUid <= p.Uid() return findUid <= p.Uid()
}) })
var uidFound, psame bool var uidFound, psame bool
if pidx < pl.PostingsLength() { if pidx < pl.PostingsLength() {
p := new(types.Posting) p := new(types.Posting)
x.Assertf(pl.Postings(p, pidx), "Unable to parse Posting at index: %v", pidx) x.AssertTruef(pl.Postings(p, pidx), "Unable to parse Posting at index: %v", pidx)
uidFound = mpost.Uid() == p.Uid() uidFound = mpost.Uid() == p.Uid()
if uidFound { if uidFound {
psame = samePosting(p, mpost) psame = samePosting(p, mpost)
...@@ -490,7 +490,7 @@ func (l *List) iterate(afterUid uint64, f func(obj *types.Posting) bool) { ...@@ -490,7 +490,7 @@ func (l *List) iterate(afterUid uint64, f func(obj *types.Posting) bool) {
if afterUid > 0 { if afterUid > 0 {
pidx = sort.Search(pl.PostingsLength(), func(idx int) bool { pidx = sort.Search(pl.PostingsLength(), func(idx int) bool {
p := new(types.Posting) p := new(types.Posting)
x.Assert(pl.Postings(p, idx)) x.AssertTrue(pl.Postings(p, idx))
return afterUid < p.Uid() return afterUid < p.Uid()
}) })
midx = sort.Search(len(l.mlayer), func(idx int) bool { midx = sort.Search(len(l.mlayer), func(idx int) bool {
...@@ -504,7 +504,7 @@ func (l *List) iterate(afterUid uint64, f func(obj *types.Posting) bool) { ...@@ -504,7 +504,7 @@ func (l *List) iterate(afterUid uint64, f func(obj *types.Posting) bool) {
cont := true cont := true
for cont { for cont {
if pidx < pl.PostingsLength() { if pidx < pl.PostingsLength() {
x.Assert(pl.Postings(pp, pidx)) x.AssertTrue(pl.Postings(pp, pidx))
} else { } else {
pp = empty pp = empty
} }
...@@ -545,7 +545,7 @@ func (l *List) Length(afterUid uint64) int { ...@@ -545,7 +545,7 @@ func (l *List) Length(afterUid uint64) int {
if afterUid > 0 { if afterUid > 0 {
pidx = sort.Search(pl.PostingsLength(), func(idx int) bool { pidx = sort.Search(pl.PostingsLength(), func(idx int) bool {
p := new(types.Posting) p := new(types.Posting)
x.Assert(pl.Postings(p, idx)) x.AssertTrue(pl.Postings(p, idx))
return afterUid < p.Uid() return afterUid < p.Uid()
}) })
midx = sort.Search(len(l.mlayer), func(idx int) bool { midx = sort.Search(len(l.mlayer), func(idx int) bool {
......
...@@ -78,7 +78,7 @@ func fetchIndexEntries(ctx context.Context, attr string, tokens []string) (*algo ...@@ -78,7 +78,7 @@ func fetchIndexEntries(ctx context.Context, attr string, tokens []string) (*algo
} }
} }
x.Assert(len(sg.Result) == len(tokens)) x.AssertTrue(len(sg.Result) == len(tokens))
return algo.MergeLists(sg.Result), nil return algo.MergeLists(sg.Result), nil
} }
...@@ -99,12 +99,12 @@ func fetchValues(ctx context.Context, attr string, uids *algo.UIDList) (*task.Va ...@@ -99,12 +99,12 @@ func fetchValues(ctx context.Context, attr string, uids *algo.UIDList) (*task.Va
} }
values := sg.Values values := sg.Values
x.Assert(values.ValuesLength() == uids.Size()) x.AssertTrue(values.ValuesLength() == uids.Size())
return values, nil return values, nil
} }
func filterUIDs(uids *algo.UIDList, values *task.ValueList, q *geo.QueryData) *algo.UIDList { func filterUIDs(uids *algo.UIDList, values *task.ValueList, q *geo.QueryData) *algo.UIDList {
x.Assert(values.ValuesLength() == uids.Size()) x.AssertTrue(values.ValuesLength() == uids.Size())
var rv []uint64 var rv []uint64
for i := 0; i < values.ValuesLength(); i++ { for i := 0; i < values.ValuesLength(); i++ {
var tv task.Value var tv task.Value
......
...@@ -218,9 +218,9 @@ func postTraverse(sg *SubGraph) (map[uint64]interface{}, error) { ...@@ -218,9 +218,9 @@ func postTraverse(sg *SubGraph) (map[uint64]interface{}, error) {
} }
r := sg.Result r := sg.Result
x.Assertf(sg.srcUIDs.Size() == len(r), x.AssertTruef(sg.srcUIDs.Size() == len(r),
"Result uidmatrixlength: %v. Query uidslength: %v", sg.srcUIDs.Size(), len(r)) "Result uidmatrixlength: %v. Query uidslength: %v", sg.srcUIDs.Size(), len(r))
x.Assertf(sg.srcUIDs.Size() == sg.Values.ValuesLength(), x.AssertTruef(sg.srcUIDs.Size() == sg.Values.ValuesLength(),
"Result valuelength: %v. Query uidslength: %v", sg.srcUIDs.Size(), sg.Values.ValuesLength()) "Result valuelength: %v. Query uidslength: %v", sg.srcUIDs.Size(), sg.Values.ValuesLength())
// Generate a matrix of maps // Generate a matrix of maps
...@@ -272,7 +272,7 @@ func postTraverse(sg *SubGraph) (map[uint64]interface{}, error) { ...@@ -272,7 +272,7 @@ func postTraverse(sg *SubGraph) (map[uint64]interface{}, error) {
m[sg.Attr] = l m[sg.Attr] = l
} }
if sg.GeoFilter != nil { if sg.GeoFilter != nil {
x.Assertf(len(l) == 1, "There should be exactly 1 uid at the top level.") x.AssertTruef(len(l) == 1, "There should be exactly 1 uid at the top level.")
// remove the top level attr from the result, that is only used // remove the top level attr from the result, that is only used
// for filtering the results. // for filtering the results.
result[sg.srcUIDs.Get(i)] = l[0] result[sg.srcUIDs.Get(i)] = l[0]
...@@ -553,7 +553,7 @@ func (sg *SubGraph) ToProtocolBuffer(l *Latency) (*graph.Node, error) { ...@@ -553,7 +553,7 @@ func (sg *SubGraph) ToProtocolBuffer(l *Latency) (*graph.Node, error) {
return n, nil return n, nil
} }
x.Assert(len(sg.Result) == 1) x.AssertTrue(len(sg.Result) == 1)
ul := sg.Result[0] ul := sg.Result[0]
if sg.Params.GetUid || sg.Params.isDebug { if sg.Params.GetUid || sg.Params.isDebug {
n.Uid = ul.Get(0) n.Uid = ul.Get(0)
...@@ -692,7 +692,7 @@ func newGraph(ctx context.Context, gq *gql.GraphQuery) (*SubGraph, error) { ...@@ -692,7 +692,7 @@ func newGraph(ctx context.Context, gq *gql.GraphQuery) (*SubGraph, error) {
// This would set the Result field in SubGraph, // This would set the Result field in SubGraph,
// and populate the children for attributes. // and populate the children for attributes.
if len(exid) > 0 { if len(exid) > 0 {
x.Assertf(!strings.HasPrefix(exid, "_new_:"), "Query shouldn't contain _new_") x.AssertTruef(!strings.HasPrefix(exid, "_new_:"), "Query shouldn't contain _new_")
euid = farm.Fingerprint64([]byte(exid)) euid = farm.Fingerprint64([]byte(exid))
x.Trace(ctx, "Xid: %v Uid: %v", exid, euid) x.Trace(ctx, "Xid: %v Uid: %v", exid, euid)
} }
...@@ -759,7 +759,7 @@ func createNilValuesList(count int) *task.ValueList { ...@@ -759,7 +759,7 @@ func createNilValuesList(count int) *task.ValueList {
// createTaskQuery generates the query buffer. // createTaskQuery generates the query buffer.
func createTaskQuery(sg *SubGraph, uids *algo.UIDList, tokens []string, func createTaskQuery(sg *SubGraph, uids *algo.UIDList, tokens []string,
intersect *algo.UIDList) []byte { intersect *algo.UIDList) []byte {
x.Assert(uids == nil || tokens == nil) x.AssertTrue(uids == nil || tokens == nil)
b := flatbuffers.NewBuilder(0) b := flatbuffers.NewBuilder(0)
var vend flatbuffers.UOffsetT var vend flatbuffers.UOffsetT
...@@ -783,8 +783,8 @@ func createTaskQuery(sg *SubGraph, uids *algo.UIDList, tokens []string, ...@@ -783,8 +783,8 @@ func createTaskQuery(sg *SubGraph, uids *algo.UIDList, tokens []string,
var intersectOffset flatbuffers.UOffsetT var intersectOffset flatbuffers.UOffsetT
if intersect != nil { if intersect != nil {
x.Assert(uids == nil) x.AssertTrue(uids == nil)
x.Assert(len(tokens) > 0) x.AssertTrue(len(tokens) > 0)
intersectOffset = intersect.AddTo(b) intersectOffset = intersect.AddTo(b)
} }
...@@ -1016,8 +1016,8 @@ func runFilter(ctx context.Context, destUIDs *algo.UIDList, ...@@ -1016,8 +1016,8 @@ func runFilter(ctx context.Context, destUIDs *algo.UIDList,
filter.FuncName = strings.ToLower(filter.FuncName) // Not sure if needed. filter.FuncName = strings.ToLower(filter.FuncName) // Not sure if needed.
isAnyOf := filter.FuncName == "anyof" isAnyOf := filter.FuncName == "anyof"
isAllOf := filter.FuncName == "allof" isAllOf := filter.FuncName == "allof"
x.Assertf(isAnyOf || isAllOf, "FuncName invalid: %s", filter.FuncName) x.AssertTruef(isAnyOf || isAllOf, "FuncName invalid: %s", filter.FuncName)
x.Assertf(len(filter.FuncArgs) == 2, x.AssertTruef(len(filter.FuncArgs) == 2,
"Expect exactly two arguments: pred and predValue") "Expect exactly two arguments: pred and predValue")
attr := filter.FuncArgs[0] attr := filter.FuncArgs[0]
...@@ -1030,7 +1030,7 @@ func runFilter(ctx context.Context, destUIDs *algo.UIDList, ...@@ -1030,7 +1030,7 @@ func runFilter(ctx context.Context, destUIDs *algo.UIDList,
return nil, x.Errorf("Could not create tokenizer: %v", filter.FuncArgs[1]) return nil, x.Errorf("Could not create tokenizer: %v", filter.FuncArgs[1])
} }
defer tokenizer.Destroy() defer tokenizer.Destroy()
x.Assert(tokenizer != nil) x.AssertTrue(tokenizer != nil)
tokens := tokenizer.Tokens() tokens := tokenizer.Tokens()
taskQuery := createTaskQuery(sg, nil, tokens, destUIDs) taskQuery := createTaskQuery(sg, nil, tokens, destUIDs)
go ProcessGraph(ctx, sg, taskQuery, sgChan) go ProcessGraph(ctx, sg, taskQuery, sgChan)
...@@ -1043,7 +1043,7 @@ func runFilter(ctx context.Context, destUIDs *algo.UIDList, ...@@ -1043,7 +1043,7 @@ func runFilter(ctx context.Context, destUIDs *algo.UIDList,
} }
} }
x.Assert(len(sg.Result) == len(tokens)) x.AssertTrue(len(sg.Result) == len(tokens))
if isAnyOf { if isAnyOf {
return algo.MergeLists(sg.Result), nil return algo.MergeLists(sg.Result), nil
} }
...@@ -1081,7 +1081,7 @@ func runFilter(ctx context.Context, destUIDs *algo.UIDList, ...@@ -1081,7 +1081,7 @@ func runFilter(ctx context.Context, destUIDs *algo.UIDList,
if filter.Op == "|" { if filter.Op == "|" {
return algo.MergeLists(lists), nil return algo.MergeLists(lists), nil
} }
x.Assert(filter.Op == "&") x.AssertTrue(filter.Op == "&")
return algo.IntersectLists(lists), nil return algo.IntersectLists(lists), nil
} }
...@@ -1115,7 +1115,7 @@ func (sg *SubGraph) applyPagination(ctx context.Context) error { ...@@ -1115,7 +1115,7 @@ func (sg *SubGraph) applyPagination(ctx context.Context) error {
if params.Count == 0 && params.Offset == 0 { // No pagination. if params.Count == 0 && params.Offset == 0 { // No pagination.
return nil return nil
} }
x.Assert(sg.srcUIDs.Size() == len(sg.Result)) x.AssertTrue(sg.srcUIDs.Size() == len(sg.Result))
for _, l := range sg.Result { for _, l := range sg.Result {
l.Intersect(sg.destUIDs) l.Intersect(sg.destUIDs)
start, end := pageRange(&sg.Params, l.Size()) start, end := pageRange(&sg.Params, l.Size())
...@@ -1166,7 +1166,7 @@ func (sg *SubGraph) applyOrderAndPagination(ctx context.Context) error { ...@@ -1166,7 +1166,7 @@ func (sg *SubGraph) applyOrderAndPagination(ctx context.Context) error {
// Copy result into our UID matrix. // Copy result into our UID matrix.
result := task.GetRootAsSortResult(resultData, 0) result := task.GetRootAsSortResult(resultData, 0)
x.Assert(result.UidmatrixLength() == len(sg.Result)) x.AssertTrue(result.UidmatrixLength() == len(sg.Result))
sg.Result = algo.FromSortResult(result) sg.Result = algo.FromSortResult(result)
// Update sg.destUID. Iterate over the UID matrix (which is not sorted by // Update sg.destUID. Iterate over the UID matrix (which is not sorted by
......
...@@ -6,11 +6,11 @@ table Query { ...@@ -6,11 +6,11 @@ table Query {
offset:int; offset:int;
afterUid:ulong; afterUid:ulong;
getCount:ushort; getCount:ushort;
// Exactly one of uids and terms is populated. // Exactly one of uids and terms is populated.
uids:[ulong]; uids:[ulong];
tokens:[string]; tokens:[string];
// Intersect results with this UID list. If uids is populated, then this is // Intersect results with this UID list. If uids is populated, then this is
// an "intersection query". If terms is populated, this is a "filter query". // an "intersection query". If terms is populated, this is a "filter query".
toIntersect:UidList; toIntersect:UidList;
...@@ -66,14 +66,6 @@ table RaftContext { ...@@ -66,14 +66,6 @@ table RaftContext {
addr:string; addr:string;
} }
table Membership {
id:ulong;
group:uint;
addr:string;
leader:bool;
amdead:bool;
}
table Sort { table Sort {
attr:string; attr:string;
uidmatrix:[UidList]; uidmatrix:[UidList];
...@@ -84,3 +76,23 @@ table Sort { ...@@ -84,3 +76,23 @@ table Sort {
table SortResult { table SortResult {
uidmatrix:[UidList]; uidmatrix:[UidList];
} }
// Membership stores information about RAFT group membership for a single RAFT node.
// Note that each server can be serving multiple RAFT groups. Each group would have
// one RAFT node per server serving that group.
table Membership {
id:ulong;
group:uint;
addr:string;
leader:bool;
amdead:bool;
lastUpdate:ulong;
}
// MembershipUpdate is used to pack together the current membership state of all the nodes
// in the caller server; and the membership updates recorded by the callee server since
// the provided lastUpdate.
table MembershipUpdate {
members:[Membership];
lastUpdate:ulong;
}
...@@ -40,6 +40,9 @@ func (rcv *GroupKeys) Keys(obj *KC, j int) bool { ...@@ -40,6 +40,9 @@ func (rcv *GroupKeys) Keys(obj *KC, j int) bool {
x := rcv._tab.Vector(o) x := rcv._tab.Vector(o)
x += flatbuffers.UOffsetT(j) * 4 x += flatbuffers.UOffsetT(j) * 4
x = rcv._tab.Indirect(x) x = rcv._tab.Indirect(x)
if obj == nil {
obj = new(KC)
}
obj.Init(rcv._tab.Bytes, x) obj.Init(rcv._tab.Bytes, x)
return true return true
} }
......
...@@ -78,8 +78,20 @@ func (rcv *Membership) MutateAmdead(n byte) bool { ...@@ -78,8 +78,20 @@ func (rcv *Membership) MutateAmdead(n byte) bool {
return rcv._tab.MutateByteSlot(12, n) return rcv._tab.MutateByteSlot(12, n)
} }
func (rcv *Membership) LastUpdate() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(14))
if o != 0 {
return rcv._tab.GetUint64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *Membership) MutateLastUpdate(n uint64) bool {
return rcv._tab.MutateUint64Slot(14, n)
}
func MembershipStart(builder *flatbuffers.Builder) { func MembershipStart(builder *flatbuffers.Builder) {
builder.StartObject(5) builder.StartObject(6)
} }
func MembershipAddId(builder *flatbuffers.Builder, id uint64) { func MembershipAddId(builder *flatbuffers.Builder, id uint64) {
builder.PrependUint64Slot(0, id, 0) builder.PrependUint64Slot(0, id, 0)
...@@ -96,6 +108,9 @@ func MembershipAddLeader(builder *flatbuffers.Builder, leader byte) { ...@@ -96,6 +108,9 @@ func MembershipAddLeader(builder *flatbuffers.Builder, leader byte) {
func MembershipAddAmdead(builder *flatbuffers.Builder, amdead byte) { func MembershipAddAmdead(builder *flatbuffers.Builder, amdead byte) {
builder.PrependByteSlot(4, amdead, 0) builder.PrependByteSlot(4, amdead, 0)
} }
func MembershipAddLastUpdate(builder *flatbuffers.Builder, lastUpdate uint64) {
builder.PrependUint64Slot(5, lastUpdate, 0)
}
func MembershipEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { func MembershipEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject() return builder.EndObject()
} }
// automatically generated by the FlatBuffers compiler, do not modify
package task
import (
flatbuffers "github.com/google/flatbuffers/go"
)
type MembershipUpdate struct {
_tab flatbuffers.Table
}
func GetRootAsMembershipUpdate(buf []byte, offset flatbuffers.UOffsetT) *MembershipUpdate {
n := flatbuffers.GetUOffsetT(buf[offset:])
x := &MembershipUpdate{}
x.Init(buf, n+offset)
return x
}
func (rcv *MembershipUpdate) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *MembershipUpdate) Members(obj *Membership, j int) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
x := rcv._tab.Vector(o)
x += flatbuffers.UOffsetT(j) * 4
x = rcv._tab.Indirect(x)
if obj == nil {
obj = new(Membership)
}
obj.Init(rcv._tab.Bytes, x)
return true
}
return false
}
func (rcv *MembershipUpdate) MembersLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *MembershipUpdate) LastUpdate() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.GetUint64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *MembershipUpdate) MutateLastUpdate(n uint64) bool {
return rcv._tab.MutateUint64Slot(6, n)
}
func MembershipUpdateStart(builder *flatbuffers.Builder) {
builder.StartObject(2)
}
func MembershipUpdateAddMembers(builder *flatbuffers.Builder, members flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(members), 0)
}
func MembershipUpdateStartMembersVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(4, numElems, 4)
}
func MembershipUpdateAddLastUpdate(builder *flatbuffers.Builder, lastUpdate uint64) {
builder.PrependUint64Slot(1, lastUpdate, 0)
}
func MembershipUpdateEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}
...@@ -28,6 +28,9 @@ func (rcv *Result) Uidmatrix(obj *UidList, j int) bool { ...@@ -28,6 +28,9 @@ func (rcv *Result) Uidmatrix(obj *UidList, j int) bool {
x := rcv._tab.Vector(o) x := rcv._tab.Vector(o)
x += flatbuffers.UOffsetT(j) * 4 x += flatbuffers.UOffsetT(j) * 4
x = rcv._tab.Indirect(x) x = rcv._tab.Indirect(x)
if obj == nil {
obj = new(UidList)
}
obj.Init(rcv._tab.Bytes, x) obj.Init(rcv._tab.Bytes, x)
return true return true
} }
......
...@@ -36,6 +36,9 @@ func (rcv *Sort) Uidmatrix(obj *UidList, j int) bool { ...@@ -36,6 +36,9 @@ func (rcv *Sort) Uidmatrix(obj *UidList, j int) bool {
x := rcv._tab.Vector(o) x := rcv._tab.Vector(o)
x += flatbuffers.UOffsetT(j) * 4 x += flatbuffers.UOffsetT(j) * 4
x = rcv._tab.Indirect(x) x = rcv._tab.Indirect(x)
if obj == nil {
obj = new(UidList)
}
obj.Init(rcv._tab.Bytes, x) obj.Init(rcv._tab.Bytes, x)
return true return true
} }
......
...@@ -28,6 +28,9 @@ func (rcv *SortResult) Uidmatrix(obj *UidList, j int) bool { ...@@ -28,6 +28,9 @@ func (rcv *SortResult) Uidmatrix(obj *UidList, j int) bool {
x := rcv._tab.Vector(o) x := rcv._tab.Vector(o)
x += flatbuffers.UOffsetT(j) * 4 x += flatbuffers.UOffsetT(j) * 4
x = rcv._tab.Indirect(x) x = rcv._tab.Indirect(x)
if obj == nil {
obj = new(UidList)
}
obj.Init(rcv._tab.Bytes, x) obj.Init(rcv._tab.Bytes, x)
return true return true
} }
......
...@@ -28,6 +28,9 @@ func (rcv *ValueList) Values(obj *Value, j int) bool { ...@@ -28,6 +28,9 @@ func (rcv *ValueList) Values(obj *Value, j int) bool {
x := rcv._tab.Vector(o) x := rcv._tab.Vector(o)
x += flatbuffers.UOffsetT(j) * 4 x += flatbuffers.UOffsetT(j) * 4
x = rcv._tab.Indirect(x) x = rcv._tab.Indirect(x)
if obj == nil {
obj = new(Value)
}
obj.Init(rcv._tab.Bytes, x) obj.Init(rcv._tab.Bytes, x)
return true return true
} }
......
...@@ -69,7 +69,7 @@ func normalize(in []byte) ([]byte, error) { ...@@ -69,7 +69,7 @@ func normalize(in []byte) ([]byte, error) {
// NewTokenizer creates a new Tokenizer object from a given input string of bytes. // NewTokenizer creates a new Tokenizer object from a given input string of bytes.
func NewTokenizer(s []byte) (*Tokenizer, error) { func NewTokenizer(s []byte) (*Tokenizer, error) {
x.Assert(s != nil) x.AssertTrue(s != nil)
if disableICU { if disableICU {
// ICU is disabled. Return a dummy tokenizer. // ICU is disabled. Return a dummy tokenizer.
......
...@@ -29,7 +29,7 @@ func IndexKey(attr, term string) []byte { ...@@ -29,7 +29,7 @@ func IndexKey(attr, term string) []byte {
// TokensTable by iterating over keys in RocksDB. // TokensTable by iterating over keys in RocksDB.
func TokenFromKey(key []byte) string { func TokenFromKey(key []byte) string {
i := bytes.IndexRune(key, indexRune) i := bytes.IndexRune(key, indexRune)
x.Assert(i >= 0) x.AssertTrue(i >= 0)
return string(key[i+1:]) return string(key[i+1:])
} }
......
...@@ -125,7 +125,7 @@ func AssignUidsOverNetwork(ctx context.Context, umap map[string]uint64) (rerr er ...@@ -125,7 +125,7 @@ func AssignUidsOverNetwork(ctx context.Context, umap map[string]uint64) (rerr er
} }
} else { } else {
addr := groups().Leader(gid) _, addr := groups().Leader(gid)
p := pools().get(addr) p := pools().get(addr)
conn, err := p.Get() conn, err := p.Get()
if err != nil { if err != nil {
...@@ -143,7 +143,7 @@ func AssignUidsOverNetwork(ctx context.Context, umap map[string]uint64) (rerr er ...@@ -143,7 +143,7 @@ func AssignUidsOverNetwork(ctx context.Context, umap map[string]uint64) (rerr er
} }
ul := task.GetRootAsUidList(reply.Data, 0) ul := task.GetRootAsUidList(reply.Data, 0)
x.Assertf(ul.UidsLength() == int(num.Val()), x.AssertTruef(ul.UidsLength() == int(num.Val()),
"Requested: %d != Retrieved Uids: %d", num.Val(), ul.UidsLength()) "Requested: %d != Retrieved Uids: %d", num.Val(), ul.UidsLength())
i := 0 i := 0
......
...@@ -27,7 +27,7 @@ var groupConfig config ...@@ -27,7 +27,7 @@ var groupConfig config
func parsePredicates(groupId uint32, p string) error { func parsePredicates(groupId uint32, p string) error {
preds := strings.Split(p, ",") preds := strings.Split(p, ",")
x.Assertf(len(preds) > 0, "Length of predicates in config should be > 0") x.AssertTruef(len(preds) > 0, "Length of predicates in config should be > 0")
for _, pred := range preds { for _, pred := range preds {
pred = strings.TrimSpace(pred) pred = strings.TrimSpace(pred)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment