Skip to content
Snippets Groups Projects
Unverified commit c0908458, authored by Pawan Rawal
Browse files

Update ClusterSnapshot so that it works on Travis.

parent f2899c72
Branches
No related tags found
No related merge requests found
...@@ -144,6 +144,7 @@ func (l *loader) infinitelyRetry(req api.Mutation) { ...@@ -144,6 +144,7 @@ func (l *loader) infinitelyRetry(req api.Mutation) {
req.IgnoreIndexConflict = opt.ignoreIndexConflict req.IgnoreIndexConflict = opt.ignoreIndexConflict
_, err := txn.Mutate(l.opts.Ctx, &req) _, err := txn.Mutate(l.opts.Ctx, &req)
if err == nil { if err == nil {
atomic.AddUint64(&l.rdfs, uint64(len(req.Set)))
atomic.AddUint64(&l.txns, 1) atomic.AddUint64(&l.txns, 1)
return return
} }
...@@ -160,6 +161,7 @@ func (l *loader) request(req api.Mutation) { ...@@ -160,6 +161,7 @@ func (l *loader) request(req api.Mutation) {
_, err := txn.Mutate(l.opts.Ctx, &req) _, err := txn.Mutate(l.opts.Ctx, &req)
if err == nil { if err == nil {
atomic.AddUint64(&l.rdfs, uint64(len(req.Set)))
atomic.AddUint64(&l.txns, 1) atomic.AddUint64(&l.txns, 1)
return return
} }
......
...@@ -34,7 +34,6 @@ import ( ...@@ -34,7 +34,6 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
"time" "time"
"google.golang.org/grpc" "google.golang.org/grpc"
...@@ -216,14 +215,12 @@ func (l *loader) processFile(ctx context.Context, file string) error { ...@@ -216,14 +215,12 @@ func (l *loader) processFile(ctx context.Context, file string) error {
if batchSize >= opt.numRdf { if batchSize >= opt.numRdf {
l.reqs <- mu l.reqs <- mu
atomic.AddUint64(&l.rdfs, uint64(batchSize))
batchSize = 0 batchSize = 0
mu = api.Mutation{} mu = api.Mutation{}
} }
} }
if batchSize > 0 { if batchSize > 0 {
l.reqs <- mu l.reqs <- mu
atomic.AddUint64(&l.rdfs, uint64(batchSize))
mu = api.Mutation{} mu = api.Mutation{}
} }
return nil return nil
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log"
"net/http" "net/http"
"os" "os"
"os/exec" "os/exec"
...@@ -42,7 +43,7 @@ func waitForConvergence(t *testing.T, c *DgraphCluster) { ...@@ -42,7 +43,7 @@ func waitForConvergence(t *testing.T, c *DgraphCluster) {
var s State var s State
require.NoError(t, json.Unmarshal(b, &s)) require.NoError(t, json.Unmarshal(b, &s))
members := s.Groups["1"].Members members := s.Groups["1"].Members
if len(members) == 2 && (members["1"].Leader || members["2"].Leader) { if members["1"].Leader || members["2"].Leader {
break break
} }
...@@ -100,6 +101,7 @@ func matchExportCount(opts matchExport) error { ...@@ -100,6 +101,7 @@ func matchExportCount(opts matchExport) error {
if count != strconv.Itoa(opts.expectedSchema) { if count != strconv.Itoa(opts.expectedSchema) {
return x.Errorf("Schema export count mismatch. Got: %s", count) return x.Errorf("Schema export count mismatch. Got: %s", count)
} }
x.Println("Export count matched.")
return nil return nil
} }
...@@ -124,6 +126,20 @@ func waitForNodeToBeHealthy(t *testing.T, port int) { ...@@ -124,6 +126,20 @@ func waitForNodeToBeHealthy(t *testing.T, port int) {
} }
} }
// restart stops the process behind cmd with SIGINT, waits for it to exit and
// then starts the same command again.
//
// It returns an error if cmd has no process to stop, if the signal cannot be
// delivered, if waiting for the process fails, or if the command cannot be
// started again.
func restart(cmd *exec.Cmd) error {
	// Guard against a command that was never started; dereferencing a nil
	// Process below would panic instead of reporting a usable error.
	if cmd == nil || cmd.Process == nil {
		return x.Errorf("Cannot restart Dgraph server: process was never started")
	}
	// Surface signal delivery failures instead of blocking in Wait on a
	// process that never received the interrupt.
	if err := cmd.Process.Signal(syscall.SIGINT); err != nil {
		return x.Errorf("Error while sending SIGINT to Dgraph process: %v", err)
	}
	if _, err := cmd.Process.Wait(); err != nil {
		return x.Errorf("Error while waiting for Dgraph process to be killed: %v", err)
	}
	// Clear the old process handle so that Start does not reject the command
	// as already started.
	cmd.Process = nil
	fmt.Println("Trying to restart Dgraph Server")
	if err := cmd.Start(); err != nil {
		return x.Errorf("Couldn't start Dgraph server again: %v\n", err)
	}
	return nil
}
func TestClusterSnapshot(t *testing.T) { func TestClusterSnapshot(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skipping test in short mode") t.Skip("skipping test in short mode")
...@@ -153,6 +169,26 @@ func TestClusterSnapshot(t *testing.T) { ...@@ -153,6 +169,26 @@ func TestClusterSnapshot(t *testing.T) {
t.Fatalf("Live Loader didn't run: %v\n", err) t.Fatalf("Live Loader didn't run: %v\n", err)
} }
// So that snapshot happens and everything is persisted to disk.
if err := restart(cluster.dgraph); err != nil {
// shutdownCluster()
log.Fatal(err)
}
waitForNodeToBeHealthy(t, cluster.dgraphPortOffset+x.PortHTTP)
waitForConvergence(t, cluster)
// TODO(pawan) - Investigate why the test fails if we remove this export.
// The second export has less RDFs than it should if we don't do this export.
err = matchExportCount(matchExport{
expectedRDF: 2e5,
expectedSchema: 10,
dir: cluster.dir,
port: cluster.dgraphPortOffset + x.PortHTTP,
})
if err != nil {
// shutdownCluster()
t.Fatal(err)
}
// Start another Dgraph node. // Start another Dgraph node.
var dgraphDir = filepath.Join(tmpDir, "dgraph_2") var dgraphDir = filepath.Join(tmpDir, "dgraph_2")
n, err := cluster.AddNode(dgraphDir) n, err := cluster.AddNode(dgraphDir)
...@@ -180,18 +216,13 @@ func TestClusterSnapshot(t *testing.T) { ...@@ -180,18 +216,13 @@ func TestClusterSnapshot(t *testing.T) {
waitForNodeToBeHealthy(t, o+x.PortHTTP) waitForNodeToBeHealthy(t, o+x.PortHTTP)
cluster.dgraph.Process.Signal(syscall.SIGINT) // So that n becomes leader.
if _, err = cluster.dgraph.Process.Wait(); err != nil { if err := restart(cluster.dgraph); err != nil {
shutdownCluster()
t.Fatalf("Error while waiting for Dgraph process to be killed: %v", err)
}
cluster.dgraph.Process = nil
fmt.Println("Trying to restart Dgraph Server")
if err := cluster.dgraph.Start(); err != nil {
shutdownCluster() shutdownCluster()
t.Fatalf("Couldn't start Dgraph server again: %v\n", err) log.Fatal(err)
} }
// A better method would be to have a transfer leadership method.
time.Sleep(5 * time.Second)
waitForNodeToBeHealthy(t, cluster.dgraphPortOffset+x.PortHTTP) waitForNodeToBeHealthy(t, cluster.dgraphPortOffset+x.PortHTTP)
waitForNodeToBeHealthy(t, o+x.PortHTTP) waitForNodeToBeHealthy(t, o+x.PortHTTP)
......
...@@ -25,6 +25,7 @@ import ( ...@@ -25,6 +25,7 @@ import (
"math/rand" "math/rand"
"os" "os"
"path" "path"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
...@@ -201,10 +202,16 @@ func export(bdir string, readTs uint64) error { ...@@ -201,10 +202,16 @@ func export(bdir string, readTs uint64) error {
return err return err
} }
gid := groups().groupId() gid := groups().groupId()
fpath := path.Join(bdir, fmt.Sprintf("dgraph-%d-%s.rdf.gz", gid, fpath, err := filepath.Abs(path.Join(bdir, fmt.Sprintf("dgraph-%d-%s.rdf.gz", gid,
time.Now().Format("2006-01-02-15-04"))) time.Now().Format("2006-01-02-15-04"))))
fspath := path.Join(bdir, fmt.Sprintf("dgraph-%d-%s.schema.gz", gid, if err != nil {
time.Now().Format("2006-01-02-15-04"))) return err
}
fspath, err := filepath.Abs(path.Join(bdir, fmt.Sprintf("dgraph-%d-%s.schema.gz", gid,
time.Now().Format("2006-01-02-15-04"))))
if err != nil {
return err
}
x.Printf("Exporting to: %v, schema at %v\n", fpath, fspath) x.Printf("Exporting to: %v, schema at %v\n", fpath, fspath)
chb := make(chan []byte, 1000) chb := make(chan []byte, 1000)
errChan := make(chan error, 2) errChan := make(chan error, 2)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment