Skip to content
Snippets Groups Projects
backup_test.go 7.06 KiB
Newer Older
  • Learn to ignore specific revisions
  • Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    package worker
    
    import (
    	"bufio"
    
    Motakjuq's avatar
    Motakjuq committed
    	"bytes"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	"compress/gzip"
    
    Motakjuq's avatar
    Motakjuq committed
    	"encoding/binary"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	"io/ioutil"
    	"os"
    	"path/filepath"
    
    	"sort"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	"testing"
    	"time"
    
    
    	"github.com/stretchr/testify/require"
    
    Motakjuq's avatar
    Motakjuq committed
    	geom "github.com/twpayne/go-geom"
    	"github.com/twpayne/go-geom/encoding/wkb"
    
    	"github.com/dgraph-io/dgraph/group"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	"github.com/dgraph-io/dgraph/posting"
    
    	"github.com/dgraph-io/dgraph/query/graph"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	"github.com/dgraph-io/dgraph/rdf"
    	"github.com/dgraph-io/dgraph/schema"
    	"github.com/dgraph-io/dgraph/store"
    
    	"github.com/dgraph-io/dgraph/task"
    
    Motakjuq's avatar
    Motakjuq committed
    	"github.com/dgraph-io/dgraph/types"
    
    	"github.com/dgraph-io/dgraph/types/facets"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    )
    
    func populateGraphBackup(t *testing.T) {
    
    	friendFacets := map[string]string{
    		"since": "2005-05-02T15:04:05", "close": "true", "age": "33"}
    
    	edge := &task.DirectedEdge{
    		ValueId: 5,
    		Label:   "author0",
    		Attr:    "friend",
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	}
    	edge.Entity = 1
    
    	addEdge(t, edge, getOrCreate(x.DataKey("friend", 1)))
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    
    	edge.Entity = 2
    
    	addEdge(t, edge, getOrCreate(x.DataKey("friend", 2)))
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    
    	edge.Entity = 3
    
    	addEdge(t, edge, getOrCreate(x.DataKey("friend", 3)))
    
    	//Add an edge with facet.
    	addEdgeToUID(t, "friend", 4, 5, friendFacets)
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    
    	edge.Entity = 1
    	edge.ValueId = 0
    
    Pawan Rawal's avatar
    Pawan Rawal committed
    	edge.Value = []byte("pho\\ton")
    
    	edge.Attr = "name"
    
    	addEdge(t, edge, getOrCreate(x.DataKey("name", 1)))
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    
    	edge.Entity = 2
    
    	addEdge(t, edge, getOrCreate(x.DataKey("name", 2)))
    
    // makeFacets turns a key/value map into a slice of facets ordered by key.
    //
    // Each facet's ValType is whatever facets.ValType reports for the value
    // string. An empty or nil map yields (nil, nil). If facets.ValType fails for
    // any value, that error is returned together with a nil slice.
    func makeFacets(facetKVs map[string]string) (fs []*facets.Facet, err error) {
    	if len(facetKVs) == 0 {
    		return nil, nil
    	}

    	// Map iteration order is random; sort the keys so the output is stable.
    	allKeys := make([]string, 0, len(facetKVs))
    	for k := range facetKVs {
    		allKeys = append(allKeys, k)
    	}
    	sort.Strings(allKeys)

    	fs = make([]*facets.Facet, 0, len(allKeys))
    	for _, k := range allKeys {
    		v := facetKVs[k]
    		typ, verr := facets.ValType(v)
    		if verr != nil {
    			return nil, verr
    		}
    		fs = append(fs, &facets.Facet{
    			Key:     k,
    			Value:   []byte(v),
    			ValType: typ,
    		})
    	}
    	return fs, nil
    }
    
    // addEdgeToUID stores a SET edge attr(src) -> dst, labelled "testing", with
    // the facets built from facetKVs. The test fails immediately if the facets
    // cannot be built.
    func addEdgeToUID(t *testing.T, attr string, src uint64,
    	dst uint64, facetKVs map[string]string) {
    	facetList, err := makeFacets(facetKVs)
    	require.NoError(t, err)

    	key := x.DataKey(attr, src)
    	addEdge(t, &task.DirectedEdge{
    		Entity:  src,
    		Attr:    attr,
    		ValueId: dst,
    		Label:   "testing",
    		Op:      task.DirectedEdge_SET,
    		Facets:  facetList,
    	}, getOrCreate(key))
    }
    
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    func initTestBackup(t *testing.T, schemaStr string) (string, *store.Store) {
    	schema.ParseBytes([]byte(schemaStr))
    
    	group.ParseGroupConfig("groups.conf")
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    
    	dir, err := ioutil.TempDir("", "storetest_")
    	require.NoError(t, err)
    
    	ps, err := store.NewStore(dir)
    	require.NoError(t, err)
    
    	posting.Init(ps)
    	Init(ps)
    	populateGraphBackup(t)
    	time.Sleep(200 * time.Millisecond) // Let the index process jobs from channel.
    
    	return dir, ps
    }
    
    func TestBackup(t *testing.T) {
    	// Index the name predicate. We ensure it doesn't show up on backup.
    	dir, ps := initTestBackup(t, "scalar name:string @index")
    	defer os.RemoveAll(dir)
    	defer ps.Close()
    	// Remove already existing backup folders is any.
    
    	bdir, err := ioutil.TempDir("", "backup")
    	require.NoError(t, err)
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	defer os.RemoveAll(bdir)
    
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	time.Sleep(time.Second)
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    
    	// We have 4 friend type edges. FP("friends")%10 = 2.
    
    	err = backup(group.BelongsTo("friend"), bdir)
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	require.NoError(t, err)
    
    	// We have 2 name type edges(with index). FP("name")%10 =7.
    
    	err = backup(group.BelongsTo("name"), bdir)
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	require.NoError(t, err)
    
    	searchDir := bdir
    	fileList := []string{}
    	err = filepath.Walk(searchDir, func(path string, f os.FileInfo, err error) error {
    		if path != bdir {
    			fileList = append(fileList, path)
    		}
    		return nil
    	})
    	require.NoError(t, err)
    
    	var counts []int
    	for _, file := range fileList {
    		f, err := os.Open(file)
    		require.NoError(t, err)
    
    		r, err := gzip.NewReader(f)
    		require.NoError(t, err)
    
    		scanner := bufio.NewScanner(r)
    		count := 0
    		for scanner.Scan() {
    			nq, err := rdf.Parse(scanner.Text())
    			require.NoError(t, err)
    			// Subject should have uid 1/2/3/4.
    
    			require.Contains(t, []string{"0x1", "0x2", "0x3", "0x4"}, nq.Subject)
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			// The only value we set was "photon".
    
    			if nq.ObjectValue != nil {
    
    				require.Equal(t, &graph.Value{&graph.Value_DefaultVal{"pho\\ton"}},
    					nq.ObjectValue)
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			}
    			// The only objectId we set was uid 5.
    			if nq.ObjectId != "" {
    
    				require.Equal(t, "0x5", nq.ObjectId)
    
    Motakjuq's avatar
    Motakjuq committed
    			if nq.Subject == "0x2" && nq.Predicate == "name" {
    				require.Equal(t, "en", nq.Lang)
    			}
    
    
    			if nq.Subject == "0x4" {
    				require.Equal(t, "age", nq.Facets[0].Key)
    				require.Equal(t, "close", nq.Facets[1].Key)
    				require.Equal(t, "since", nq.Facets[2].Key)
    				require.Equal(t, "33", string(nq.Facets[0].Value))
    				require.Equal(t, "true", string(nq.Facets[1].Value))
    				require.Equal(t, "2005-05-02T15:04:05", string(nq.Facets[2].Value))
    				require.Equal(t, 1, int(nq.Facets[0].ValType))
    				require.Equal(t, 3, int(nq.Facets[1].ValType))
    				require.Equal(t, 4, int(nq.Facets[2].ValType))
    			}
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			count++
    		}
    		counts = append(counts, count)
    		require.NoError(t, scanner.Err())
    	}
    	// This order will bw presereved due to file naming.
    	require.Equal(t, []int{4, 2}, counts)
    }
    
    Motakjuq's avatar
    Motakjuq committed
    
    // generateBenchValues builds the fixed set of key/posting-list pairs used by
    // BenchmarkToRDF: one posting each for a string, a geo point, a password, an
    // int32 and a value-less posting, all sharing the same uid and a single facet.
    //
    // It panics if the geo point cannot be WKB-encoded, since the fixture would
    // otherwise silently contain an empty geo value.
    func generateBenchValues() []kv {
    	const benchUID = 65454

    	byteInt := make([]byte, 4)
    	binary.LittleEndian.PutUint32(byteInt, 123)

    	fac := []*facets.Facet{
    		{
    			Key:   "facetTest",
    			Value: []byte("testVal"),
    		},
    	}

    	geoData, err := wkb.Marshal(
    		geom.NewPoint(geom.XY).MustSetCoords(geom.Coord{-122.082506, 37.4249518}),
    		binary.LittleEndian)
    	if err != nil {
    		panic("generateBenchValues: encoding geo point: " + err.Error())
    	}

    	// Posting_STRING   Posting_ValType = 0
    	// Posting_BINARY   Posting_ValType = 1
    	// Posting_INT32    Posting_ValType = 2
    	// Posting_FLOAT    Posting_ValType = 3
    	// Posting_BOOL     Posting_ValType = 4
    	// Posting_DATE     Posting_ValType = 5
    	// Posting_DATETIME Posting_ValType = 6
    	// Posting_GEO      Posting_ValType = 7
    	// Posting_UID      Posting_ValType = 8
    	// NOTE(review): the list above does not include Posting_PASSWORD, which is
    	// used below — confirm its numeric value against the types package.
    	benchItems := []kv{
    		{
    			prefix: "testString",
    			list: &types.PostingList{
    				Postings: []*types.Posting{{
    					ValType: types.Posting_STRING,
    					Value:   []byte("手機裡的眼淚"),
    					Uid:     benchUID,
    					Facets:  fac,
    				}},
    			},
    		},
    		{
    			prefix: "testGeo",
    			list: &types.PostingList{
    				Postings: []*types.Posting{{
    					ValType: types.Posting_GEO,
    					Value:   geoData,
    					Uid:     benchUID,
    					Facets:  fac,
    				}},
    			},
    		},
    		{
    			prefix: "testPassword",
    			list: &types.PostingList{
    				Postings: []*types.Posting{{
    					ValType: types.Posting_PASSWORD,
    					Value:   []byte("test"),
    					Uid:     benchUID,
    					Facets:  fac,
    				}},
    			},
    		},
    		{
    			prefix: "testInt",
    			list: &types.PostingList{
    				Postings: []*types.Posting{{
    					ValType: types.Posting_INT32,
    					Value:   byteInt,
    					Uid:     benchUID,
    					Facets:  fac,
    				}},
    			},
    		},
    		{
    			// NOTE(review): this "testUid" entry is typed Posting_INT32 with no
    			// Value; presumably it is meant to exercise the uid-only path —
    			// confirm whether Posting_UID was intended.
    			prefix: "testUid",
    			list: &types.PostingList{
    				Postings: []*types.Posting{{
    					ValType: types.Posting_INT32,
    					Uid:     benchUID,
    					Facets:  fac,
    				}},
    			},
    		},
    	}

    	return benchItems
    }
    
    // BenchmarkToRDF measures toRDF over the fixture from generateBenchValues,
    // writing into one pre-grown buffer that is reset after every pass.
    func BenchmarkToRDF(b *testing.B) {
    	var out bytes.Buffer
    	out.Grow(50000)

    	fixtures := generateBenchValues()

    	b.ReportAllocs()
    	b.ResetTimer()
    	for run := 0; run < b.N; run++ {
    		// The fixture holds exactly five items; convert each one in order.
    		for idx := 0; idx < 5; idx++ {
    			toRDF(&out, fixtures[idx])
    		}
    		out.Reset()
    	}
    }