Unverified Commit bd7f3e11 authored by Manish R Jain

Various pending TODOs in raft and other places.

parent bbef326a
......@@ -3,6 +3,7 @@ package group
import (
"bufio"
"fmt"
"io"
"os"
"strconv"
"strings"
......@@ -72,8 +73,10 @@ func parseDefaultConfig(l string) (uint64, error) {
return groupConfig.k, nil
}
func parseConfig(f *os.File) error {
scanner := bufio.NewScanner(f)
// ParseConfig parses a group config provided by reader.
func ParseConfig(r io.Reader) error {
groupConfig = config{}
scanner := bufio.NewScanner(r)
// To keep track of the last groupId seen across lines. If the group ids are
// not sequential, we log.Fatal.
var curGroupId uint64
......@@ -135,7 +138,7 @@ func ParseGroupConfig(file string) error {
return nil
}
x.Check(err)
if err = parseConfig(cf); err != nil {
if err = ParseConfig(cf); err != nil {
return err
}
if groupConfig.n == 0 {
......
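Side note on the new reader-based API: ParseConfig now accepts any io.Reader instead of an *os.File, so it can be driven from tests or from in-memory config. A minimal usage sketch, reusing the "default: fp % 3" config string from the test further down:

package main

import (
	"log"
	"strings"

	"github.com/dgraph-io/dgraph/group"
)

func main() {
	// Any io.Reader works now, not just a file handle.
	cfg := strings.NewReader("default: fp % 3")
	if err := group.ParseConfig(cfg); err != nil {
		log.Fatalf("Unable to parse group config: %v", err)
	}
}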
// Code generated by protoc-gen-gogo.
// source: types/types.proto
// source: posting/types/types.proto
// DO NOT EDIT!
/*
Package types is a generated protocol buffer package.
It is generated from these files:
types/types.proto
posting/types/types.proto
It has these top-level messages:
Posting
......@@ -34,10 +34,10 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Posting struct {
Uid uint64 `protobuf:"fixed64,1,opt,name=uid,proto3" json:"uid,omitempty"`
Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
ValType uint32 `protobuf:"varint,3,opt,name=valType,proto3" json:"valType,omitempty"`
ValType uint32 `protobuf:"varint,3,opt,name=val_type,json=valType,proto3" json:"val_type,omitempty"`
Label string `protobuf:"bytes,4,opt,name=label,proto3" json:"label,omitempty"`
Commit uint64 `protobuf:"varint,5,opt,name=commit,proto3" json:"commit,omitempty"`
// op is only used temporarily.
// TODO: op is only used temporarily. See if we can remove it from here.
Op uint32 `protobuf:"varint,12,opt,name=op,proto3" json:"op,omitempty"`
}
......@@ -664,23 +664,24 @@ var (
ErrIntOverflowTypes = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("types/types.proto", fileDescriptorTypes) }
func init() { proto.RegisterFile("posting/types/types.proto", fileDescriptorTypes) }
var fileDescriptorTypes = []byte{
// 236 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x12, 0x2c, 0xa9, 0x2c, 0x48,
0x2d, 0xd6, 0x07, 0x93, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0xac, 0x60, 0x8e, 0x52, 0x27,
0x23, 0x17, 0x7b, 0x40, 0x7e, 0x71, 0x49, 0x66, 0x5e, 0xba, 0x90, 0x00, 0x17, 0x73, 0x69, 0x66,
0x8a, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x5b, 0x10, 0x88, 0x29, 0x24, 0xc2, 0xc5, 0x5a, 0x96, 0x98,
0x53, 0x9a, 0x2a, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x13, 0x04, 0xe1, 0x08, 0x49, 0x70, 0xb1, 0x97,
0x25, 0xe6, 0x84, 0x54, 0x16, 0xa4, 0x4a, 0x30, 0x2b, 0x30, 0x6a, 0xf0, 0x06, 0xc1, 0xb8, 0x20,
0xf5, 0x39, 0x89, 0x49, 0xa9, 0x39, 0x12, 0x2c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x10, 0x8e, 0x90,
0x18, 0x17, 0x5b, 0x72, 0x7e, 0x6e, 0x6e, 0x66, 0x89, 0x04, 0xab, 0x02, 0xa3, 0x06, 0x4b, 0x10,
0x94, 0x27, 0xc4, 0xc7, 0xc5, 0x94, 0x5f, 0x20, 0xc1, 0x03, 0x36, 0x82, 0x29, 0xbf, 0x40, 0x29,
0x97, 0x8b, 0x1b, 0xea, 0x14, 0x9f, 0xcc, 0xe2, 0x12, 0x21, 0x2d, 0x2e, 0x8e, 0x02, 0x08, 0xb7,
0x58, 0x82, 0x51, 0x81, 0x59, 0x83, 0xdb, 0x88, 0x4f, 0x0f, 0xe2, 0x03, 0xa8, 0xaa, 0x20, 0xb8,
0xbc, 0x90, 0x14, 0x17, 0x47, 0x72, 0x46, 0x6a, 0x72, 0x76, 0x71, 0x69, 0x2e, 0xd4, 0xad, 0x70,
0x3e, 0x92, 0xf5, 0xcc, 0xc8, 0xd6, 0x3b, 0x09, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c,
0xe3, 0x83, 0x47, 0x72, 0x8c, 0x33, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0x83, 0xc6, 0x18, 0x10,
0x00, 0x00, 0xff, 0xff, 0x44, 0xae, 0x9f, 0x23, 0x2f, 0x01, 0x00, 0x00,
// 244 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x92, 0x2c, 0xc8, 0x2f, 0x2e,
0xc9, 0xcc, 0x4b, 0xd7, 0x2f, 0xa9, 0x2c, 0x48, 0x2d, 0x86, 0x90, 0x7a, 0x05, 0x45, 0xf9, 0x25,
0xf9, 0x42, 0xac, 0x60, 0x8e, 0x52, 0x17, 0x23, 0x17, 0x7b, 0x00, 0x44, 0x91, 0x90, 0x00, 0x17,
0x73, 0x69, 0x66, 0x8a, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x5b, 0x10, 0x88, 0x29, 0x24, 0xc2, 0xc5,
0x5a, 0x96, 0x98, 0x53, 0x9a, 0x2a, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x13, 0x04, 0xe1, 0x08, 0x49,
0x72, 0x71, 0x94, 0x25, 0xe6, 0xc4, 0x83, 0x0c, 0x90, 0x60, 0x56, 0x60, 0xd4, 0xe0, 0x0d, 0x62,
0x2f, 0x4b, 0xcc, 0x09, 0xa9, 0x2c, 0x48, 0x05, 0x69, 0xc8, 0x49, 0x4c, 0x4a, 0xcd, 0x91, 0x60,
0x51, 0x60, 0xd4, 0xe0, 0x0c, 0x82, 0x70, 0x84, 0xc4, 0xb8, 0xd8, 0x92, 0xf3, 0x73, 0x73, 0x33,
0x4b, 0x24, 0x58, 0x15, 0x18, 0x35, 0x58, 0x82, 0xa0, 0x3c, 0x21, 0x3e, 0x2e, 0xa6, 0xfc, 0x02,
0x09, 0x1e, 0xb0, 0x11, 0x4c, 0xf9, 0x05, 0x4a, 0xb9, 0x5c, 0xdc, 0x50, 0xb7, 0xf8, 0x64, 0x16,
0x97, 0x08, 0x69, 0x71, 0x71, 0x40, 0xdd, 0x5f, 0x2c, 0xc1, 0xa8, 0xc0, 0xac, 0xc1, 0x6d, 0xc4,
0xa7, 0x07, 0xf1, 0x02, 0x54, 0x55, 0x10, 0x5c, 0x5e, 0x48, 0x8a, 0x8b, 0x23, 0x39, 0x23, 0x35,
0x39, 0xbb, 0xb8, 0x34, 0x17, 0xea, 0x58, 0x38, 0x1f, 0xc9, 0x7a, 0x66, 0x64, 0xeb, 0x9d, 0x04,
0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x19, 0x8f, 0xe5,
0x18, 0x92, 0xd8, 0xc0, 0x61, 0x63, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x6a, 0x5a, 0xfe, 0x36,
0x38, 0x01, 0x00, 0x00,
}
......@@ -6,7 +6,7 @@ package types;
message Posting {
fixed64 uid = 1;
bytes value = 2;
uint32 valType = 3;
uint32 val_type = 3;
string label = 4;
uint64 commit = 5; // More inclination towards smaller values.
......
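Worth noting about the valType → val_type rename above: the gogo-generated Go field stays ValType (the struct tag records json=valType), so only the wire and JSON naming changes. A tiny sketch, assuming the generated package is imported from posting/types as in the worker code below:

package main

import "github.com/dgraph-io/dgraph/posting/types"

func main() {
	// The proto field is now val_type, but the generated Go field is still
	// ValType, so existing callers compile unchanged.
	p := &types.Posting{Uid: 1, ValType: 3}
	_ = p
}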
(Diff for one file is collapsed and not shown.)
......@@ -9,7 +9,7 @@ message List {
message Value {
bytes val = 1;
uint32 valType = 2;
uint32 val_type = 2;
}
message Query {
......@@ -21,7 +21,7 @@ message Query {
// Exactly one of uids and terms is populated.
repeated fixed64 uids = 6;
// Function to generate or filter UIDs.
repeated string src_func = 7;
}
......@@ -30,7 +30,7 @@ message Result {
repeated List uid_matrix = 1;
repeated Value values = 2;
repeated uint32 counts = 3;
bool intersectDest = 4;
bool intersect_dest = 4;
}
message Sort {
......@@ -61,7 +61,7 @@ message RaftContext {
// one RAFT node per server serving that group.
message Membership {
fixed64 id = 1;
uint32 group = 2;
uint32 group_id = 2;
string addr = 3;
bool leader = 4;
bool am_dead = 5;
......@@ -111,6 +111,6 @@ message KC {
}
message GroupKeys {
uint64 group_id = 1;
uint32 group_id = 1;
repeated KC keys = 2;
}
......@@ -96,7 +96,7 @@ func (n *node) Connect(pid uint64, addr string) {
n.peers.Set(pid, addr)
}
func (n *node) AddToCluster(pid uint64) error {
func (n *node) AddToCluster(ctx context.Context, pid uint64) error {
addr := n.peers.Get(pid)
x.AssertTruef(len(addr) > 0, "Unable to find conn pool for peer: %d", pid)
rc := &task.RaftContext{
......@@ -106,7 +106,7 @@ func (n *node) AddToCluster(pid uint64) error {
}
rcBytes, err := rc.Marshal()
x.Check(err)
return n.raft.ProposeConfChange(context.TODO(), raftpb.ConfChange{
return n.raft.ProposeConfChange(ctx, raftpb.ConfChange{
ID: pid,
Type: raftpb.ConfChangeAddNode,
NodeID: pid,
......@@ -249,7 +249,7 @@ func (n *node) processMembership(e raftpb.Entry, mm *task.Membership) error {
x.AssertTrue(n.gid == 0)
x.Printf("group: %v Addr: %q leader: %v dead: %v\n",
mm.Group, mm.Addr, mm.Leader, mm.AmDead)
mm.GroupId, mm.Addr, mm.Leader, mm.AmDead)
groups().applyMembershipUpdate(e.Index, mm)
return nil
}
......@@ -329,7 +329,7 @@ func (n *node) processSnapshot(s raftpb.Snapshot) {
}
func (n *node) Run() {
fr := true
firstRun := true
ticker := time.NewTicker(time.Second)
for {
select {
......@@ -341,7 +341,7 @@ func (n *node) Run() {
n.saveToStorage(rd.Snapshot, rd.HardState, rd.Entries)
rcBytes, err := n.raftContext.Marshal()
for _, msg := range rd.Messages {
// TODO: Do some optimizations here to drop messages.
// NOTE: We can do some optimizations here to drop messages.
x.Check(err)
msg.Context = rcBytes
n.send(msg)
......@@ -355,9 +355,9 @@ func (n *node) Run() {
}
n.raft.Advance()
if fr && n.canCampaign {
go n.raft.Campaign(context.TODO())
fr = false
if firstRun && n.canCampaign {
go n.raft.Campaign(n.ctx)
firstRun = false
}
case <-n.done:
......@@ -413,6 +413,7 @@ func parsePeer(peer string) (uint64, string) {
}
func (n *node) joinPeers() {
// Get leader information for MY group.
pid, paddr := groups().Leader(n.gid)
n.Connect(pid, paddr)
fmt.Printf("Connected with: %v\n", paddr)
......@@ -421,16 +422,17 @@ func (n *node) joinPeers() {
pool := pools().get(addr)
x.AssertTruef(pool != nil, "Unable to find addr for peer: %d", pid)
// TODO: Ask for the leader, before running populateShard.
// Bring the instance up to speed first.
_, err := populateShard(context.TODO(), pool, 0)
_, err := populateShard(n.ctx, pool, 0)
x.Checkf(err, "Error while populating shard")
conn, err := pool.Get()
x.Check(err)
defer pool.Put(conn)
c := NewWorkerClient(conn)
x.Printf("Calling JoinCluster")
_, err = c.JoinCluster(context.Background(), n.raftContext)
_, err = c.JoinCluster(n.ctx, n.raftContext)
x.Checkf(err, "Error while joining cluster")
x.Printf("Done with JoinCluster call\n")
}
......@@ -453,7 +455,7 @@ func newNode(gid uint32, id uint64, myAddr string) *node {
}
n := &node{
ctx: context.TODO(),
ctx: context.Background(),
id: id,
gid: gid,
store: store,
......@@ -518,6 +520,7 @@ func (n *node) initFromWal(wal *raftwal.Wal) (restart bool, rerr error) {
return
}
// InitAndStartNode gets called after having at least one membership sync with the cluster.
func (n *node) InitAndStartNode(wal *raftwal.Wal) {
restart, err := n.initFromWal(wal)
x.Check(err)
......@@ -609,7 +612,7 @@ func (w *grpcWorker) JoinCluster(ctx context.Context, rc *task.RaftContext) (*Pa
node.Connect(rc.Id, rc.Addr)
c := make(chan error, 1)
go func() { c <- node.AddToCluster(rc.Id) }()
go func() { c <- node.AddToCluster(ctx, rc.Id) }()
select {
case <-ctx.Done():
......
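The JoinCluster handler above now threads the request context into AddToCluster and races the proposal against ctx.Done(). A self-contained sketch of that pattern, with a placeholder addToCluster standing in for node.AddToCluster (the names and timing here are illustrative only):

package main

import (
	"context"
	"fmt"
	"time"
)

// addToCluster is a stand-in for node.AddToCluster(ctx, pid); it exists only
// to make the pattern runnable in isolation.
func addToCluster(ctx context.Context, pid uint64) error {
	select {
	case <-time.After(50 * time.Millisecond): // pretend the proposal commits
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// joinCluster mirrors the shape of grpcWorker.JoinCluster: run the config
// change in a goroutine and return as soon as either it finishes or the
// caller's context is done.
func joinCluster(ctx context.Context, pid uint64) error {
	c := make(chan error, 1)
	go func() { c <- addToCluster(ctx, pid) }()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-c:
		return err
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println("join:", joinCluster(ctx, 2))
}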
......@@ -61,7 +61,8 @@ func groups() *groupi {
// StartRaftNodes will read the WAL dir, create the RAFT groups,
// and either start or restart RAFT nodes.
// This function triggers RAFT nodes to be created.
// This function triggers RAFT nodes to be created, and is the entrance to the RAFT
// world from main.go.
func StartRaftNodes(walDir string) {
gr = new(groupi)
gr.ctx, gr.cancel = context.WithCancel(context.Background())
......@@ -81,9 +82,9 @@ func StartRaftNodes(walDir string) {
// itself in the memberships; and hence this node would think that no one is handling
group zero. Therefore, we MUST wait to get past a last update raft index of zero.
for gr.LastUpdate() == 0 {
time.Sleep(time.Second)
fmt.Println("Last update raft index for membership information is zero. Syncing...")
gr.syncMemberships()
time.Sleep(time.Second)
}
fmt.Printf("Last update is now: %d\n", gr.LastUpdate())
}
......@@ -259,10 +260,10 @@ func (g *groupi) syncMemberships() {
go func(rc *task.RaftContext, amleader bool) {
mm := &task.Membership{
Leader: amleader,
Id: rc.Id,
Group: rc.Group,
Addr: rc.Addr,
Leader: amleader,
Id: rc.Id,
GroupId: rc.Group,
Addr: rc.Addr,
}
zero := groups().Node(0)
x.Check(zero.ProposeAndWait(zero.ctx, &task.Proposal{Membership: mm}))
......@@ -281,10 +282,10 @@ func (g *groupi) syncMemberships() {
rc := n.raftContext
mu.Members = append(mu.Members,
&task.Membership{
Leader: n.AmLeader(),
Id: rc.Id,
Group: rc.Group,
Addr: rc.Addr,
Leader: n.AmLeader(),
Id: rc.Id,
GroupId: rc.Group,
Addr: rc.Addr,
})
}
mu.LastUpdate = g.lastUpdate
......@@ -373,10 +374,10 @@ func (g *groupi) applyMembershipUpdate(raftIdx uint64, mm *task.Membership) {
g.all = make(map[uint32]*servers)
}
sl := g.all[mm.Group]
sl := g.all[mm.GroupId]
if sl == nil {
sl = new(servers)
g.all[mm.Group] = sl
g.all[mm.GroupId] = sl
}
for {
......@@ -431,10 +432,10 @@ func (g *groupi) MembershipUpdateAfter(ridx uint64) *task.MembershipUpdate {
}
out.Members = append(out.Members,
&task.Membership{
Leader: s.Leader,
Id: s.NodeId,
Group: gid,
Addr: s.Addr,
Leader: s.Leader,
Id: s.NodeId,
GroupId: gid,
Addr: s.Addr,
})
}
}
......@@ -462,16 +463,16 @@ func (w *grpcWorker) UpdateMembership(ctx context.Context,
che := make(chan error, len(update.Members))
for _, mm := range update.Members {
if groups().isDuplicate(mm.Group, mm.Id, mm.Addr, mm.Leader) {
if groups().isDuplicate(mm.GroupId, mm.Id, mm.Addr, mm.Leader) {
che <- nil
continue
}
mmNew := &task.Membership{
Leader: mm.Leader,
Id: mm.Id,
Group: mm.Group,
Addr: mm.Addr,
Leader: mm.Leader,
Id: mm.Id,
GroupId: mm.GroupId,
Addr: mm.Addr,
}
go func(mmNew *task.Membership) {
......
......@@ -22,6 +22,7 @@ import (
"io"
"sort"
"github.com/dgraph-io/dgraph/group"
"github.com/dgraph-io/dgraph/posting/types"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
......@@ -66,23 +67,35 @@ func writeBatch(ctx context.Context, kv chan *task.KV, che chan error) {
che <- nil
}
func generateGroup(group uint64) (*task.GroupKeys, error) {
func generateGroup(groupId uint32) (*task.GroupKeys, error) {
it := pstore.NewIterator()
defer it.Close()
g := &task.GroupKeys{
GroupId: group,
GroupId: groupId,
}
for it.SeekToFirst(); it.Valid(); it.Next() {
// TODO: Check if this key belongs to the group.
k, v := it.Key(), it.Value()
if idx := bytes.IndexAny(k.Data(), ":|"); idx == -1 {
continue
} else {
pred := string(k.Data()[:idx])
if group.BelongsTo(pred) != g.GroupId {
pred += "~"
it.Seek([]byte(pred)) // Skip over this predicate entirely.
it.Prev() // To tackle it.Next() called by default.
continue
}
}
var pl types.PostingList
x.Check(pl.Unmarshal(v.Data()))
kdup := make([]byte, len(k.Data()))
copy(kdup, k.Data())
key := &task.KC{
Key: k.Data(),
Key: kdup,
Checksum: pl.Checksum,
}
g.Keys = append(g.Keys, key)
......@@ -92,7 +105,7 @@ func generateGroup(group uint64) (*task.GroupKeys, error) {
// PopulateShard gets data for predicate pred from server with id serverId and
// writes it to RocksDB.
func populateShard(ctx context.Context, pl *pool, group uint64) (int, error) {
func populateShard(ctx context.Context, pl *pool, group uint32) (int, error) {
gkeys, err := generateGroup(group)
if err != nil {
return 0, x.Wrapf(err, "While generating keys group")
......@@ -154,8 +167,14 @@ func populateShard(ctx context.Context, pl *pool, group uint64) (int, error) {
// PredicateData can be used to return data corresponding to a predicate over
// a stream.
func (w *grpcWorker) PredicateData(group *task.GroupKeys, stream Worker_PredicateDataServer) error {
// TODO: Use group id.
func (w *grpcWorker) PredicateData(gkeys *task.GroupKeys, stream Worker_PredicateDataServer) error {
if !groups().ServesGroup(gkeys.GroupId) {
return x.Errorf("Group %d not served.", gkeys.GroupId)
}
n := groups().Node(gkeys.GroupId)
if !n.AmLeader() {
return x.Errorf("Not leader of group: %d", gkeys.GroupId)
}
// TODO(pawan) - Shift to CheckPoints once we figure out how to add them to the
// RocksDB library we are using.
......@@ -165,18 +184,29 @@ func (w *grpcWorker) PredicateData(group *task.GroupKeys, stream Worker_Predicat
for it.SeekToFirst(); it.Valid(); it.Next() {
k, v := it.Key(), it.Value()
if idx := bytes.IndexAny(k.Data(), ":|"); idx == -1 {
continue
} else {
pred := string(k.Data()[:idx])
if group.BelongsTo(pred) != gkeys.GroupId {
pred += "~"
it.Seek([]byte(pred)) // Skip over this predicate entirely.
it.Prev() // To tackle it.Next() called by default.
continue
}
}
var pl types.PostingList
x.Check(pl.Unmarshal(v.Data()))
// TODO: Check that key is part of the specified group id.
idx := sort.Search(len(group.Keys), func(i int) bool {
t := group.Keys[i]
idx := sort.Search(len(gkeys.Keys), func(i int) bool {
t := gkeys.Keys[i]
return bytes.Compare(k.Data(), t.Key) <= 0
})
if idx < len(group.Keys) {
if idx < len(gkeys.Keys) {
// Found a match.
t := group.Keys[idx]
t := gkeys.Keys[idx]
// Different keys would have the same prefix. So, check Checksum first,
// it would be cheaper when there's no match.
if bytes.Equal(pl.Checksum, t.Checksum) && bytes.Equal(k.Data(), t.Key) {
......@@ -195,7 +225,8 @@ func (w *grpcWorker) PredicateData(group *task.GroupKeys, stream Worker_Predicat
}
k.Free()
v.Free()
}
} // end of iterator
if err := it.Err(); err != nil {
return err
}
......
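The skip logic added in both generateGroup and PredicateData above leans on byte ordering: a key for predicate P begins with "P:" or "P|", and '~' sorts after both ':' and '|', so seeking to "P~" jumps past every remaining key of that predicate in one step (the it.Prev() then compensates for the loop's it.Next()). A tiny self-contained illustration; the key layout used here is only an assumption for demonstration:

package main

import (
	"bytes"
	"fmt"
)

func main() {
	// Hypothetical keys: "<predicate>:<uid>" or "<predicate>|<term>".
	keys := [][]byte{
		[]byte("name:0x01"),
		[]byte("name:0x02"),
		[]byte("name|term"),
		[]byte("release_date:0x01"),
	}
	// Extract the predicate, as the iterator loops above do.
	k := keys[0]
	idx := bytes.IndexAny(k, ":|")
	pred := string(k[:idx])
	// "name~" compares greater than every "name:" and "name|" key, so a
	// Seek to it would land on the first key of the next predicate.
	bound := []byte(pred + "~")
	for _, key := range keys {
		fmt.Printf("%-20s beyond %q? %v\n", key, bound, bytes.Compare(key, bound) > 0)
	}
}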
......@@ -16,17 +16,21 @@
package worker
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"log"
"net"
"os"
"testing"
"google.golang.org/grpc"
"github.com/dgraph-io/dgraph/group"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task"
"github.com/stretchr/testify/require"
)
func checkShard(ps *store.Store) (int, []byte) {
......@@ -40,11 +44,10 @@ func checkShard(ps *store.Store) (int, []byte) {
return count, it.Key().Data()
}
func writePLs(t *testing.T, count int, vid uint64, ps *store.Store) {
func writePLs(t *testing.T, pred string, count int, vid uint64, ps *store.Store) {
for i := 0; i < count; i++ {
k := fmt.Sprintf("%03d", i)
t.Logf("key: %v", k)
list, _ := posting.GetOrCreate([]byte(k))
k := posting.Key(uint64(i), pred)
list, _ := posting.GetOrCreate(k)
de := &task.DirectedEdge{
ValueId: vid,
......@@ -79,7 +82,7 @@ func serve(s *grpc.Server, ln net.Listener) {
}
/*
TODO: Make this work again!
TODO: Make PopulateShard work again!
func TestPopulateShard(t *testing.T) {
var err error
dir, err := ioutil.TempDir("", "store0")
......@@ -226,6 +229,7 @@ func TestJoinCluster(t *testing.T) {
t.Fatalf("Expected key to be: %v. Got %v", "099", string(k))
}
}
*/
func TestGenerateGroup(t *testing.T) {
dir, err := ioutil.TempDir("", "store3")
......@@ -234,40 +238,50 @@ func TestGenerateGroup(t *testing.T) {
}
defer os.RemoveAll(dir)
r := bytes.NewReader([]byte("default: fp % 3"))
require.NoError(t, group.ParseConfig(r), "Unable to parse config.")
ps, err := store.NewStore(dir)
if err != nil {
t.Fatal(err)
}
defer ps.Close()
posting.Init(ps)
Init(ps)
writePLs(t, 100, 2, ps)
data, err := generateGroup(0)
require.Equal(t, uint32(0), group.BelongsTo("pred0"))
writePLs(t, "pred0", 33, 1, ps)
require.Equal(t, uint32(1), group.BelongsTo("p1"))
writePLs(t, "p1", 34, 1, ps)
require.Equal(t, uint32(2), group.BelongsTo("pr2"))
writePLs(t, "pr2", 35, 1, ps)
g, err := generateGroup(0)
if err != nil {
t.Error(err)
}
t.Logf("Size of data: %v", len(data))
require.Equal(t, 33, len(g.Keys))
for i, k := range g.Keys {
require.Equal(t, posting.Key(uint64(i), "pred0"), k.Key)
}
var g task.GroupKeys
uo := flatbuffers.GetUOffsetT(data)
t.Logf("Found offset: %v", uo)
g.Init(data, uo)
g, err = generateGroup(1)
if err != nil {
t.Error(err)
}
require.Equal(t, 34, len(g.Keys))
for i, k := range g.Keys {
require.Equal(t, posting.Key(uint64(i), "p1"), k.Key)
}
if g.KeysLength() != 100 {
t.Errorf("There should be 100 keys. Found: %v", g.KeysLength())
t.Fail()
g, err = generateGroup(2)
if err != nil {
t.Error(err)
}
for i := 0; i < 100; i++ {
var k task.KC
if ok := g.Keys(&k, i); !ok {
t.Errorf("Unable to parse key at index: %v", i)
}
expected := fmt.Sprintf("%03d", i)
found := string(k.KeyBytes())
if expected != found {
t.Errorf("Key expected:[%q], found:[%q]", expected, found)
}
t.Logf("Checksum: %q", k.ChecksumBytes())
require.Equal(t, 35, len(g.Keys))
for i, k := range g.Keys {
require.Equal(t, posting.Key(uint64(i), "pr2"), k.Key)
}
}
*/
......@@ -73,7 +73,6 @@ func ProcessTaskOverNetwork(ctx context.Context, q *task.Query) (*task.Result, e
}
// processTask processes the query, accumulates and returns the result.
// TODO: Change input and output to protos from []byte.
func processTask(q *task.Query) (*task.Result, error) {
attr := q.Attr
......