Newer
Older
package worker
import (
"bufio"
"compress/gzip"
"io/ioutil"
"os"
"path/filepath"
"github.com/stretchr/testify/require"
"github.com/dgraph-io/dgraph/query/graph"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/store"
Manish R Jain
committed
"github.com/dgraph-io/dgraph/x"
friendFacets := map[string]string{
"since": "2005-05-02T15:04:05", "close": "true", "age": "33"}
edge := &task.DirectedEdge{
ValueId: 5,
Label: "author0",
Attr: "friend",
Manish R Jain
committed
addEdge(t, edge, getOrCreate(x.DataKey("friend", 1)))
Manish R Jain
committed
addEdge(t, edge, getOrCreate(x.DataKey("friend", 2)))
Manish R Jain
committed
addEdge(t, edge, getOrCreate(x.DataKey("friend", 3)))
//Add an edge with facet.
addEdgeToUID(t, "friend", 4, 5, friendFacets)
Manish R Jain
committed
addEdge(t, edge, getOrCreate(x.DataKey("name", 1)))
Manish R Jain
committed
addEdge(t, edge, getOrCreate(x.DataKey("name", 2)))
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// makeFacets converts a key/value map into a slice of facets ordered by key.
//
// Keys are sorted with sort.Strings so the resulting slice is deterministic
// regardless of Go's randomized map iteration order. The type of each facet is
// obtained by passing its string value to facets.ValType, and the value itself
// is stored as the raw bytes of that string.
//
// An empty or nil map yields (nil, nil). If facets.ValType reports an error for
// any value, conversion stops and that error is returned with a nil slice.
func makeFacets(facetKVs map[string]string) (fs []*facets.Facet, err error) {
	if len(facetKVs) == 0 {
		return nil, nil
	}

	allKeys := make([]string, 0, len(facetKVs))
	for k := range facetKVs {
		allKeys = append(allKeys, k)
	}
	sort.Strings(allKeys)

	fs = make([]*facets.Facet, 0, len(allKeys))
	for _, k := range allKeys {
		v := facetKVs[k]
		// Use a distinct name so the named result err is not shadowed.
		typ, verr := facets.ValType(v)
		if verr != nil {
			return nil, verr
		}
		// Keyed fields keep this literal valid if the struct's field order
		// changes; the names are the ones TestBackup reads back.
		fs = append(fs, &facets.Facet{
			Key:     k,
			Value:   []byte(v),
			ValType: typ,
		})
	}
	return fs, nil
}
// addEdgeToUID builds a SET edge from src to dst on predicate attr, attaches
// the facets derived from facetKVs, and passes it to addEdge together with the
// value getOrCreate returns for x.DataKey(attr, src). The test fails
// immediately if the facets cannot be built.
func addEdgeToUID(t *testing.T, attr string, src uint64,
	dst uint64, facetKVs map[string]string) {

	edgeFacets, facetErr := makeFacets(facetKVs)
	require.NoError(t, facetErr)

	var de task.DirectedEdge
	de.Entity = src
	de.Attr = attr
	de.ValueId = dst
	de.Label = "testing"
	de.Op = task.DirectedEdge_SET
	de.Facets = edgeFacets

	key := x.DataKey(attr, src)
	addEdge(t, &de, getOrCreate(key))
}
// initTestBackup sets up the fixtures shared by the backup tests: it parses
// schemaStr, opens a store in a fresh temporary directory, initialises the
// posting layer and this package with that store, and populates the test graph.
//
// It returns the temporary directory and the opened store; the caller is
// responsible for closing the store and removing the directory.
func initTestBackup(t *testing.T, schemaStr string) (string, *store.Store) {
	// NOTE(review): whatever ParseBytes returns is discarded here — confirm
	// whether it reports parse errors that should fail the test.
	schemaBytes := []byte(schemaStr)
	schema.ParseBytes(schemaBytes)

	tmpDir, dirErr := ioutil.TempDir("", "storetest_")
	require.NoError(t, dirErr)

	kv, openErr := store.NewStore(tmpDir)
	require.NoError(t, openErr)

	posting.Init(kv)
	Init(kv)
	populateGraphBackup(t)

	// Let the index process jobs from channel.
	const settle = 200 * time.Millisecond
	time.Sleep(settle)

	return tmpDir, kv
}
func TestBackup(t *testing.T) {
// Index the name predicate. We ensure it doesn't show up on backup.
dir, ps := initTestBackup(t, "scalar name:string @index")
defer os.RemoveAll(dir)
defer ps.Close()
// Remove already existing backup folders, if any.
bdir, err := ioutil.TempDir("", "backup")
require.NoError(t, err)
Manish R Jain
committed
posting.CommitLists(10)
// We have 4 friend type edges. FP("friends")%10 = 2.
err = backup(group.BelongsTo("friend"), bdir)
require.NoError(t, err)
// We have 2 name type edges(with index). FP("name")%10 =7.
err = backup(group.BelongsTo("name"), bdir)
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
require.NoError(t, err)
searchDir := bdir
fileList := []string{}
err = filepath.Walk(searchDir, func(path string, f os.FileInfo, err error) error {
if path != bdir {
fileList = append(fileList, path)
}
return nil
})
require.NoError(t, err)
var counts []int
for _, file := range fileList {
f, err := os.Open(file)
require.NoError(t, err)
r, err := gzip.NewReader(f)
require.NoError(t, err)
scanner := bufio.NewScanner(r)
count := 0
for scanner.Scan() {
nq, err := rdf.Parse(scanner.Text())
require.NoError(t, err)
// Subject should have uid 1/2/3/4.
require.Contains(t, []string{"0x1", "0x2", "0x3", "0x4"}, nq.Subject)
if nq.ObjectValue != nil {
require.Equal(t, &graph.Value{&graph.Value_DefaultVal{"pho\\ton"}},
nq.ObjectValue)
}
// The only objectId we set was uid 5.
if nq.ObjectId != "" {
require.Equal(t, "0x5", nq.ObjectId)
if nq.Subject == "0x4" {
require.Equal(t, "age", nq.Facets[0].Key)
require.Equal(t, "close", nq.Facets[1].Key)
require.Equal(t, "since", nq.Facets[2].Key)
require.Equal(t, "33", string(nq.Facets[0].Value))
require.Equal(t, "true", string(nq.Facets[1].Value))
require.Equal(t, "2005-05-02T15:04:05", string(nq.Facets[2].Value))
require.Equal(t, 1, int(nq.Facets[0].ValType))
require.Equal(t, 3, int(nq.Facets[1].ValType))
require.Equal(t, 4, int(nq.Facets[2].ValType))
}
count++
}
counts = append(counts, count)
require.NoError(t, scanner.Err())
}
// This order will be preserved due to file naming.
require.Equal(t, []int{4, 2}, counts)
}