Skip to content
Snippets Groups Projects
Unverified Commit a5da8b9b authored by Manish R Jain's avatar Manish R Jain
Browse files

Make Dgraph work with Dgraphzero.

Another HUGE change, removes group config, uses group assignment via
dgraphzero. Every dgraph server connects to zero, gets the group id.
Every mutation checks against the membership state, and if not found,
asks zero if it can serve the tablet.

Postings lists don't use group id anymore, nor does schema. Every dgraph
server now only serves one group.

Squashed commit of the following:

commit 699ee4acf761127b3ff2adaaaae8e673433a12c6
Author: Manish R Jain <manish@dgraph.io>
Date:   Thu Sep 14 13:10:08 2017 +1000

    Comments while reviewing

commit 61e83f8325d7a5dcd153ffd117662f4f64c8008f
Author: Manish R Jain <manish@dgraph.io>
Date:   Thu Sep 14 12:21:49 2017 +1000

    Make it compile

commit b10a38022f524dc3b5b7c076ae39202fd0a584da
Author: Manish R Jain <manish@dgraph.io>
Date:   Tue Sep 12 15:12:52 2017 +1000

    Make Connect from Dgraph binary work.

    Keep track of group zero members via conf changes.

    Compiling dgraph code, with many group related changes.

    Remove groups from schema.go

    Remove group config.

    Mutation works. Query partially works

    Make queries work again.
