Skip to content
Snippets Groups Projects
Unverified Commit 2767bbb7 authored by Motakjuq's avatar Motakjuq Committed by Ashish Negi
Browse files

Improve backup performance

Add bench test for toRDF
parent 70fe355b
No related branches found
No related tags found
No related merge requests found
......@@ -8,6 +8,7 @@ import (
"math/rand"
"os"
"path"
"strconv"
"sync"
"time"
......@@ -28,7 +29,8 @@ type kv struct {
func toRDF(buf *bytes.Buffer, item kv) {
pl := item.list
for _, p := range pl.Postings {
x.Check2(buf.WriteString(item.prefix))
buf.WriteString(item.prefix)
if !bytes.Equal(p.Value, nil) {
// Value posting
// Convert to appropriate type
......@@ -37,35 +39,46 @@ func toRDF(buf *bytes.Buffer, item kv) {
src.Value = p.Value
str, err := types.Convert(src, types.StringID)
x.Check(err)
x.Check2(buf.WriteString(fmt.Sprintf("\"%s\"", str.Value)))
if types.TypeID(p.ValType) == types.GeoID {
x.Check2(buf.WriteString(fmt.Sprintf("^^<geo:geojson> ")))
} else if types.TypeID(p.ValType) == types.PasswordID {
x.Check2(buf.WriteString(fmt.Sprintf("^^<pwd:%s>", vID.Name())))
} else if types.TypeID(p.ValType) != types.BinaryID &&
types.TypeID(p.ValType) != types.DefaultID {
x.Check2(buf.WriteString(fmt.Sprintf("^^<xs:%s>", vID.Name())))
buf.WriteByte('"')
buf.WriteString(str.Value.(string))
buf.WriteByte('"')
if vID == types.GeoID {
buf.WriteString("^^<geo:geojson> ")
} else if vID == types.PasswordID {
buf.WriteString("^^<pwd:")
buf.WriteString(vID.Name())
buf.WriteByte('>')
} else if vID != types.BinaryID &&
vID != types.DefaultID {
buf.WriteString("^^<xs:")
buf.WriteString(vID.Name())
buf.WriteByte('>')
} else if len(p.Lang) > 0 {
x.Check2(buf.WriteString(fmt.Sprintf("@%s", p.Lang)))
buf.WriteByte('@')
buf.WriteString(p.Lang)
}
} else {
x.Check2(buf.WriteString(fmt.Sprintf("<%#x>", p.Uid)))
buf.WriteString("<0x")
buf.WriteString(strconv.FormatUint(p.Uid, 16))
buf.WriteByte('>')
}
facets := p.Facets
if len(facets) != 0 {
x.Check2(buf.WriteString(" ("))
}
for i, f := range facets {
x.Check2(buf.WriteString(fmt.Sprintf("%s=%s", f.Key, string(f.Value))))
if i != len(facets)-1 {
x.Check2(buf.WriteRune(','))
buf.WriteString(" (")
for i, f := range facets {
if i != 0 {
buf.WriteByte(',')
}
buf.WriteString(f.Key)
buf.WriteByte('=')
buf.WriteString(string(f.Value))
}
buf.WriteByte(')')
}
if len(facets) != 0 {
x.Check2(buf.WriteRune(')'))
}
x.Check2(buf.WriteString(" .\n"))
buf.WriteString(" .\n")
}
}
......@@ -143,6 +156,8 @@ func backup(gid uint32, bdir string) error {
it := pstore.NewIterator()
defer it.Close()
var lastPred string
prefix := new(bytes.Buffer)
prefix.Grow(100)
for it.SeekToFirst(); it.Valid(); {
key := it.Key().Data()
pk := x.Parse(key)
......@@ -175,13 +190,18 @@ func backup(gid uint32, bdir string) error {
continue
}
prefix := fmt.Sprintf("<%#x> <%s> ", uid, pred)
prefix.WriteString("<0x")
prefix.WriteString(strconv.FormatUint(uid, 16))
prefix.WriteString("> <")
prefix.WriteString(pred)
prefix.WriteString("> ")
pl := &types.PostingList{}
x.Check(pl.Unmarshal(it.Value().Data()))
chkv <- kv{
prefix: prefix,
prefix: prefix.String(),
list: pl,
}
prefix.Reset()
lastPred = pred
it.Next()
}
......
......@@ -2,7 +2,9 @@ package worker
import (
"bufio"
"bytes"
"compress/gzip"
"encoding/binary"
"io/ioutil"
"os"
"path/filepath"
......@@ -11,6 +13,8 @@ import (
"time"
"github.com/stretchr/testify/require"
geom "github.com/twpayne/go-geom"
"github.com/twpayne/go-geom/encoding/wkb"
"github.com/dgraph-io/dgraph/group"
"github.com/dgraph-io/dgraph/posting"
......@@ -19,6 +23,7 @@ import (
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/x"
)
......@@ -168,6 +173,10 @@ func TestBackup(t *testing.T) {
require.Equal(t, "0x5", nq.ObjectId)
}
if nq.Subject == "0x2" && nq.Predicate == "name" {
require.Equal(t, "en", nq.Lang)
}
if nq.Subject == "0x4" {
require.Equal(t, "age", nq.Facets[0].Key)
require.Equal(t, "close", nq.Facets[1].Key)
......@@ -187,3 +196,95 @@ func TestBackup(t *testing.T) {
// This order will bw presereved due to file naming.
require.Equal(t, []int{4, 2}, counts)
}
func generateBenchValues() []kv {
byteInt := make([]byte, 4)
binary.LittleEndian.PutUint32(byteInt, 123)
fac := []*facets.Facet{
&facets.Facet{
Key: "facetTest",
Value: []byte("testVal"),
},
}
geoData, _ := wkb.Marshal(geom.NewPoint(geom.XY).MustSetCoords(geom.Coord{-122.082506, 37.4249518}), binary.LittleEndian)
// Posting_STRING Posting_ValType = 0
// Posting_BINARY Posting_ValType = 1
// Posting_INT32 Posting_ValType = 2
// Posting_FLOAT Posting_ValType = 3
// Posting_BOOL Posting_ValType = 4
// Posting_DATE Posting_ValType = 5
// Posting_DATETIME Posting_ValType = 6
// Posting_GEO Posting_ValType = 7
// Posting_UID Posting_ValType = 8
benchItems := []kv{
kv{
prefix: "testString",
list: &types.PostingList{
Postings: []*types.Posting{&types.Posting{
ValType: types.Posting_STRING,
Value: []byte("手機裡的眼淚"),
Uid: uint64(65454),
Facets: fac,
}},
},
},
kv{prefix: "testGeo",
list: &types.PostingList{
Postings: []*types.Posting{&types.Posting{
ValType: types.Posting_GEO,
Value: geoData,
Uid: uint64(65454),
Facets: fac,
}},
}},
kv{prefix: "testPassword",
list: &types.PostingList{
Postings: []*types.Posting{&types.Posting{
ValType: types.Posting_PASSWORD,
Value: []byte("test"),
Uid: uint64(65454),
Facets: fac,
}},
}},
kv{prefix: "testInt",
list: &types.PostingList{
Postings: []*types.Posting{&types.Posting{
ValType: types.Posting_INT32,
Value: byteInt,
Uid: uint64(65454),
Facets: fac,
}},
}},
kv{prefix: "testUid",
list: &types.PostingList{
Postings: []*types.Posting{&types.Posting{
ValType: types.Posting_INT32,
Uid: uint64(65454),
Facets: fac,
}},
}},
}
return benchItems
}
func BenchmarkToRDF(b *testing.B) {
buf := new(bytes.Buffer)
buf.Grow(50000)
items := generateBenchValues()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
toRDF(buf, items[0])
toRDF(buf, items[1])
toRDF(buf, items[2])
toRDF(buf, items[3])
toRDF(buf, items[4])
buf.Reset()
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment