package worker
import (
"bufio"
"bytes"
"compress/gzip"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/require"
)
func populateGraphBackup(t *testing.T) {
	// Seed the store with a small fixed graph: four "friend" edges
	// (entities 1-4, all pointing at uid 5) and two "name" edges
	// (entities 1-2, carrying the literal value "photon").
	edge := x.DirectedEdge{
		ValueId:   5,
		Source:    "author0",
		Timestamp: time.Now(),
		Attribute: "friend",
	}
	for _, uid := range []uint64{1, 2, 3, 4} {
		edge.Entity = uid
		addEdge(t, edge, getOrCreate(posting.Key(uid, "friend")))
	}

	// Switch the edge over to a value edge for the "name" predicate.
	edge.ValueId = 0
	edge.Value = []byte("photon")
	edge.Attribute = "name"
	for _, uid := range []uint64{1, 2} {
		edge.Entity = uid
		addEdge(t, edge, getOrCreate(posting.Key(uid, "name")))
	}
}
// initTestBackup parses the given schema, spins up a fresh store in a
// temporary directory, populates it with the fixture graph, and returns
// the directory path plus the open store. Callers own cleanup of both.
func initTestBackup(t *testing.T, schemaStr string) (string, *store.Store) {
	schema.ParseBytes([]byte(schemaStr))
	ParseGroupConfig("groups.conf")

	tmpDir, err := ioutil.TempDir("", "storetest_")
	require.NoError(t, err)

	storeDB, err := store.NewStore(tmpDir)
	require.NoError(t, err)

	posting.Init(storeDB)
	Init(storeDB)
	populateGraphBackup(t)

	// Give the indexing goroutine time to drain jobs from its channel.
	time.Sleep(200 * time.Millisecond)
	return tmpDir, storeDB
}
func TestBackup(t *testing.T) {
	// Index the name predicate. We ensure it doesn't show up on backup.
	dir, ps := initTestBackup(t, "scalar name:string @index")
	defer os.RemoveAll(dir)
	defer ps.Close()
	// Remove any already existing backup folders.
	bdir := "backup_test"
	os.RemoveAll(bdir)
	defer os.RemoveAll(bdir)

	posting.MergeLists(10)

	// We have 4 friend type edges. FP("friends")%10 = 2.
	require.NoError(t, backup(BelongsTo("friend"), bdir))
	// We have 2 name type edges (with index). FP("name")%10 = 7.
	require.NoError(t, backup(BelongsTo("name"), bdir))

	// Collect every backup file written under bdir.
	var fileList []string
	err := filepath.Walk(bdir, func(path string, f os.FileInfo, err error) error {
		// Propagate walk errors instead of silently skipping entries
		// (the original callback ignored the incoming err).
		if err != nil {
			return err
		}
		if path != bdir {
			fileList = append(fileList, path)
		}
		return nil
	})
	require.NoError(t, err)

	var counts []int
	for _, file := range fileList {
		counts = append(counts, checkBackupFile(t, file))
	}
	// This order is preserved due to file naming.
	require.Equal(t, []int{4, 2}, counts)
}

// checkBackupFile decompresses one gzipped RDF backup file, validates every
// N-Quad in it against the fixture graph, and returns the number of quads
// read. Running per file lets defer close the handles promptly (the original
// loop leaked both the file and the gzip reader).
func checkBackupFile(t *testing.T, file string) int {
	f, err := os.Open(file)
	require.NoError(t, err)
	defer f.Close()

	r, err := gzip.NewReader(f)
	require.NoError(t, err)
	defer r.Close()

	scanner := bufio.NewScanner(r)
	count := 0
	for scanner.Scan() {
		nq, err := rdf.Parse(scanner.Text())
		require.NoError(t, err)
		// Subject should have uid 1/2/3/4.
		require.Contains(t, []string{"_uid_:1", "_uid_:2", "_uid_:3", "_uid_:4"}, nq.Subject)
		// The only value we set was "photon".
		if !bytes.Equal(nq.ObjectValue, nil) {
			require.Equal(t, []byte("photon"), nq.ObjectValue)
		}
		// The only objectId we set was uid 5.
		if nq.ObjectId != "" {
			require.Equal(t, "_uid_:5", nq.ObjectId)
		}
		count++
	}
	require.NoError(t, scanner.Err())
	return count
}