Skip to content
Snippets Groups Projects
Unverified Commit 9b8744d4 authored by Manish R Jain's avatar Manish R Jain
Browse files

RAFT: Consensus based mutations. Waits for mutations to be committed to the...

RAFT: Consensus based mutations. Waits for mutations to be committed to the RAFT group, before replying back to the client. Also removes the unnecessary logic of returning unapplied mutations. Instead on an error, all the mutations should be reapplied.
parent ee7bf285
No related branches found
No related tags found
No related merge requests found
......@@ -114,6 +114,7 @@ func convertToNQuad(ctx context.Context, mutation string) ([]rdf.NQuad, error) {
var nquads []rdf.NQuad
r := strings.NewReader(mutation)
scanner := bufio.NewScanner(r)
x.Trace(ctx, "Converting to NQuad")
// Scanning the mutation string, one line at a time.
for scanner.Scan() {
......@@ -178,21 +179,11 @@ func convertToEdges(ctx context.Context, nquads []rdf.NQuad) (mutationResult, er
}
func applyMutations(ctx context.Context, m worker.Mutations) error {
left, err := worker.MutateOverNetwork(ctx, m)
err := worker.MutateOverNetwork(ctx, m)
if err != nil {
x.TraceError(ctx, x.Wrapf(err, "Error while MutateOverNetwork"))
return err
}
if len(left.Set) > 0 || len(left.Del) > 0 {
x.TraceError(ctx, x.Errorf("%d edges couldn't be applied", len(left.Del)+len(left.Set)))
for _, e := range left.Set {
x.TraceError(ctx, x.Errorf("Unable to apply set mutation for edge: %v", e))
}
for _, e := range left.Del {
x.TraceError(ctx, x.Errorf("Unable to apply delete mutation for edge: %v", e))
}
return x.Errorf("Unapplied mutations")
}
return nil
}
......
......@@ -282,6 +282,7 @@ func (l *List) getPostingList() *types.PostingList {
}
// Caller must hold at least a read lock.
// TODO: Consider using the new sort.Search function.
func (l *List) lePostingIndex(maxUid uint64) (int, uint64) {
posting := l.getPostingList()
left, right := 0, posting.PostingsLength()-1
......@@ -313,6 +314,7 @@ func (l *List) lePostingIndex(maxUid uint64) (int, uint64) {
return sofar, p.Uid()
}
// TODO: Consider using the new sort.Search function.
func (l *List) leMutationIndex(maxUid uint64) (int, uint64) {
left, right := 0, len(l.mindex)-1
sofar := -1
......
......@@ -244,14 +244,11 @@ func GetOrAssign(xid string, instanceIdx uint64,
if pl.Length() > 1 {
log.Fatalf("We shouldn't have more than 1 uid for xid: %v\n", xid)
} else {
// We found one posting.
var p types.Posting
if ok := pl.Get(&p, 0); !ok {
return 0, errors.New("While retrieving entry from posting list")
}
return p.Uid(), nil
}
return 0, errors.New("Some unhandled route lead me here." +
" Wake the stupid developer up.")
// We found one posting.
var p types.Posting
if ok := pl.Get(&p, 0); !ok {
return 0, errors.New("While retrieving entry from posting list")
}
return p.Uid(), nil
}
package worker
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"log"
"math/rand"
"strconv"
"strings"
"sync"
......@@ -15,23 +16,73 @@ import (
"github.com/dgraph-io/dgraph/x"
)
const (
mutationMsg = 1
assignMsg = 2
)
type peerPool struct {
sync.RWMutex
peers map[uint64]*Pool
}
func (p *peerPool) Get(id uint64) *Pool {
p.RLock()
defer p.RUnlock()
return p.peers[id]
}
func (p *peerPool) Set(id uint64, pool *Pool) {
p.Lock()
defer p.Unlock()
p.peers[id] = pool
}
type proposals struct {
sync.RWMutex
ids map[uint32]chan error
}
func (p *proposals) Store(pid uint32, ch chan error) {
p.Lock()
defer p.Unlock()
_, has := p.ids[pid]
x.Assertf(!has, "Same proposal is being stored again.")
p.ids[pid] = ch
}
func (p *proposals) Done(pid uint32, err error) {
var ch chan error
p.Lock()
ch, has := p.ids[pid]
if has {
delete(p.ids, pid)
}
p.Unlock()
if !has {
return
}
ch <- err
}
type node struct {
cfg *raft.Config
ctx context.Context
data map[string]string
done <-chan struct{}
id uint64
localAddr string
peers peerPool
props proposals
raft raft.Node
store *raft.MemoryStorage
peers map[uint64]*Pool
localAddr string
}
// TODO: Make this thread safe.
func (n *node) Connect(pid uint64, addr string) {
if pid == n.id {
return
}
if _, has := n.peers[pid]; has {
if pool := n.peers.Get(pid); pool != nil {
return
}
......@@ -50,26 +101,81 @@ func (n *node) Connect(pid uint64, addr string) {
if err != nil {
log.Fatalf("Unable to connect: %v", err)
}
_ = pool.Put(conn)
n.peers[pid] = pool
x.Check(pool.Put(conn))
n.peers.Set(pid, pool)
fmt.Printf("CONNECTED TO %d %v\n", pid, addr)
return
}
func (n *node) AddToCluster(pid uint64) {
pool := n.peers.Get(pid)
x.Assertf(pool != nil, "Unable to find conn pool for peer: %d", pid)
n.raft.ProposeConfChange(context.TODO(), raftpb.ConfChange{
ID: pid,
Type: raftpb.ConfChangeAddNode,
NodeID: pid,
Context: []byte(strconv.FormatUint(pid, 10) + ":" + n.peers[pid].Addr),
Context: []byte(strconv.FormatUint(pid, 10) + ":" + pool.Addr),
})
}
type header struct {
proposalId uint32
msgId uint16
}
func (h *header) Length() int {
return 6 // 4 bytes for proposalId, 2 bytes for msgId.
}
func (h *header) Encode() []byte {
result := make([]byte, h.Length())
binary.LittleEndian.PutUint32(result[0:4], h.proposalId)
binary.LittleEndian.PutUint16(result[4:6], h.msgId)
return result
}
func (h *header) Decode(in []byte) {
h.proposalId = binary.LittleEndian.Uint32(in[0:4])
h.msgId = binary.LittleEndian.Uint16(in[4:6])
}
func (n *node) ProposeAndWait(ctx context.Context, msg uint16, data []byte) error {
var h header
h.proposalId = rand.Uint32()
h.msgId = msg
hdata := h.Encode()
proposalData := make([]byte, len(data)+len(hdata))
x.Assert(copy(proposalData, hdata) == len(hdata))
x.Assert(copy(proposalData[len(hdata):], data) == len(data))
che := make(chan error, 1)
n.props.Store(h.proposalId, che)
err := n.raft.Propose(ctx, proposalData)
if err != nil {
return x.Wrapf(err, "While proposing")
}
// Wait for the proposal to be committed.
x.Trace(ctx, "Waiting for the proposal to be applied.")
select {
case err = <-che:
x.TraceError(ctx, err)
fmt.Printf("DEBUG. Proposeandwait replied with: %v", err)
return err
case <-ctx.Done():
return ctx.Err()
}
}
func (n *node) send(m raftpb.Message) {
x.Assertf(n.id != m.To, "Seding message to itself")
pool, has := n.peers[m.To]
x.Assertf(has, "Don't have address for peer: %d", m.To)
pool := n.peers.Get(m.To)
x.Assertf(pool != nil, "Don't have address for peer: %d", m.To)
conn, err := pool.Get()
x.Check(err)
......@@ -84,8 +190,6 @@ func (n *node) send(m raftpb.Message) {
}
func (n *node) process(e raftpb.Entry) error {
// TODO: Implement this.
fmt.Printf("process: %+v\n", e)
if e.Data == nil {
return nil
}
......@@ -105,11 +209,23 @@ func (n *node) process(e raftpb.Entry) error {
}
if e.Type == raftpb.EntryNormal {
parts := bytes.SplitN(e.Data, []byte(":"), 2)
k := string(parts[0])
v := string(parts[1])
n.data[k] = v
fmt.Printf(" Key: %v Val: %v\n", k, v)
var h header
h.Decode(e.Data[0:h.Length()])
x.Assertf(h.msgId == 1, "We only handle mutations for now.")
m := new(Mutations)
// Ensure that this can be decoded.
if err := m.Decode(e.Data[h.Length():]); err != nil {
x.TraceError(n.ctx, err)
n.props.Done(h.proposalId, err)
return err
}
if err := mutate(n.ctx, m); err != nil {
x.TraceError(n.ctx, err)
n.props.Done(h.proposalId, err)
return err
}
n.props.Done(h.proposalId, nil)
}
return nil
......@@ -150,7 +266,8 @@ func (n *node) processSnapshot(s raftpb.Snapshot) {
fmt.Printf("Don't know who the leader is")
return
}
pool := n.peers[lead]
pool := n.peers.Get(lead)
x.Assertf(pool != nil, "Leader: %d pool should not be nil", lead)
fmt.Printf("Getting snapshot from leader: %v", lead)
_, err := ws.PopulateShard(context.TODO(), pool, 0)
x.Checkf(err, "processSnapshot")
......@@ -184,19 +301,12 @@ func (n *node) Run() {
}
}
func (n *node) Campaign(ctx context.Context) {
if len(n.peers) > 0 {
fmt.Printf("CAMPAIGN\n")
x.Check(n.raft.Campaign(ctx))
}
}
func (n *node) Step(ctx context.Context, msg raftpb.Message) error {
return n.raft.Step(ctx, msg)
}
func (n *node) SnapshotPeriodically() {
for t := range time.Tick(10 * time.Second) {
for t := range time.Tick(time.Minute) {
fmt.Printf("Snapshot Periodically: %v", t)
le, err := n.store.LastIndex()
......@@ -232,12 +342,12 @@ func parsePeer(peer string) (uint64, string) {
func (n *node) JoinCluster(any string, s *State) {
// Tell one of the peers to join.
pid, paddr := parsePeer(any)
n.Connect(pid, paddr)
// TODO: Make this thread safe.
pool := n.peers[pid]
pool := n.peers.Get(pid)
x.Assertf(pool != nil, "Unable to find pool for peer: %d", pid)
// TODO: Ask for the leader, before running PopulateShard.
// Bring the instance up to speed first.
_, err := s.PopulateShard(context.TODO(), pool, 0)
......@@ -261,8 +371,16 @@ func (n *node) JoinCluster(any string, s *State) {
func newNode(id uint64, my string) *node {
fmt.Printf("NEW NODE ID: %v\n", id)
peers := peerPool{
peers: make(map[uint64]*Pool),
}
props := proposals{
ids: make(map[uint32]chan error),
}
store := raft.NewMemoryStorage()
n := &node{
ctx: context.TODO(),
id: id,
store: store,
cfg: &raft.Config{
......@@ -274,7 +392,8 @@ func newNode(id uint64, my string) *node {
MaxInflightMsgs: 256,
},
data: make(map[string]string),
peers: make(map[uint64]*Pool),
peers: peers,
props: props,
localAddr: my,
}
return n
......
......@@ -60,7 +60,7 @@ func (m *Mutations) Decode(data []byte) error {
// runMutations goes through all the edges and applies them. It returns the
// mutations which were not applied in left.
func runMutations(ctx context.Context, edges []x.DirectedEdge, op byte, left *Mutations) error {
func runMutations(ctx context.Context, edges []x.DirectedEdge, op byte) error {
for _, edge := range edges {
if farm.Fingerprint64(
[]byte(edge.Attribute))%ws.numGroups != ws.groupId {
......@@ -72,62 +72,47 @@ func runMutations(ctx context.Context, edges []x.DirectedEdge, op byte, left *Mu
defer decr()
if err := plist.AddMutationWithIndex(ctx, edge, op); err != nil {
if op == posting.Set {
left.Set = append(left.Set, edge)
} else if op == posting.Del {
left.Del = append(left.Del, edge)
}
log.Printf("Error while adding mutation: %v %v", edge, err)
continue
return err // abort applying the rest of them.
}
}
return nil
}
// mutate runs the set and delete mutations.
func mutate(ctx context.Context, m *Mutations, left *Mutations) error {
func mutate(ctx context.Context, m *Mutations) error {
// Running the set instructions first.
if err := runMutations(ctx, m.Set, posting.Set, left); err != nil {
if err := runMutations(ctx, m.Set, posting.Set); err != nil {
return err
}
if err := runMutations(ctx, m.Del, posting.Del, left); err != nil {
if err := runMutations(ctx, m.Del, posting.Del); err != nil {
return err
}
return nil
}
// runMutate is used to run the mutations on an instance.
func runMutate(ctx context.Context, idx int, m *Mutations,
replies chan *Payload, che chan error) {
left := new(Mutations)
var err error
func proposeMutation(ctx context.Context, idx int, m *Mutations, che chan error) {
data, err := m.Encode()
if err != nil {
che <- err
return
}
// We run them locally if idx == groupId
// HACK HACK HACK
// if idx == int(ws.groupId) {
if true {
if err = mutate(ctx, m, left); err != nil {
che <- err
return
}
reply := new(Payload)
// Encoding and sending back the mutations which were not applied.
if reply.Data, err = left.Encode(); err != nil {
che <- err
return
}
replies <- reply
che <- nil
che <- GetNode().ProposeAndWait(ctx, mutationMsg, data)
// che <- GetNode().raft.Propose(ctx, data)
return
}
// TODO: Move this to the appropriate place. Propose mutation should only deal with RAFT.
// Get a connection from the pool and run mutations over the network.
pool := ws.GetPool(idx)
query := new(Payload)
query.Data, err = m.Encode()
if err != nil {
che <- err
return
}
query.Data = data
conn, err := pool.Get()
if err != nil {
......@@ -137,19 +122,15 @@ func runMutate(ctx context.Context, idx int, m *Mutations,
defer pool.Put(conn)
c := NewWorkerClient(conn)
reply, err := c.Mutate(ctx, query)
if err != nil {
che <- err
return
}
replies <- reply
che <- nil
_, err = c.Mutate(ctx, query)
che <- err
}
// addToMutationArray adds the edges to the appropriate index in the mutationArray,
// taking into account the op(operation) and the attribute.
func addToMutationArray(mutationArray []*Mutations, edges []x.DirectedEdge, op string) {
for _, edge := range edges {
// TODO: Determine the right group using rules, instead of modulos.
idx := farm.Fingerprint64([]byte(edge.Attribute)) % ws.numGroups
mu := mutationArray[idx]
if mu == nil {
......@@ -165,15 +146,14 @@ func addToMutationArray(mutationArray []*Mutations, edges []x.DirectedEdge, op s
}
}
// MutateOverNetwork checks which instance should be running the mutations
// MutateOverNetwork checks which group should be running the mutations
// according to fingerprint of the predicate and sends it to that instance.
func MutateOverNetwork(ctx context.Context, m Mutations) (left Mutations, rerr error) {
func MutateOverNetwork(ctx context.Context, m Mutations) (rerr error) {
mutationArray := make([]*Mutations, ws.numGroups)
addToMutationArray(mutationArray, m.Set, set)
addToMutationArray(mutationArray, m.Del, del)
replies := make(chan *Payload, ws.numGroups)
errors := make(chan error, ws.numGroups)
count := 0
for idx, mu := range mutationArray {
......@@ -181,7 +161,7 @@ func MutateOverNetwork(ctx context.Context, m Mutations) (left Mutations, rerr e
continue
}
count++
go runMutate(ctx, idx, mu, replies, errors)
go proposeMutation(ctx, idx, mu, errors)
}
// Wait for all the goroutines to reply back.
......@@ -191,23 +171,13 @@ func MutateOverNetwork(ctx context.Context, m Mutations) (left Mutations, rerr e
case err := <-errors:
if err != nil {
x.TraceError(ctx, x.Wrapf(err, "Error while running all mutations"))
return left, err
return err
}
case <-ctx.Done():
return left, ctx.Err()
return ctx.Err()
}
}
close(replies)
close(errors)
// Any mutations which weren't applied are added to left which is returned.
for reply := range replies {
l := new(Mutations)
if err := l.Decode(reply.Data); err != nil {
return left, err
}
left.Set = append(left.Set, l.Set...)
left.Del = append(left.Del, l.Del...)
}
return left, nil
return nil
}
......@@ -145,19 +145,16 @@ func (w *grpcWorker) GetOrAssign(ctx context.Context, query *Payload) (*Payload,
// Mutate is used to apply mutations over the network on other instances.
func (w *grpcWorker) Mutate(ctx context.Context, query *Payload) (*Payload, error) {
m := new(Mutations)
// Ensure that this can be decoded. This is an optional step.
if err := m.Decode(query.Data); err != nil {
return nil, err
return nil, x.Wrapf(err, "While decoding mutation.")
}
left := new(Mutations)
if err := mutate(ctx, m, left); err != nil {
// Propose to the cluster.
// TODO: Figure out the right group to propose this to.
if err := GetNode().raft.Propose(ctx, query.Data); err != nil {
return nil, err
}
reply := new(Payload)
var rerr error
reply.Data, rerr = left.Encode()
return reply, rerr
return &Payload{}, nil
}
// ServeTask is used to respond to a query.
......
......@@ -129,7 +129,7 @@ var Nilbyte []byte
func Trace(ctx context.Context, format string, args ...interface{}) {
if *debugMode {
fmt.Printf(format, args...)
fmt.Printf(format+"\n", args...)
return
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment