Commit 1cd36e1b authored by Manish R Jain

RAFT: Support node restarts.

- Implement a persistent WAL based on RocksDB. We use RocksDB because,
given a RAFT entry at index i, all entries that exist after i need to be
deleted, and a sorted key-value store makes that easy to do with a prefix
scan (see the key-layout sketch after this commit message).

- Also, each server can run multiple RAFT groups, and we want to store
them all in one persistent storage.

- Batch all messages to be sent to other nodes, with a small timeout of
10ms. Ignore nodes that are down and can't receive messages, as well as any
other send errors; RAFT will automatically retry.

- Remove code from the cluster and commit packages.
parent 88f085c0
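The truncate-after-index requirement above is what makes a sorted key-value store a good fit: if entry keys embed (group, term, index) in big-endian order, then "all entries after index i" is a contiguous, prefix-bounded key range that can be dropped with one iterator scan and one write batch. Below is a minimal sketch of that key layout; the package and identifier names are illustrative only, but the layout mirrors the raftwal package added later in this commit.

// Sketch of the WAL key layout described in the commit message (illustrative,
// not the actual dgraph code).
package raftwalsketch

import "encoding/binary"

// entryKey orders WAL entries by (node id, group id, term, index).
// Big-endian encoding makes lexicographic key order match numeric order,
// so every entry written after a given (term, index) sorts after it.
func entryKey(nodeID uint64, gid uint32, term, idx uint64) []byte {
    b := make([]byte, 28)
    binary.BigEndian.PutUint64(b[0:8], nodeID) // one store holds the WALs of every group on this server
    binary.BigEndian.PutUint32(b[8:12], gid)
    binary.BigEndian.PutUint64(b[12:20], term)
    binary.BigEndian.PutUint64(b[20:28], idx)
    return b
}

// Truncating the log after the last written entry (term t, index i) then
// amounts to seeking an iterator to entryKey(nodeID, gid, t, i+1) and deleting
// every following key that still carries the (nodeID, gid) prefix, all inside
// one write batch -- which is what Wal.Store in the raftwal package below does.
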
/*
* Copyright 2016 Dgraph Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
This package handles retrieving the list of predicates that a node serves
and sharing them with other nodes in the cluster.
A RAFT backed key-value store will maintain a globally consistent
mapping from a given predicate to the information of the node
that serves that predicate.
*/
package cluster
/*
* Copyright 2016 DGraph Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package cluster
import (
"bytes"
"log"
"github.com/dgraph-io/dgraph/store"
)
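// getPredicate extracts the predicate name from a posting list key: the key
// begins with the predicate followed by a '|' separator, which is stripped.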
func getPredicate(b []byte) string {
buf := bytes.NewBuffer(b)
a, err := buf.ReadString('|')
if err != nil {
log.Fatalf("Error retrieving predicates. Byte: %v", b)
}
str := string(a[:len(a)-1]) // omit the trailing '|'
return str
}
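// GetPredicateList returns the distinct predicates present in the store.
// RocksDB iterates keys in sorted order and every key starts with its
// predicate, so equal predicates are adjacent and comparing against the
// previously seen predicate is enough to deduplicate.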
func GetPredicateList(ps *store.Store) []string {
var predicateList []string
var lastPredicate, predicate string
it := ps.NewIterator()
for it.SeekToFirst(); it.Valid(); it.Next() {
predicate = getPredicate(it.Key().Data())
if predicate != lastPredicate {
predicateList = append(predicateList, predicate)
lastPredicate = predicate
}
}
return predicateList
}
/*
* Copyright 2016 DGraph Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package cluster
import (
"bytes"
"encoding/binary"
"io/ioutil"
"os"
"testing"
"github.com/Sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/store"
)
func TestgetPredicate(t *testing.T) {
buf := bytes.NewBufferString("friends")
require.NoError(t, binary.Write(buf, binary.LittleEndian, 12345))
require.EqualValues(t, getPredicate(buf.Bytes()), "friends")
}
func TestGetPredicateList(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
dir1, err := ioutil.TempDir("", "dir_")
require.NoError(t, err)
defer os.RemoveAll(dir1)
ps1, err := store.NewStore(dir1)
require.NoError(t, err)
defer ps1.Close()
k1 := posting.Key(1000, "friend")
k2 := posting.Key(1010, "friend")
k3 := posting.Key(1020, "friend")
k4 := posting.Key(1030, "follow")
k5 := posting.Key(1040, "follow")
ps1.SetOne(k1, []byte("alice"))
ps1.SetOne(k2, []byte("bob"))
ps1.SetOne(k3, []byte("ram"))
ps1.SetOne(k4, []byte("ash"))
ps1.SetOne(k5, []byte("mallory"))
require.Equal(t, GetPredicateList(ps1), []string{"follow", "friend"})
}
package cluster
type shardinfo struct {
ip string
}
......@@ -41,6 +41,7 @@ import (
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/query/graph"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/store"
......@@ -52,13 +53,12 @@ import (
)
var (
postingDir = flag.String("p", "p", "Directory to store posting lists")
mutationDir = flag.String("m", "m", "Directory to store mutations")
port = flag.Int("port", 8080, "Port to run server on.")
numcpu = flag.Int("cores", runtime.NumCPU(),
postingDir = flag.String("p", "p", "Directory to store posting lists.")
walDir = flag.String("w", "w", "Directory to store raft write-ahead logs.")
port = flag.Int("port", 8080, "Port to run server on.")
numcpu = flag.Int("cores", runtime.NumCPU(),
"Number of cores to be used by the process")
raftId = flag.Uint64("idx", 1, "RAFT ID that this server will use to join RAFT groups.")
cluster = flag.String("cluster", "", "List of peers in this format: ID1:URL1,ID2:URL2,...")
peer = flag.String("peer", "", "Address of any peer.")
workerPort = flag.String("workerport", ":12345",
"Port used by worker for internal communication.")
......@@ -68,11 +68,10 @@ var (
cpuprofile = flag.String("cpu", "", "write cpu profile to file")
memprofile = flag.String("mem", "", "write memory profile to file")
schemaFile = flag.String("schema", "", "Path to schema file")
rdbStats = flag.Duration("rdbstats", 5*time.Minute, "Print out RocksDB stats every this many seconds. If <=0, we don't print anyting.")
groupConf = flag.String("conf", "", "Path to config file with group <-> predicate mapping.")
closeCh = make(chan struct{})
groupId uint64 = 0 // ALL
rdbStats = flag.Duration("rdbstats", 5*time.Minute,
"Print out RocksDB stats every this many seconds. If <=0, we don't print anyting.")
groupConf = flag.String("conf", "", "Path to config file with group <-> predicate mapping.")
closeCh = make(chan struct{})
)
type mutationResult struct {
......@@ -564,9 +563,9 @@ func checkFlagsAndInitDirs() {
if err != nil {
log.Fatalf("Error while creating the filepath for postings: %v", err)
}
err = os.MkdirAll(*mutationDir, 0700)
err = os.MkdirAll(*walDir, 0700)
if err != nil {
log.Fatalf("Error while creating the filepath for mutations: %v", err)
log.Fatalf("Error while creating the filepath for wal: %v", err)
}
}
......@@ -584,7 +583,7 @@ func serveHTTP(l net.Listener) {
}
}
func setupServer() {
func setupServer(che chan error) {
go worker.RunServer(*workerPort) // For internal communication.
l, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
......@@ -615,10 +614,7 @@ func setupServer() {
log.Println("Server listening on port", *port)
// Start cmux serving.
if err := tcpm.Serve(); !strings.Contains(err.Error(),
"use of closed network connection") {
log.Fatal(err)
}
che <- tcpm.Serve()
}
func printStats(ps *store.Store) {
......@@ -639,6 +635,11 @@ func main() {
x.Checkf(err, "Error initializing postings store")
defer ps.Close()
wals, err := store.NewSyncStore(*walDir)
x.Checkf(err, "Error initializing wal store")
defer wals.Close()
wal := raftwal.Init(wals, *raftId)
posting.InitIndex(ps)
posting.Init()
printStats(ps)
......@@ -647,14 +648,6 @@ func main() {
worker.SetState(ps)
uid.Init(ps)
my := "localhost" + *workerPort
// TODO: Clean up the RAFT group creation code.
// First initiate the common group across the entire cluster. This group
// stores information about which server serves which groups.
go worker.StartRaftNodes(*raftId, my, *cluster, *peer)
if len(*schemaFile) > 0 {
err = schema.Parse(*schemaFile)
if err != nil {
......@@ -662,5 +655,17 @@ func main() {
}
}
// Setup external communication.
setupServer()
che := make(chan error, 1)
go setupServer(che)
// TODO: Clean up the RAFT group creation code.
// Initiate the common group across the entire cluster. This group
// stores information about which server serves which groups.
my := "localhost" + *workerPort
go worker.StartRaftNodes(wal, *raftId, my, *peer)
if err := <-che; !strings.Contains(err.Error(),
"use of closed network connection") {
log.Fatal(err)
}
}
/*
* Copyright 2015 DGraph Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package commit provides commit logs for storing mutations, as they arrive
// at the server. Mutations also get stored in memory within posting.List.
// So, commit logs are useful to handle machine crashes, and re-init of a
// posting list.
// This package provides functionality to write to a rotating log, and a way
// to quickly filter relevant entries corresponding to an attribute.
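//
// A rough usage sketch (hypothetical caller; names and values are
// illustrative only):
//
//	l := NewLogger(dir, "dgraph", 50<<20) // rotate the current file at ~50 MB
//	l.SyncEvery = 1000                    // or set SyncDur for periodic fsyncs
//	l.Init()
//	defer l.Close()
//	ts, _ := l.AddLog(hash, value) // ts is the timestamp assigned to the record
//	_ = l.StreamEntries(ts, hash, func(hdr Header, rec []byte) {
//		// replay rec, e.g. to rebuild a posting list
//	})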
package commit
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/trace"
)
type logFile struct {
sync.RWMutex
endTs int64 // never modified after creation.
path string
size int64
}
type curFile struct {
sync.RWMutex
f *os.File
size int64
dirtyLogs int
}
func (c *curFile) Size() int64 {
c.RLock()
defer c.RUnlock()
return c.size
}
type ByTimestamp []*logFile
func (b ByTimestamp) Len() int { return len(b) }
func (b ByTimestamp) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b ByTimestamp) Less(i, j int) bool {
return b[i].endTs < b[j].endTs
}
type Logger struct {
// Directory to store logs into.
dir string
// Prefix all filenames with this.
filePrefix string
// MaxSize is the maximum size of commit log file in bytes,
// before it gets rotated.
maxSize int64
// Sync every N logs. A value of zero or less would mean
// sync every append to file.
SyncEvery int
// Sync every d duration.
SyncDur time.Duration
sync.RWMutex
list []*logFile
cf *curFile
lastLogTs int64 // handled via atomics.
ticker *time.Ticker
events trace.EventLog
}
func (l *Logger) curFile() *curFile {
l.RLock()
defer l.RUnlock()
return l.cf
}
func (l *Logger) updateLastLogTs(val int64) {
for {
prev := atomic.LoadInt64(&l.lastLogTs)
if val <= prev {
return
}
if atomic.CompareAndSwapInt64(&l.lastLogTs, prev, val) {
return
}
}
}
func (l *Logger) periodicSync() {
if l.SyncDur == 0 {
l.events.Printf("No Periodic Sync for commit log.")
return
}
l.events.Printf("Periodic Sync at duration: %v", l.SyncDur)
l.ticker = time.NewTicker(l.SyncDur)
for _ = range l.ticker.C {
cf := l.curFile()
if cf == nil {
continue
}
{
cf.Lock()
if cf.dirtyLogs > 0 {
if err := cf.f.Sync(); err != nil {
l.events.Errorf("While periodically syncing: %v", err)
} else {
cf.dirtyLogs = 0
l.events.Printf("Successful periodic sync.")
}
} else {
l.events.Printf("Skipping periodic sync.")
}
cf.Unlock()
}
}
}
func (l *Logger) Close() {
l.Lock()
defer l.Unlock()
if l.ticker != nil {
l.ticker.Stop()
}
if l.cf != nil {
if err := l.cf.f.Close(); err != nil {
l.events.Errorf("Error while closing current file: %v", err)
}
l.cf = nil
}
l.events.Finish()
}
func NewLogger(dir string, fileprefix string, maxSize int64) *Logger {
l := new(Logger)
l.dir = dir
l.filePrefix = fileprefix
l.maxSize = maxSize
l.events = trace.NewEventLog("commit", "Logger")
return l
}
// A mutex lock should have already been acquired to call this function.
func (l *Logger) handleFile(path string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}
if !strings.HasPrefix(info.Name(), l.filePrefix+"-") {
return nil
}
if !strings.HasSuffix(info.Name(), ".log") {
return nil
}
lidx := strings.LastIndex(info.Name(), ".log")
tstring := info.Name()[len(l.filePrefix)+1 : lidx]
l.events.Printf("Found log with ts: %v", tstring)
// Handle if we find the current log file.
if tstring == "current" {
return nil
}
ts, err := strconv.ParseInt(tstring, 16, 64)
if err != nil {
return err
}
lf := new(logFile)
lf.endTs = ts
lf.path = path
l.list = append(l.list, lf)
l.updateLastLogTs(lf.endTs)
return nil
}
func (l *Logger) Init() {
os.MkdirAll(l.dir, 0700)
// Check the directory exists.
if fi, err := os.Stat(l.dir); err != nil || !fi.IsDir() {
log.Fatalf("Dir %q not found", l.dir)
}
l.Lock()
defer l.Unlock()
l.events.Printf("Logger init started")
{
// First check if we have a current file.
path := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
fi, err := os.Stat(path)
if err == nil {
// we have the file. Derive information for counters.
l.cf = new(curFile)
l.cf.size = fi.Size()
l.cf.dirtyLogs = 0
lastTs, err := lastTimestamp(path)
if err != nil {
log.Fatalf("Unable to read last log ts: %v", err)
}
l.updateLastLogTs(lastTs)
// Open file for append.
l.cf.f, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY,
os.FileMode(0644))
if err != nil {
log.Fatalf("Unable to open current file in append mode: %v", err)
}
}
}
if err := filepath.Walk(l.dir, l.handleFile); err != nil {
log.Fatalf("While walking over directory: %v", err)
}
sort.Sort(ByTimestamp(l.list))
if l.cf == nil {
l.createNew()
}
go l.periodicSync()
l.events.Printf("Logger init finished.")
}
func (l *Logger) filepath(ts int64) string {
return fmt.Sprintf("%s-%s.log", l.filePrefix, strconv.FormatInt(ts, 16))
}
type Header struct {
ts int64
hash uint32
size int32
}
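// On disk, each record is a 16-byte little-endian header -- ts (int64),
// hash (uint32), size (int32) -- followed by size bytes of value.
// parseHeader decodes one such header.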
func parseHeader(hdr []byte) (Header, error) {
buf := bytes.NewBuffer(hdr)
var h Header
var err error
setError(&err, binary.Read(buf, binary.LittleEndian, &h.ts))
setError(&err, binary.Read(buf, binary.LittleEndian, &h.hash))
setError(&err, binary.Read(buf, binary.LittleEndian, &h.size))
if err != nil {
return h, err
}
return h, nil
}
func lastTimestamp(path string) (int64, error) {
f, err := os.Open(path)
if err != nil {
return 0, err
}
defer f.Close()
discard := make([]byte, 4096)
reader := bufio.NewReaderSize(f, 2<<20)
var maxTs int64
header := make([]byte, 16)
count := 0
for {
n, err := reader.Read(header)
if err == io.EOF {
break
}
if n < len(header) {
log.Fatalf("Unable to read full 16 byte header. Read %v", n)
}
if err != nil {
return 0, err
}
count++
h, err := parseHeader(header)
if err != nil {
return 0, err
}
if h.ts > maxTs {
maxTs = h.ts
} else if h.ts < maxTs {
log.Fatalf("Log file doesn't have monotonically increasing records."+
" ts: %v. maxts: %v. numrecords: %v", h.ts, maxTs, count)
}
for int(h.size) > cap(discard) {
discard = make([]byte, cap(discard)*2)
}
reader.Read(discard[:int(h.size)])
}
return maxTs, nil
}
func (l *Logger) rotateCurrent() error {
l.Lock()
defer l.Unlock()
cf := l.cf
cf.Lock()
defer cf.Unlock()
if len(l.list) > 0 {
last := l.list[len(l.list)-1]
if last.endTs > atomic.LoadInt64(&l.lastLogTs) {
return fmt.Errorf("Maxtimestamp is lower than existing commit logs.")
}
}
lastTs := atomic.LoadInt64(&l.lastLogTs)
newpath := filepath.Join(l.dir, l.filepath(lastTs))
if err := cf.f.Close(); err != nil {
return err
}
if err := os.Rename(cf.f.Name(), newpath); err != nil {
l.events.Errorf("Error while renaming: %v", err)
return err
}
lf := new(logFile)
lf.endTs = lastTs
lf.path = newpath
lf.size = cf.size
l.list = append(l.list, lf)
l.createNew()
return nil
}
// Expects a lock has already been acquired.
func (l *Logger) createNew() {
path := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
if err := os.MkdirAll(l.dir, 0744); err != nil {
log.Fatalf("Unable to create directory: %v", err)
}
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC,
os.FileMode(0644))
if err != nil {
log.Fatalf("Unable to create a new file: %v", err)
}
l.cf = new(curFile)
l.cf.f = f
}
func setError(prev *error, n error) {
if prev == nil {
prev = &n
return
}
if *prev == nil {
*prev = n
}
}
func (l *Logger) AddLog(hash uint32, value []byte) (int64, error) {
// 16 bytes to write the ts, hash and len(value) later.
lbuf := int64(len(value)) + 16
if l.curFile().Size()+lbuf > l.maxSize {
if err := l.rotateCurrent(); err != nil {
l.events.Errorf("Error while rotating current file out: %v", err)
return 0, err
}
}
cf := l.curFile()
if cf == nil {
log.Fatalf("Current file isn't initialized.")
}
cf.Lock()
defer cf.Unlock()
ts := time.Now().UnixNano()
lts := atomic.LoadInt64(&l.lastLogTs)
if ts < lts {
ts = lts + 1
// We don't have to do CompareAndSwap because we hold the mutex lock.
}
buf := new(bytes.Buffer)
var err error
setError(&err, binary.Write(buf, binary.LittleEndian, ts))
setError(&err, binary.Write(buf, binary.LittleEndian, hash))
setError(&err, binary.Write(buf, binary.LittleEndian, int32(len(value))))
_, nerr := buf.Write(value)
setError(&err, nerr)
if err != nil {
return ts, err
}
if _, err = cf.f.Write(buf.Bytes()); err != nil {
l.events.Errorf("Error while writing to current file: %v", err)
return ts, err
}
cf.dirtyLogs++
cf.size += int64(buf.Len())
l.updateLastLogTs(ts)
if l.SyncEvery <= 0 || cf.dirtyLogs >= l.SyncEvery {
cf.dirtyLogs = 0
l.events.Printf("Syncing file")
return ts, cf.f.Sync()
}
return ts, nil
}
// streamEntriesInFile iterates over the entries in a single log file.
// A hash of zero means iterate over all the entries.
func streamEntriesInFile(path string,
afterTs int64, hash uint32, iter LogIterator) error {
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
discard := make([]byte, 4096)
reader := bufio.NewReaderSize(f, 5<<20)
header := make([]byte, 16)
for {
n, err := reader.Read(header)
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("While reading header: %v", err)
return err
}
if n != len(header) {
log.Fatalf("Unable to read all data. Size: %v, expected: %v", n, len(header))
}
hdr, err := parseHeader(header)
if err != nil {
return err
}
if (hash == 0 || hdr.hash == hash) && hdr.ts >= afterTs {
data := make([]byte, hdr.size)
n, err := reader.Read(data)
if err != nil {
log.Fatalf("While reading data: %v", err)
return err
}
if int32(n) != hdr.size {
log.Fatalf("Unable to read all data. Size: %v, expected: %v", n, hdr.size)
}
iter(hdr, data)
} else {
for int(hdr.size) > cap(discard) {
discard = make([]byte, cap(discard)*2)
}
reader.Read(discard[:int(hdr.size)])
}
}
return nil
}
type LogIterator func(hdr Header, record []byte)
func (l *Logger) StreamEntries(afterTs int64, hash uint32,
iter LogIterator) error {
if atomic.LoadInt64(&l.lastLogTs) < afterTs {
return nil
}
var paths []string
l.RLock()
for _, lf := range l.list {
if afterTs < lf.endTs {
paths = append(paths, lf.path)
}
}
l.RUnlock()
{
cur := filepath.Join(l.dir, fmt.Sprintf("%s-current.log", l.filePrefix))
if _, err := os.Stat(cur); err == nil {
paths = append(paths, cur)
}
}
for _, path := range paths {
if err := streamEntriesInFile(path, afterTs, hash, iter); err != nil {
return err
}
}
return nil
}
/*
* Copyright 2015 DGraph Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package commit
import (
"bytes"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"testing"
"time"
)
func TestHandleFile(t *testing.T) {
dir, err := ioutil.TempDir("", "dgraph-log")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dir)
l := NewLogger(dir, "dgraph", 50<<20)
ts := time.Now().UnixNano()
for i := 0; i < 10; i++ {
fp := filepath.Join(dir, l.filepath(ts+int64(i)))
if err := ioutil.WriteFile(fp, []byte("test calling"),
os.ModeAppend); err != nil {
t.Error(err)
return
}
}
l.Init()
for i, lf := range l.list {
exp := ts + int64(i)
if lf.endTs != exp {
t.Errorf("Expected %v. Got: %v", exp, lf.endTs)
}
}
}
func TestAddLog(t *testing.T) {
dir, err := ioutil.TempDir("", "dgraph-log")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dir)
l := NewLogger(dir, "dgraph", 50<<20)
l.SyncDur = time.Millisecond
l.SyncEvery = 1000 // So, sync after write never gets called.
l.Init()
defer l.Close()
for i := 0; i < 10; i++ {
if _, err := l.AddLog(0, []byte("hey")); err != nil {
t.Error(err)
t.Fail()
}
time.Sleep(500 * time.Microsecond)
}
_, err = lastTimestamp(l.cf.f.Name())
if err != nil {
t.Error(err)
}
}
func TestRotatingLog(t *testing.T) {
dir, err := ioutil.TempDir("", "dgraph-log")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dir)
l := NewLogger(dir, "dgraph", 1024) // 1 kB
l.SyncDur = 0
l.SyncEvery = 0
l.Init()
data := make([]byte, 400)
var ts []int64
for i := 0; i < 9; i++ {
if logts, err := l.AddLog(0, data); err != nil {
t.Error(err)
return
} else {
ts = append(ts, logts)
}
}
// This should have created 4 files of 832 bytes each (header + data), and
// the current file should be of size 416.
if len(l.list) != 4 {
t.Errorf("Expected 4 files. Got: %v", len(l.list))
}
for i, lf := range l.list {
exp := ts[i*2+1]
if lf.endTs != exp {
t.Errorf("Expected end ts: %v. Got: %v", exp, lf.endTs)
}
}
if l.curFile().Size() != 416 {
t.Errorf("Expected size 416. Got: %v", l.curFile().Size())
}
l.Close()
l = nil // Important to avoid re-use later.
// Now, let's test a re-init of logger.
nl := NewLogger(dir, "dgraph", 1024)
nl.Init()
defer nl.Close()
if len(nl.list) != 4 {
t.Errorf("Expected 4 files. Got: %v", len(nl.list))
}
if nl.curFile().Size() != 416 {
t.Errorf("Expected size 416. Got: %v", nl.curFile().Size())
}
secondlast, err := nl.AddLog(0, data)
if err != nil {
t.Error(err)
return
}
if nl.curFile().Size() != 832 {
t.Errorf("Expected size 832. Got: %v", nl.curFile().Size())
}
last, err := nl.AddLog(0, data)
if err != nil {
t.Error(err)
return
}
if len(nl.list) != 5 {
t.Errorf("Expected 5 files. Got: %v", len(nl.list))
}
if nl.list[4].endTs != secondlast {
t.Errorf("Expected ts: %v. Got: %v", secondlast, nl.list[4].endTs)
}
if nl.curFile().Size() != 416 {
t.Errorf("Expected size 416. Got: %v", nl.curFile().Size())
}
if nl.lastLogTs != last {
t.Errorf("Expected last log ts: %v. Got: %v", last, nl.lastLogTs)
}
}
func TestReadEntries(t *testing.T) {
dir, err := ioutil.TempDir("", "dgraph-log")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dir)
l := NewLogger(dir, "dgraph", 1024) // 1 kB
l.SyncDur = 0
l.SyncEvery = 0
l.Init()
defer l.Close()
data := make([]byte, 400)
var ts []int64
for i := 0; i < 9; i++ {
if lts, err := l.AddLog(uint32(i%3), data); err != nil {
t.Error(err)
return
} else {
ts = append(ts, lts)
}
}
// This should have created 4 files of 832 bytes each (header + data), and
// the current file should be of size 416.
if len(l.list) != 4 {
t.Errorf("Expected 4 files. Got: %v", len(l.list))
}
for i, lf := range l.list {
exp := ts[i*2+1]
if lf.endTs != exp {
t.Errorf("Expected end ts: %v. Got: %v", exp, lf.endTs)
}
}
if l.curFile().Size() != 416 {
t.Errorf("Expected size 416. Got: %v", l.curFile().Size())
}
if l.lastLogTs != ts[8] {
t.Errorf("Expected ts: %v. Got: %v", ts[8], l.lastLogTs)
}
{
// Check for hash = 1, ts >= 2.
count := 0
err := l.StreamEntries(ts[2], uint32(1), func(hdr Header, entry []byte) {
count++
if bytes.Compare(data, entry) != 0 {
t.Error("Data doesn't equate.")
}
})
if err != nil {
t.Error(err)
}
if count != 2 {
t.Errorf("Expected 2 entries. Got: %v", count)
}
}
{
// Add another entry for hash = 1.
if _, err := l.AddLog(1, data); err != nil {
t.Error(err)
}
// Check for hash = 1, ts >= 0.
count := 0
err := l.StreamEntries(ts[0], uint32(1), func(hdr Header, entry []byte) {
count++
if bytes.Compare(data, entry) != 0 {
t.Error("Data doesn't equate.")
}
})
if err != nil {
t.Error(err)
}
if count != 4 {
t.Errorf("Expected 4 entries. Got: %v", count)
}
}
}
func benchmarkAddLog(n int, b *testing.B) {
dir, err := ioutil.TempDir("", "dgraph-log")
if err != nil {
b.Error(err)
return
}
defer os.RemoveAll(dir)
l := NewLogger(dir, "dgraph", 50<<20)
l.SyncEvery = n
l.Init()
data := make([]byte, 100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
end := rand.Intn(50)
if _, err := l.AddLog(0, data[:50+end]); err != nil {
b.Error(err)
}
}
l.Close()
}
func BenchmarkAddLog_SyncEveryRecord(b *testing.B) { benchmarkAddLog(0, b) }
func BenchmarkAddLog_SyncEvery10Records(b *testing.B) { benchmarkAddLog(10, b) }
func BenchmarkAddLog_SyncEvery100Records(b *testing.B) { benchmarkAddLog(100, b) }
func BenchmarkAddLog_SyncEvery1000Records(b *testing.B) { benchmarkAddLog(1000, b) }
package raftwal
import (
"encoding/binary"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/x"
)
type Wal struct {
wals *store.Store
id uint64
}
func Init(walStore *store.Store, id uint64) *Wal {
return &Wal{wals: walStore, id: id}
}
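// Every key is prefixed with the RAFT node id, so a single RocksDB store can
// hold the WALs of all groups this server participates in. Snapshot and
// hardstate keys add an "ss"/"hs" marker plus the group id; entry keys append
// group id, term and index in big-endian order, so entries sort by
// (term, index) and can be walked or truncated with a prefix scan.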
func (w *Wal) snapshotKey(gid uint32) []byte {
b := make([]byte, 14)
binary.BigEndian.PutUint64(b[0:8], w.id)
copy(b[8:10], []byte("ss"))
binary.BigEndian.PutUint32(b[10:14], gid)
return b
}
func (w *Wal) hardStateKey(gid uint32) []byte {
b := make([]byte, 14)
binary.BigEndian.PutUint64(b[0:8], w.id)
copy(b[8:10], []byte("hs"))
binary.BigEndian.PutUint32(b[10:14], gid)
return b
}
func (w *Wal) entryKey(gid uint32, term, idx uint64) []byte {
b := make([]byte, 28)
binary.BigEndian.PutUint64(b[0:8], w.id)
binary.BigEndian.PutUint32(b[8:12], gid)
binary.BigEndian.PutUint64(b[12:20], term)
binary.BigEndian.PutUint64(b[20:28], idx)
return b
}
func (w *Wal) prefix(gid uint32) []byte {
b := make([]byte, 12)
binary.BigEndian.PutUint64(b[0:8], w.id)
binary.BigEndian.PutUint32(b[8:12], gid)
return b
}
// Store stores the snapshot, hardstate and entries for a given RAFT group.
func (w *Wal) Store(gid uint32, s raftpb.Snapshot, h raftpb.HardState, es []raftpb.Entry) error {
b := w.wals.NewWriteBatch()
defer b.Destroy()
if !raft.IsEmptySnap(s) {
data, err := s.Marshal()
if err != nil {
return x.Wrapf(err, "wal.Store: While marshal snapshot")
}
b.Put(w.snapshotKey(gid), data)
}
if !raft.IsEmptyHardState(h) {
data, err := h.Marshal()
if err != nil {
return x.Wrapf(err, "wal.Store: While marshal hardstate")
}
b.Put(w.hardStateKey(gid), data)
}
var t, i uint64
for _, e := range es {
t, i = e.Term, e.Index
data, err := e.Marshal()
if err != nil {
return x.Wrapf(err, "wal.Store: While marshal entry")
}
k := w.entryKey(gid, e.Term, e.Index)
b.Put(k, data)
}
// If we get no entries, then the default value of t and i would be zero. That would
// end up deleting all the previous valid raft entry logs. This check avoids that.
if t > 0 || i > 0 {
// Delete all keys above this index.
start := w.entryKey(gid, t, i+1)
prefix := w.prefix(gid)
itr := w.wals.NewIterator()
for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() {
b.Delete(itr.Key().Data())
}
}
err := w.wals.WriteBatch(b)
return x.Wrapf(err, "wal.Store: While WriteBatch")
}
func (w *Wal) Snapshot(gid uint32) (snap raftpb.Snapshot, rerr error) {
data, err := w.wals.Get(w.snapshotKey(gid))
if err != nil {
return snap, x.Wrapf(err, "While getting snapshot")
}
rerr = x.Wrapf(snap.Unmarshal(data), "While unmarshal snapshot")
return
}
func (w *Wal) HardState(gid uint32) (hd raftpb.HardState, rerr error) {
data, err := w.wals.Get(w.hardStateKey(gid))
if err != nil {
return hd, x.Wrapf(err, "While getting hardstate")
}
rerr = x.Wrapf(hd.Unmarshal(data), "While unmarshal hardstate")
return
}
func (w *Wal) Entries(gid uint32, fromTerm, fromIndex uint64) (es []raftpb.Entry, rerr error) {
start := w.entryKey(gid, fromTerm, fromIndex)
prefix := w.prefix(gid)
itr := w.wals.NewIterator()
for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() {
data := itr.Value().Data()
var e raftpb.Entry
if err := e.Unmarshal(data); err != nil {
return es, x.Wrapf(err, "While unmarshal raftpb.Entry")
}
es = append(es, e)
}
return
}
......@@ -60,10 +60,16 @@ func NewStore(filepath string) (*Store, error) {
s.setOpts()
var err error
s.db, err = rdb.OpenDb(s.opt, filepath)
if err != nil {
return nil, x.Wrap(err)
}
return s, nil
return s, x.Wrap(err)
}
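// NewSyncStore is like NewStore, but every write is synced to disk before it
// returns. The raft write-ahead log uses this so that acknowledged entries
// survive a crash or restart.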
func NewSyncStore(filepath string) (*Store, error) {
s := &Store{}
s.setOpts()
s.wopt.SetSync(true) // Do synchronous writes.
var err error
s.db, err = rdb.OpenDb(s.opt, filepath)
return s, x.Wrap(err)
}
// NewReadOnlyStore constructs a readonly Store object at filepath, given options.
......@@ -72,10 +78,7 @@ func NewReadOnlyStore(filepath string) (*Store, error) {
s.setOpts()
var err error
s.db, err = rdb.OpenDbForReadOnly(s.opt, filepath, false)
if err != nil {
return nil, x.Wrap(err)
}
return s, nil
return s, x.Wrap(err)
}
// Get returns the value given a key for RocksDB.
......
package worker
import (
"bytes"
"encoding/binary"
"fmt"
"log"
......@@ -16,6 +17,7 @@ import (
flatbuffers "github.com/google/flatbuffers/go"
"golang.org/x/net/context"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
)
......@@ -69,16 +71,24 @@ func (p *proposals) Done(pid uint32, err error) {
ch <- err
}
type sendmsg struct {
to uint64
data []byte
}
type node struct {
cfg *raft.Config
ctx context.Context
done chan struct{}
id uint64
gid uint32
peers peerPool
props proposals
raft raft.Node
store *raft.MemoryStorage
raftContext []byte
wal *raftwal.Wal
messages chan sendmsg
}
func (n *node) Connect(pid uint64, addr string) {
......@@ -149,7 +159,6 @@ func (n *node) ProposeAndWait(ctx context.Context, msg uint16, data []byte) erro
select {
case err = <-che:
x.TraceError(ctx, err)
fmt.Printf("DEBUG. Proposeandwait replied with: %v", err)
return err
case <-ctx.Done():
return ctx.Err()
......@@ -158,21 +167,69 @@ func (n *node) ProposeAndWait(ctx context.Context, msg uint16, data []byte) erro
func (n *node) send(m raftpb.Message) {
x.Assertf(n.id != m.To, "Seding message to itself")
data, err := m.Marshal()
x.Check(err)
fmt.Printf("\t\tSENDING: %v %v-->%v\n", m.Type, m.From, m.To)
select {
case n.messages <- sendmsg{to: m.To, data: data}:
// pass
default:
log.Fatalf("Unable to push messages to channel in send")
}
}
addr := n.peers.Get(m.To)
x.Assertf(len(addr) > 0, "Don't have address for peer: %d", m.To)
func (n *node) doSendMessage(to uint64, data []byte) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
addr := n.peers.Get(to)
x.Assertf(len(addr) > 0, "Don't have address for peer: %d", to)
pool := pools().get(addr)
conn, err := pool.Get()
x.Check(err)
defer pool.Put(conn)
c := NewWorkerClient(conn)
m.Context = n.raftContext
data, err := m.Marshal()
x.Checkf(err, "Unable to marshal: %+v", m)
p := &Payload{Data: data}
_, err = c.RaftMessage(context.TODO(), p)
ch := make(chan error, 1)
go func() {
_, err = c.RaftMessage(ctx, p)
ch <- err
}()
select {
case <-ctx.Done():
return
case <-ch:
// We don't need to do anything if we receive any error while sending message.
// RAFT would automatically retry.
return
}
}
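// batchAndSendMessages drains the outgoing message channel, grouping messages
// by destination. Every 10ms each non-empty batch is sent in a single RPC,
// with every message prefixed by its 4-byte little-endian length so the
// receiving RaftMessage handler can split the batch again.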
func (n *node) batchAndSendMessages() {
batches := make(map[uint64]*bytes.Buffer)
for {
select {
case sm := <-n.messages:
if _, ok := batches[sm.to]; !ok {
batches[sm.to] = new(bytes.Buffer)
}
buf := batches[sm.to]
binary.Write(buf, binary.LittleEndian, uint32(len(sm.data)))
buf.Write(sm.data)
case <-time.Tick(10 * time.Millisecond):
for to, buf := range batches {
if buf.Len() == 0 {
continue
}
go n.doSendMessage(to, buf.Bytes())
buf.Reset()
}
}
}
}
func (n *node) processMutation(e raftpb.Entry, h header) error {
......@@ -204,10 +261,9 @@ func (n *node) process(e raftpb.Entry) error {
if e.Data == nil {
return nil
}
fmt.Printf("Entry type to process: %v\n", e.Type.String())
fmt.Printf("Entry type to process: %+v\n", e)
if e.Type == raftpb.EntryConfChange {
fmt.Printf("Configuration change\n")
var cc raftpb.ConfChange
cc.Unmarshal(e.Data)
......@@ -239,18 +295,11 @@ func (n *node) process(e raftpb.Entry) error {
func (n *node) saveToStorage(s raftpb.Snapshot, h raftpb.HardState,
es []raftpb.Entry) {
if !raft.IsEmptySnap(s) {
fmt.Printf("saveToStorage snapshot: %v\n", s.String())
le, err := n.store.LastIndex()
if err != nil {
log.Fatalf("While retrieving last index: %v\n", err)
}
te, err := n.store.Term(le)
if err != nil {
log.Fatalf("While retrieving term: %v\n", err)
}
fmt.Printf("[Node: %d] Term: %v for le: %v\n", n.id, te, le)
if s.Metadata.Index <= le {
fmt.Printf("%d node ignoring snapshot. Last index: %v\n", n.id, le)
return
}
......@@ -268,7 +317,6 @@ func (n *node) saveToStorage(s raftpb.Snapshot, h raftpb.HardState,
func (n *node) processSnapshot(s raftpb.Snapshot) {
lead := n.raft.Status().Lead
if lead == 0 {
fmt.Printf("Don't know who the leader is")
return
}
addr := n.peers.Get(lead)
......@@ -276,31 +324,32 @@ func (n *node) processSnapshot(s raftpb.Snapshot) {
pool := pools().get(addr)
x.Assertf(pool != nil, "Leader: %d pool should not be nil", lead)
fmt.Printf("Getting snapshot from leader: %v", lead)
_, err := ws.PopulateShard(context.TODO(), pool, 0)
x.Checkf(err, "processSnapshot")
fmt.Printf("DONE with snapshot")
}
func (n *node) Run() {
fmt.Println("Run")
for {
select {
case <-time.Tick(time.Second):
n.raft.Tick()
case rd := <-n.raft.Ready():
x.Check(n.wal.Store(n.gid, rd.Snapshot, rd.HardState, rd.Entries))
n.saveToStorage(rd.Snapshot, rd.HardState, rd.Entries)
for _, msg := range rd.Messages {
// TODO: Do some optimizations here to drop messages.
msg.Context = n.raftContext
n.send(msg)
}
if !raft.IsEmptySnap(rd.Snapshot) {
fmt.Printf("Got snapshot: %q\n", rd.Snapshot.Data)
n.processSnapshot(rd.Snapshot)
}
for _, entry := range rd.CommittedEntries {
x.Check(n.process(entry))
}
n.raft.Advance()
case <-n.done:
......@@ -320,9 +369,7 @@ func (n *node) Step(ctx context.Context, msg raftpb.Message) error {
func (n *node) snapshotPeriodically() {
for {
select {
case t := <-time.Tick(time.Minute):
fmt.Printf("Snapshot Periodically: %v", t)
case <-time.Tick(10 * time.Minute):
le, err := n.store.LastIndex()
x.Checkf(err, "Unable to retrieve last index")
......@@ -330,18 +377,14 @@ func (n *node) snapshotPeriodically() {
x.Checkf(err, "Unable to get existing snapshot")
si := existing.Metadata.Index
fmt.Printf("le, si: %v %v\n", le, si)
if le <= si {
fmt.Printf("le, si: %v %v. No snapshot\n", le, si)
continue
}
msg := fmt.Sprintf("Snapshot from %v", strconv.FormatUint(n.id, 10))
_, err = n.store.CreateSnapshot(le, nil, []byte(msg))
x.Checkf(err, "While creating snapshot")
x.Checkf(n.store.Compact(le), "While compacting snapshot")
fmt.Println("Snapshot DONE =================")
case <-n.done:
return
......@@ -358,7 +401,7 @@ func parsePeer(peer string) (uint64, string) {
return pid, kv[1]
}
func (n *node) JoinCluster(any string, s *State) {
func (n *node) joinPeers(any string, s *State) {
// Tell one of the peers to join.
pid, paddr := parsePeer(any)
n.Connect(pid, paddr)
......@@ -399,7 +442,7 @@ func createRaftContext(id uint64, gid uint32, addr string) []byte {
}
func newNode(gid uint32, id uint64, myAddr string) *node {
fmt.Printf("NEW NODE ID: %v\n", id)
fmt.Printf("NEW NODE GID, ID: [%v, %v]\n", gid, id)
peers := peerPool{
peers: make(map[uint64]string),
......@@ -412,10 +455,11 @@ func newNode(gid uint32, id uint64, myAddr string) *node {
n := &node{
ctx: context.TODO(),
id: id,
gid: gid,
store: store,
cfg: &raft.Config{
ID: id,
ElectionTick: 3,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: store,
MaxSizePerMsg: 4096,
......@@ -424,24 +468,77 @@ func newNode(gid uint32, id uint64, myAddr string) *node {
peers: peers,
props: props,
raftContext: createRaftContext(id, gid, myAddr),
messages: make(chan sendmsg, 1000),
}
return n
}
func (n *node) StartNode(cluster string) {
var peers []raft.Peer
if len(cluster) > 0 {
for _, p := range strings.Split(cluster, ",") {
pid, paddr := parsePeer(p)
peers = append(peers, raft.Peer{ID: pid})
n.Connect(pid, paddr)
func (n *node) initFromWal(wal *raftwal.Wal) (restart bool, rerr error) {
n.wal = wal
var sp raftpb.Snapshot
sp, rerr = wal.Snapshot(n.gid)
if rerr != nil {
return
}
var term, idx uint64
if !raft.IsEmptySnap(sp) {
fmt.Printf("Found Snapshot: %+v\n", sp)
restart = true
if rerr = n.store.ApplySnapshot(sp); rerr != nil {
return
}
term = sp.Metadata.Term
idx = sp.Metadata.Index
}
var hd raftpb.HardState
hd, rerr = wal.HardState(n.gid)
if rerr != nil {
return
}
if !raft.IsEmptyHardState(hd) {
fmt.Printf("Found hardstate: %+v\n", sp)
restart = true
if rerr = n.store.SetHardState(hd); rerr != nil {
return
}
}
n.raft = raft.StartNode(n.cfg, peers)
var es []raftpb.Entry
es, rerr = wal.Entries(n.gid, term, idx)
if rerr != nil {
return
}
fmt.Printf("Found %d entries\n", len(es))
if len(es) > 0 {
restart = true
}
rerr = n.store.Append(es)
return
}
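// InitAndStartNode replays whatever WAL state was persisted for this group.
// If a snapshot, hardstate or entries are found, the raft node restarts from
// that state; otherwise it either asks the given peer to let it join an
// existing cluster, or bootstraps a fresh single-node group.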
func (n *node) InitAndStartNode(wal *raftwal.Wal, peer string) {
restart, err := n.initFromWal(wal)
x.Check(err)
if restart {
fmt.Printf("RESTARTING\n")
n.raft = raft.RestartNode(n.cfg)
} else {
if len(peer) > 0 {
n.raft = raft.StartNode(n.cfg, nil)
n.joinPeers(peer, ws)
} else {
peers := []raft.Peer{{ID: n.id}}
n.raft = raft.StartNode(n.cfg, peers)
}
}
go n.Run()
go n.snapshotPeriodically()
go n.Inform()
go n.batchAndSendMessages()
}
func (n *node) doInform(rc *task.RaftContext) {
......@@ -467,6 +564,9 @@ func (n *node) doInform(rc *task.RaftContext) {
data := b.Bytes[b.Head():]
common := groups().Node(math.MaxUint32)
if common == nil {
return
}
x.Checkf(common.ProposeAndWait(context.TODO(), membershipMsg, data),
"Expected acceptance.")
}
......@@ -476,10 +576,9 @@ func (n *node) Inform() {
if rc.Group() == math.MaxUint32 {
return
}
n.doInform(rc)
select {
case <-time.Tick(time.Minute):
case <-time.Tick(30 * time.Second):
n.doInform(rc)
case <-n.done:
return
......@@ -490,16 +589,7 @@ func (n *node) AmLeader() bool {
return n.raft.Status().Lead == n.raft.Status().ID
}
func (w *grpcWorker) RaftMessage(ctx context.Context, query *Payload) (*Payload, error) {
if ctx.Err() != nil {
return &Payload{}, ctx.Err()
}
msg := raftpb.Message{}
if err := msg.Unmarshal(query.Data); err != nil {
return &Payload{}, err
}
func (w *grpcWorker) applyMessage(ctx context.Context, msg raftpb.Message) error {
rc := task.GetRootAsRaftContext(msg.Context, 0)
node := groups().Node(rc.Group())
node.Connect(msg.From, string(rc.Addr()))
......@@ -509,10 +599,32 @@ func (w *grpcWorker) RaftMessage(ctx context.Context, query *Payload) (*Payload,
select {
case <-ctx.Done():
return &Payload{}, ctx.Err()
return ctx.Err()
case err := <-c:
return &Payload{}, err
return err
}
}
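// RaftMessage handles a batch produced by batchAndSendMessages: the payload is
// a sequence of <4-byte little-endian length><marshalled raftpb.Message>
// records, each of which is unmarshalled and applied in order.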
func (w *grpcWorker) RaftMessage(ctx context.Context, query *Payload) (*Payload, error) {
if ctx.Err() != nil {
return &Payload{}, ctx.Err()
}
for idx := 0; idx < len(query.Data); {
len := int(binary.LittleEndian.Uint32(query.Data[idx : idx+4]))
idx += 4
msg := raftpb.Message{}
if err := msg.Unmarshal(query.Data[idx : idx+len]); err != nil {
x.Check(err)
}
fmt.Printf("RECEIVED: %v %v-->%v\n", msg.Type, msg.From, msg.To)
if err := w.applyMessage(ctx, msg); err != nil {
return &Payload{}, err
}
idx += len
}
// fmt.Printf("Got %d messages\n", count)
return &Payload{}, nil
}
func (w *grpcWorker) JoinCluster(ctx context.Context, query *Payload) (*Payload, error) {
......
......@@ -6,6 +6,7 @@ import (
"math/rand"
"sync"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
)
......@@ -39,19 +40,13 @@ func groups() *groupi {
return gr
}
func StartRaftNodes(raftId uint64, my, cluster, peer string) {
node := groups().InitNode(math.MaxUint32, raftId, my)
node.StartNode(cluster)
if len(peer) > 0 {
go node.JoinCluster(peer, ws)
}
func StartRaftNodes(wal *raftwal.Wal, raftId uint64, my, peer string) {
node := groups().newNode(math.MaxUint32, raftId, my)
go node.InitAndStartNode(wal, peer)
// Also create node for group zero, which would handle UID assignment.
node = groups().InitNode(0, raftId, my)
node.StartNode(cluster)
if len(peer) > 0 {
go node.JoinCluster(peer, ws)
}
node = groups().newNode(0, raftId, my)
go node.InitAndStartNode(wal, peer)
}
func (g *groupi) Node(groupId uint32) *node {
......@@ -69,7 +64,7 @@ func (g *groupi) ServesGroup(groupId uint32) bool {
return has
}
func (g *groupi) InitNode(groupId uint32, nodeId uint64, publicAddr string) *node {
func (g *groupi) newNode(groupId uint32, nodeId uint64, publicAddr string) *node {
g.Lock()
defer g.Unlock()
if g.local == nil {
......