parent 743f0be5
Branches
No related tags found
No related merge requests found
Showing
with 86 additions and 423 deletions
...@@ -49,7 +49,6 @@ import ( ...@@ -49,7 +49,6 @@ import (
"github.com/cockroachdb/cmux" "github.com/cockroachdb/cmux"
"github.com/dgraph-io/dgraph/dgraph" "github.com/dgraph-io/dgraph/dgraph"
"github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/group"
"github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos" "github.com/dgraph-io/dgraph/protos"
"github.com/dgraph-io/dgraph/query" "github.com/dgraph-io/dgraph/query"
...@@ -60,7 +59,6 @@ import ( ...@@ -60,7 +59,6 @@ import (
) )
var ( var (
gconf string
baseHttpPort int baseHttpPort int
baseGrpcPort int baseGrpcPort int
bindall bool bindall bool
...@@ -134,7 +132,6 @@ func setupConfigOpts() { ...@@ -134,7 +132,6 @@ func setupConfigOpts() {
flag.IntVar(&x.Config.PortOffset, "port_offset", 0, flag.IntVar(&x.Config.PortOffset, "port_offset", 0,
"Value added to all listening port numbers.") "Value added to all listening port numbers.")
flag.StringVar(&gconf, "group_conf", "", "group configuration file")
flag.IntVar(&baseHttpPort, "port", 8080, "Port to run HTTP service on.") flag.IntVar(&baseHttpPort, "port", 8080, "Port to run HTTP service on.")
flag.IntVar(&baseGrpcPort, "grpc_port", 9080, "Port to run gRPC service on.") flag.IntVar(&baseGrpcPort, "grpc_port", 9080, "Port to run gRPC service on.")
flag.BoolVar(&bindall, "bindall", false, flag.BoolVar(&bindall, "bindall", false,
...@@ -764,8 +761,6 @@ func main() { ...@@ -764,8 +761,6 @@ func main() {
} }
} }
x.Checkf(group.ParseGroupConfig(gconf), "While parsing group config.")
// Posting will initialize index which requires schema. Hence, initialize // Posting will initialize index which requires schema. Hence, initialize
// schema before calling posting.Init(). // schema before calling posting.Init().
schema.Init(dgraph.State.Pstore) schema.Init(dgraph.State.Pstore)
......
...@@ -83,6 +83,7 @@ func (s *Server) assignUids(ctx context.Context, num *protos.Num) (*protos.Assig ...@@ -83,6 +83,7 @@ func (s *Server) assignUids(ctx context.Context, num *protos.Num) (*protos.Assig
if available < num.Val { if available < num.Val {
var proposal protos.ZeroProposal var proposal protos.ZeroProposal
proposal.MaxLeaseId = maxLease + howMany proposal.MaxLeaseId = maxLease + howMany
if err := s.Node.proposeAndWait(ctx, &proposal); err != nil { if err := s.Node.proposeAndWait(ctx, &proposal); err != nil {
return nil, err return nil, err
} }
......
...@@ -34,7 +34,7 @@ import ( ...@@ -34,7 +34,7 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/dgraph-io/badger" "github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/table" "github.com/dgraph-io/badger/options"
"github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos" "github.com/dgraph-io/dgraph/protos"
"github.com/dgraph-io/dgraph/raftwal" "github.com/dgraph-io/dgraph/raftwal"
...@@ -66,8 +66,6 @@ type state struct { ...@@ -66,8 +66,6 @@ type state struct {
} }
func (st *state) serveGRPC(l net.Listener, wg *sync.WaitGroup) { func (st *state) serveGRPC(l net.Listener, wg *sync.WaitGroup) {
defer wg.Done()
s := grpc.NewServer( s := grpc.NewServer(
grpc.MaxRecvMsgSize(x.GrpcMaxSize), grpc.MaxRecvMsgSize(x.GrpcMaxSize),
grpc.MaxSendMsgSize(x.GrpcMaxSize), grpc.MaxSendMsgSize(x.GrpcMaxSize),
...@@ -80,35 +78,41 @@ func (st *state) serveGRPC(l net.Listener, wg *sync.WaitGroup) { ...@@ -80,35 +78,41 @@ func (st *state) serveGRPC(l net.Listener, wg *sync.WaitGroup) {
m := conn.NewNode(&rc) m := conn.NewNode(&rc)
st.rs = &conn.RaftServer{Node: m} st.rs = &conn.RaftServer{Node: m}
st.node = &node{Node: m, server: st.zero, ctx: context.Background()} st.node = &node{Node: m, ctx: context.Background()}
st.zero = &Server{NumReplicas: *numReplicas, Node: st.node} st.zero = &Server{NumReplicas: *numReplicas, Node: st.node}
st.zero.Init()
st.node.server = st.zero
protos.RegisterZeroServer(s, st.zero) protos.RegisterZeroServer(s, st.zero)
protos.RegisterRaftServer(s, st.rs) protos.RegisterRaftServer(s, st.rs)
err := s.Serve(l) go func() {
log.Printf("gRpc server stopped : %s", err.Error()) defer wg.Done()
s.GracefulStop() err := s.Serve(l)
log.Printf("gRpc server stopped : %s", err.Error())
s.GracefulStop()
}()
} }
func (st *state) serveHTTP(l net.Listener, wg *sync.WaitGroup) { func (st *state) serveHTTP(l net.Listener, wg *sync.WaitGroup) {
defer wg.Done()
srv := &http.Server{ srv := &http.Server{
ReadTimeout: 10 * time.Second, ReadTimeout: 10 * time.Second,
WriteTimeout: 600 * time.Second, WriteTimeout: 600 * time.Second,
IdleTimeout: 2 * time.Minute, IdleTimeout: 2 * time.Minute,
} }
err := srv.Serve(l) go func() {
log.Printf("Stopped taking more http(s) requests. Err: %s", err.Error()) defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 630*time.Second) err := srv.Serve(l)
defer cancel() log.Printf("Stopped taking more http(s) requests. Err: %s", err.Error())
err = srv.Shutdown(ctx) ctx, cancel := context.WithTimeout(context.Background(), 630*time.Second)
log.Printf("All http(s) requests finished.") defer cancel()
if err != nil { err = srv.Shutdown(ctx)
log.Printf("Http(s) shutdown err: %v", err.Error()) log.Printf("All http(s) requests finished.")
} if err != nil {
log.Printf("Http(s) shutdown err: %v", err.Error())
}
}()
} }
func main() { func main() {
...@@ -135,8 +139,8 @@ func main() { ...@@ -135,8 +139,8 @@ func main() {
wg.Add(3) wg.Add(3)
// Initilize the servers. // Initilize the servers.
var st state var st state
go st.serveGRPC(grpcListener, &wg) st.serveGRPC(grpcListener, &wg)
go st.serveHTTP(httpListener, &wg) st.serveHTTP(httpListener, &wg)
// Open raft write-ahead log and initialize raft node. // Open raft write-ahead log and initialize raft node.
x.Checkf(os.MkdirAll(*w, 0700), "Error while creating WAL dir.") x.Checkf(os.MkdirAll(*w, 0700), "Error while creating WAL dir.")
...@@ -144,7 +148,7 @@ func main() { ...@@ -144,7 +148,7 @@ func main() {
kvOpt.SyncWrites = true kvOpt.SyncWrites = true
kvOpt.Dir = *w kvOpt.Dir = *w
kvOpt.ValueDir = *w kvOpt.ValueDir = *w
kvOpt.MapTablesTo = table.MemoryMap kvOpt.TableLoadingMode = options.MemoryMap
kv, err := badger.NewKV(&kvOpt) kv, err := badger.NewKV(&kvOpt)
x.Checkf(err, "Error while opening WAL store") x.Checkf(err, "Error while opening WAL store")
wal := raftwal.Init(kv, *nodeId) wal := raftwal.Init(kv, *nodeId)
......
...@@ -19,6 +19,7 @@ package main ...@@ -19,6 +19,7 @@ package main
import ( import (
"errors" "errors"
"fmt"
"math/rand" "math/rand"
"sync" "sync"
"sync/atomic" "sync/atomic"
...@@ -49,6 +50,9 @@ func (p *proposals) Store(pid uint32, pctx *proposalCtx) bool { ...@@ -49,6 +50,9 @@ func (p *proposals) Store(pid uint32, pctx *proposalCtx) bool {
} }
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
if p.ids == nil {
p.ids = make(map[uint32]*proposalCtx)
}
if _, has := p.ids[pid]; has { if _, has := p.ids[pid]; has {
return false return false
} }
...@@ -100,6 +104,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *protos.ZeroProposal ...@@ -100,6 +104,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *protos.ZeroProposal
break break
} }
} }
fmt.Printf(" ===> Proposal: %+v\n", proposal)
data, err := proposal.Marshal() data, err := proposal.Marshal()
if err != nil { if err != nil {
return err return err
...@@ -123,6 +128,13 @@ var ( ...@@ -123,6 +128,13 @@ var (
errInvalidProposal = errors.New("Invalid group proposal") errInvalidProposal = errors.New("Invalid group proposal")
) )
func newGroup() *protos.Group {
return &protos.Group{
Members: make(map[uint64]*protos.Member),
Tablets: make(map[string]*protos.Tablet),
}
}
func (n *node) applyProposal(e raftpb.Entry) (uint32, error) { func (n *node) applyProposal(e raftpb.Entry) (uint32, error) {
var p protos.ZeroProposal var p protos.ZeroProposal
if err := p.Unmarshal(e.Data); err != nil { if err := p.Unmarshal(e.Data); err != nil {
...@@ -131,17 +143,21 @@ func (n *node) applyProposal(e raftpb.Entry) (uint32, error) { ...@@ -131,17 +143,21 @@ func (n *node) applyProposal(e raftpb.Entry) (uint32, error) {
if p.Id == 0 { if p.Id == 0 {
return 0, errInvalidProposal return 0, errInvalidProposal
} }
x.Printf("Received proposal: %+v\n", p) x.Printf("Applying proposal: %+v\n", p)
n.server.Lock() n.server.Lock()
defer n.server.Unlock() defer n.server.Unlock()
state := n.server.state
if p.Member != nil { if p.Member != nil {
if p.Member.GroupId == 0 { if p.Member.GroupId == 0 {
return 0, errInvalidProposal return 0, errInvalidProposal
} }
state := n.server.state
group := state.Groups[p.Member.GroupId] group := state.Groups[p.Member.GroupId]
if group == nil {
group = newGroup()
state.Groups[p.Member.GroupId] = group
}
_, has := group.Members[p.Member.Id] _, has := group.Members[p.Member.Id]
if !has && len(group.Members) >= n.server.NumReplicas { if !has && len(group.Members) >= n.server.NumReplicas {
// We shouldn't allow more members than the number of replicas. // We shouldn't allow more members than the number of replicas.
...@@ -153,12 +169,15 @@ func (n *node) applyProposal(e raftpb.Entry) (uint32, error) { ...@@ -153,12 +169,15 @@ func (n *node) applyProposal(e raftpb.Entry) (uint32, error) {
if p.Tablet.GroupId == 0 { if p.Tablet.GroupId == 0 {
return 0, errInvalidProposal return 0, errInvalidProposal
} }
state := n.server.state
group := state.Groups[p.Tablet.GroupId] group := state.Groups[p.Tablet.GroupId]
if group == nil {
group = newGroup()
state.Groups[p.Tablet.GroupId] = group
}
group.Tablets[p.Tablet.Predicate] = p.Tablet group.Tablets[p.Tablet.Predicate] = p.Tablet
} }
if p.MaxLeaseId > 0 { if p.MaxLeaseId > 0 {
n.server.state.MaxLeaseId = p.MaxLeaseId state.MaxLeaseId = p.MaxLeaseId
} }
return p.Id, nil return p.Id, nil
} }
...@@ -205,7 +224,9 @@ func (n *node) initAndStartNode(wal *raftwal.Wal) error { ...@@ -205,7 +224,9 @@ func (n *node) initAndStartNode(wal *raftwal.Wal) error {
n.SetRaft(raft.StartNode(n.Cfg, nil)) n.SetRaft(raft.StartNode(n.Cfg, nil))
} else { } else {
peers := []raft.Peer{{ID: n.Id}} data, err := n.RaftContext.Marshal()
x.Check(err)
peers := []raft.Peer{{ID: n.Id, Context: data}}
n.SetRaft(raft.StartNode(n.Cfg, peers)) n.SetRaft(raft.StartNode(n.Cfg, peers))
} }
...@@ -249,6 +270,9 @@ func (n *node) Run() { ...@@ -249,6 +270,9 @@ func (n *node) Run() {
} else if entry.Type == raftpb.EntryNormal { } else if entry.Type == raftpb.EntryNormal {
pid, err := n.applyProposal(entry) pid, err := n.applyProposal(entry)
if err != nil {
x.Printf("While applying proposal: %v\n", err)
}
n.props.Done(pid, err) n.props.Done(pid, err)
} else { } else {
......
...@@ -19,6 +19,7 @@ package main ...@@ -19,6 +19,7 @@ package main
import ( import (
"errors" "errors"
"fmt"
"math" "math"
"sync" "sync"
...@@ -56,11 +57,22 @@ type Server struct { ...@@ -56,11 +57,22 @@ type Server struct {
nextGroup uint32 nextGroup uint32
} }
func (s *Server) Init() {
s.Lock()
defer s.Unlock()
s.state = &protos.MembershipState{
Groups: make(map[uint32]*protos.Group),
Zeros: make(map[uint64]*protos.Member),
}
s.nextLeaseId = 1
s.nextGroup = 1
}
// Do not modify the membership state out of this. // Do not modify the membership state out of this.
func (s *Server) membershipState() *protos.MembershipState { func (s *Server) membershipState() *protos.MembershipState {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
return s.state return s.state
} }
...@@ -142,6 +154,7 @@ func (s *Server) createProposals(dst *protos.Group) ([]*protos.ZeroProposal, err ...@@ -142,6 +154,7 @@ func (s *Server) createProposals(dst *protos.Group) ([]*protos.ZeroProposal, err
// Connect is used to connect the very first time with group zero. // Connect is used to connect the very first time with group zero.
func (s *Server) Connect(ctx context.Context, func (s *Server) Connect(ctx context.Context,
m *protos.Member) (resp *protos.MembershipState, err error) { m *protos.Member) (resp *protos.MembershipState, err error) {
x.Printf("Got connection request: %+v\n", m)
if ctx.Err() != nil { if ctx.Err() != nil {
return &emptyMembershipState, ctx.Err() return &emptyMembershipState, ctx.Err()
} }
...@@ -149,6 +162,7 @@ func (s *Server) Connect(ctx context.Context, ...@@ -149,6 +162,7 @@ func (s *Server) Connect(ctx context.Context,
return &emptyMembershipState, errInvalidId return &emptyMembershipState, errInvalidId
} }
if len(m.Addr) == 0 { if len(m.Addr) == 0 {
fmt.Println("No address provided.")
return &emptyMembershipState, errInvalidAddress return &emptyMembershipState, errInvalidAddress
} }
// Create a connection and check validity of the address by doing an Echo. // Create a connection and check validity of the address by doing an Echo.
......
...@@ -373,20 +373,19 @@ func (w *RaftServer) JoinCluster(ctx context.Context, ...@@ -373,20 +373,19 @@ func (w *RaftServer) JoinCluster(ctx context.Context,
} }
// Commenting out the following checks for now, until we get rid of groups. // Commenting out the following checks for now, until we get rid of groups.
// TODO: Uncomment this after groups is removed. // TODO: Uncomment this after groups is removed.
// if rc.Group != w.GetNode().Group || rc.Id == w.GetNode().Id {
// return &protos.Payload{}, x.Errorf(errorNodeIDExists)
// }
// TODO: Figure out what other conditions we need to check to reject a Join.
// // Best effor reject
// if _, found := groups().Server(rc.Id, rc.Group); found || rc.Id == Config.RaftId {
// return &protos.Payload{}, x.Errorf(errorNodeIDExists)
// }
node := w.GetNode() node := w.GetNode()
if node == nil { if node == nil || node.Raft() == nil {
return &protos.Payload{}, errNoNode return nil, errNoNode
}
// Check that the new node is from the same group as me.
if rc.Group != node.RaftContext.Group {
return nil, x.Errorf("Raft group mismatch")
} }
// Also check that the new node is not me.
if rc.Id == node.RaftContext.Id {
return nil, x.Errorf("Same Raft ID")
}
// Check that the new node is not already part of the group.
if _, ok := node.GetPeer(rc.Id); ok { if _, ok := node.GetPeer(rc.Id); ok {
return &protos.Payload{}, x.Errorf("Node id already part of group.") return &protos.Payload{}, x.Errorf("Node id already part of group.")
} }
...@@ -410,16 +409,14 @@ var ( ...@@ -410,16 +409,14 @@ var (
func (w *RaftServer) applyMessage(ctx context.Context, msg raftpb.Message) error { func (w *RaftServer) applyMessage(ctx context.Context, msg raftpb.Message) error {
var rc protos.RaftContext var rc protos.RaftContext
x.Check(rc.Unmarshal(msg.Context)) x.Check(rc.Unmarshal(msg.Context))
// node := groups().Node(rc.Group)
// if node == nil {
// // Maybe we went down, went back up, reconnected, and got an RPC
// // message before we set up Raft?
// return errNoNode
// }
node := w.GetNode() node := w.GetNode()
if node == nil || node.Raft() == nil { if node == nil || node.Raft() == nil {
return errNoNode return errNoNode
} }
if rc.Group != node.RaftContext.Group {
return errNoNode
}
node.Connect(msg.From, rc.Addr) node.Connect(msg.From, rc.Addr)
c := make(chan error, 1) c := make(chan error, 1)
......
...@@ -46,6 +46,7 @@ type Pool struct { ...@@ -46,6 +46,7 @@ type Pool struct {
Addr string Addr string
// Requires a lock on poolsi. // Requires a lock on poolsi.
// TODO: Remove the refcounting. Let grpc take care of closing connections.
refcount int64 refcount int64
} }
......
...@@ -18,7 +18,6 @@ package dgraph ...@@ -18,7 +18,6 @@ package dgraph
import ( import (
"github.com/dgraph-io/dgraph/client" "github.com/dgraph-io/dgraph/client"
"github.com/dgraph-io/dgraph/group"
"github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos" "github.com/dgraph-io/dgraph/protos"
"github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/schema"
...@@ -43,7 +42,6 @@ func NewEmbeddedDgraphClient(config Options, opts client.BatchMutationOptions, ...@@ -43,7 +42,6 @@ func NewEmbeddedDgraphClient(config Options, opts client.BatchMutationOptions,
x.Init() x.Init()
State = NewServerState() State = NewServerState()
group.ParseGroupConfig("") // this ensures that only one group is used
schema.Init(State.Pstore) schema.Init(State.Pstore)
posting.Init(State.Pstore) posting.Init(State.Pstore)
worker.Init(State.Pstore) worker.Init(State.Pstore)
......
/*
* Copyright (C) 2017 Dgraph Labs, Inc. and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package group
import (
"bufio"
"fmt"
"io"
"os"
"strconv"
"strings"
"github.com/dgraph-io/dgraph/x"
farm "github.com/dgryski/go-farm"
)
type predMeta struct {
val string
exactMatch bool
gid uint32
}
type config struct {
n uint32
k uint32
pred []predMeta
}
var groupConfig config
func parsePredicates(groupId uint32, p string) error {
preds := strings.Split(p, ",")
x.AssertTruef(len(preds) > 0, "Length of predicates in config should be > 0")
for _, pred := range preds {
pred = strings.TrimSpace(pred)
meta := predMeta{
val: pred,
gid: groupId,
}
if strings.HasPrefix(pred, "~") {
return fmt.Errorf("Cannot assign group to reverses. They are stored"+
" with the original predicate: %+v", pred)
}
if strings.HasSuffix(pred, "*") {
meta.val = strings.TrimSuffix(meta.val, "*")
} else {
meta.exactMatch = true
}
groupConfig.pred = append(groupConfig.pred, meta)
}
return nil
}
func parseDefaultConfig(l string) (uint32, error) {
// If we have already seen a default config line, and n has a value then we
// log.Fatal.
if groupConfig.n != 0 {
return 0, fmt.Errorf("Default config can only be defined once: %v", l)
}
l = strings.TrimSpace(l)
conf := strings.Split(l, " ")
// + in (fp % n + k) is optional.
if !(len(conf) == 5 || len(conf) == 3) || conf[0] != "fp" || conf[1] != "%" {
return 0, fmt.Errorf("Default config format should be like: %v", "default: fp % n + k")
}
var err error
var n uint64
n, err = strconv.ParseUint(conf[2], 10, 32)
x.Check(err)
groupConfig.n = uint32(n)
x.AssertTrue(groupConfig.n != 0)
if len(conf) == 5 {
if conf[3] != "+" {
return 0, fmt.Errorf("Default config format should be like: %v", "default: fp % n + k")
}
n, err = strconv.ParseUint(conf[4], 10, 32)
groupConfig.k = uint32(n)
x.Check(err)
}
if groupConfig.k == 0 {
return 0, fmt.Errorf(`k in fp % n + k should be greater than zero.`)
}
return groupConfig.k, nil
}
// ParseConfig parses a group config provided by reader.
func ParseConfig(r io.Reader) error {
groupConfig = config{}
scanner := bufio.NewScanner(r)
// To keep track of last groupId seen across lines. If we the groups ids are
// not sequential, we log.Fatal.
var curGroupId uint32 = 1
// If after seeing line with default config, we see other lines, we log.Fatal.
// Default config should be specified as the last line, so that we can check
// accurately that k in (fp % N + k) generates consecutive groups.
seenDefault := false
for scanner.Scan() {
l := scanner.Text()
// Skip empty lines and comments.
if l == "" || strings.HasPrefix(l, "//") {
continue
}
c := strings.Split(l, ":")
if len(c) < 2 {
return fmt.Errorf("Incorrect format for config line: %v", l)
}
if c[0] == "default" {
seenDefault = true
k, err := parseDefaultConfig(c[1])
if err != nil {
return err
}
if k == 0 {
continue
}
if curGroupId != 0 && k > curGroupId {
return fmt.Errorf("k in (fp mod N + k) should be <= the last groupno %v.",
curGroupId)
}
} else {
// There shouldn't be a line after the default config line.
if seenDefault {
return fmt.Errorf("Default config should be specified as the last line. Found %v",
l)
}
groupId, err := strconv.ParseUint(c[0], 10, 32)
x.Check(err)
if groupId == 0 {
return fmt.Errorf("Group ids should be greater than zero. Instead set to 0 for predicates: %v", c[1])
}
if curGroupId != uint32(groupId) {
return fmt.Errorf("Group ids should be sequential and should start from 1. "+
"Found %v, should have been %v", groupId, curGroupId)
}
curGroupId++
err = parsePredicates(uint32(groupId), c[1])
if err != nil {
return err
}
}
}
x.Check(scanner.Err())
return nil
}
// ParseGroupConfig parses the config file and stores the predicate <-> group map.
func ParseGroupConfig(file string) error {
cf, err := os.Open(file)
if os.IsNotExist(err) {
groupConfig.n = 1
groupConfig.k = 1
return nil
}
x.Check(err)
if err = ParseConfig(cf); err != nil {
return err
}
if groupConfig.n == 0 {
return fmt.Errorf("Cant take modulo 0.")
}
return nil
}
func fpGroup(pred string) uint32 {
if groupConfig.n == 1 {
return groupConfig.k
}
return farm.Fingerprint32([]byte(pred))%groupConfig.n + groupConfig.k
}
func BelongsTo(pred string) uint32 {
for _, meta := range groupConfig.pred {
if meta.exactMatch && meta.val == pred {
return meta.gid
}
if !meta.exactMatch && strings.HasPrefix(pred, meta.val) {
return meta.gid
}
}
return fpGroup(pred)
}
/*
* Copyright (C) 2017 Dgraph Labs, Inc. and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package group
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestGroups(t *testing.T) {
err := ParseGroupConfig("group_tests/filemissing.conf")
if err != nil {
t.Errorf("Expected nil error. Got: %v", err)
}
gid := BelongsTo("type.object.name.en")
if gid != 1 {
t.Errorf("Expected groupId to be: %v. Got: %v", 1, gid)
}
gid = BelongsTo("_uid_")
if gid != 1 {
t.Errorf("Expected groupId to be: %v. Got: %v", 1, gid)
}
groupConfig = config{}
err = ParseGroupConfig("group_tests/defaultmissing.conf")
if err.Error() != "Cant take modulo 0." {
t.Error("Error doesn't match expected value")
}
groupConfig = config{}
err = ParseGroupConfig("group_tests/defaultwrongseq.conf")
require.Contains(t, err.Error(), "k in (fp mod N + k) should be")
groupConfig = config{}
err = ParseGroupConfig("group_tests/defaultnotlast.conf")
require.Contains(t, err.Error(), "Default config should be specified as the last line.")
groupConfig = config{}
err = ParseGroupConfig("group_tests/doubledefault.conf")
require.Contains(t, err.Error(), "Default config can only be defined once:")
groupConfig = config{}
err = ParseGroupConfig("group_tests/zerok.conf")
require.Contains(t, err.Error(), "k in fp")
groupConfig = config{}
err = ParseGroupConfig("group_tests/incorrectformat.conf")
if err.Error() != "Incorrect format for config line: _uid_" {
t.Error("Error doesn't match expected value")
}
groupConfig = config{}
err = ParseGroupConfig("group_tests/wrongformat.conf")
require.Contains(t, err.Error(), "Default config format should be like:")
groupConfig = config{}
err = ParseGroupConfig("group_tests/wrongsequence.conf")
require.Contains(t, err.Error(), "Group ids should be sequential and should start from 1")
groupConfig = config{}
if err = ParseGroupConfig("group_tests/defaultright.conf"); err != nil {
t.Errorf("Expected nil error. Got: %v", err)
}
groupConfig = config{}
err = ParseGroupConfig("group_tests/zeropred.conf")
require.Contains(t, err.Error(), "Group ids should be greater than zero.")
groupConfig = config{}
if err = ParseGroupConfig("group_tests/rightsequence.conf"); err != nil {
t.Errorf("Expected nil error. Got: %v", err)
}
gid = BelongsTo("_uid_")
if gid != 1 {
t.Errorf("Expected groupId to be: %v. Got: %v", 1, gid)
}
gid = BelongsTo("type.object.name.fr")
if gid != 2 {
t.Errorf("Expected groupId to be: %v. Got: %v", 2, gid)
}
gid = BelongsTo("film.actor.film")
if gid != 11 {
t.Errorf("Expected groupId to be: %v. Got: %v", 11, gid)
}
groupConfig = config{}
err = ParseGroupConfig("group_tests/reverse_pred.conf")
require.Error(t, err)
}
// If * is specified prefix matching would be done.
1: _uid_
2: type.object.name.en
3: type.object.name*, film.performance.*
// Default config missing, should error out.
// If * is specified prefix matching would be done.
1: _uid_
2: type.object.name.en
// Default formula for getting group where fp is the fingerprint of a predicate.
default: fp % 10 + 2
3: type.object.name*, film.performance.*
// Default formula for getting group.
// Everything ok should pass.
default: fp % 10 + 1
// If * is specified prefix matching would be done.
1: _uid_
2: type.object.name.en
3: type.object.name*, film.performance.*
// Default formula for getting group where fp is the fingerprint of a predicate.
// 4 > 2 the last groupId should error out.
default: fp % 10 + 5
// If * is specified prefix matching would be done.
1: _uid_
2: type.object.name.en
3: type.object.name*, film.performance.*
// Default formula for getting group where fp is the fingerprint of a predicate.
default: fp % 10 + 4
// Should give double default error.
default: fp % 10 + 4
// If * is specified prefix matching would be done.
_uid_
1: type.object.name.en
// If * is specified prefix matching would be done.
1: _uid_
// This should error out as we cant assign groups to reverses.
2: ~director.film
// Default formula for getting group where fp is the fingerprint of a predicate.
default: fp % 10 + 3
// If * is specified prefix matching would be done.
1: _uid_
2: type.object.name*, film.performance.*
// Default formula for getting group where fp is the fingerprint of a predicate.
default: fp % 10 + 3
// If * is specified prefix matching would be done.
1: _uid_
2: type.object.name.en
3: type.object.name*, film.performance.*
// Default formula for getting group where fp is the fingerprint of a predicate.
default: attr % 10 + 4
// If * is specified prefix matching would be done.
1: _uid_
2: type.object.name.en
// Group id not sequential should error out.
7: type.object.name*, film.performance.*
// Default formula for getting group where fp is the fingerprint of a predicate.
default: fp % 10 + 4
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment