    /*
     * Copyright (C) 2017 Dgraph Labs, Inc. and Contributors
     *
     * This program is free software: you can redistribute it and/or modify
     * it under the terms of the GNU Affero General Public License as published by
     * the Free Software Foundation, either version 3 of the License, or
     * (at your option) any later version.
     *
     * This program is distributed in the hope that it will be useful,
     * but WITHOUT ANY WARRANTY; without even the implied warranty of
     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     * GNU Affero General Public License for more details.
     *
     * You should have received a copy of the GNU Affero General Public License
     * along with this program.  If not, see <http://www.gnu.org/licenses/>.
     */
    
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    package worker
    
    import (
    	"bufio"
    	"bytes"
    	"compress/gzip"
    	"fmt"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	"os"
    	"path"
    
    Motakjuq's avatar
    Motakjuq committed
    	"strconv"
    
    	"strings"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	"sync"
    	"time"
    
    
    	"github.com/dgraph-io/badger"
    
    	"golang.org/x/net/context"
    	"google.golang.org/grpc"
    
    
    	"github.com/dgraph-io/dgraph/group"
    
    	"github.com/dgraph-io/dgraph/posting"
    
    	"github.com/dgraph-io/dgraph/protos"
    
    Manish R Jain's avatar
    Manish R Jain committed
    	"github.com/dgraph-io/dgraph/types"
    
    	"github.com/dgraph-io/dgraph/types/facets"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	"github.com/dgraph-io/dgraph/x"
    
    	"golang.org/x/net/trace"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    
    type kv struct {
    
    type skv struct {
    	attr   string
    
    // rdfTypeMap maps our types to RDF types. It is used when writing storage
    // types for RDFs in export, because the dgraph type name and the RDF storage
    // type are not always the same (e.g. datetime and bool).
    var rdfTypeMap = map[types.TypeID]string{
    	types.StringID:   "xs:string",
    	types.DateTimeID: "xs:dateTime",
    	types.IntID:      "xs:int",
    	types.FloatID:    "xs:float",
    	types.BoolID:     "xs:boolean",
    	types.GeoID:      "geo:geojson",
    	types.PasswordID: "pwd:password",
    
    	types.BinaryID:   "xs:base64Binary",
    
    func toRDF(buf *bytes.Buffer, item kv) {
    
    	var pitr posting.PIterator
    	pitr.Init(pl, 0)
    	for ; pitr.Valid(); pitr.Next() {
    		p := pitr.Posting()
    
    Motakjuq's avatar
    Motakjuq committed
    		buf.WriteString(item.prefix)
    
    		if !bytes.Equal(p.Value, nil) {
    
    			// Value posting
    			// Convert to appropriate type
    
    Manish R Jain's avatar
    Manish R Jain committed
    			vID := types.TypeID(p.ValType)
    			src := types.ValueForType(vID)
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			src.Value = p.Value
    
    			str, err := types.Convert(src, types.StringID)
    			x.Check(err)
    
    Motakjuq's avatar
    Motakjuq committed
    			buf.WriteByte('"')
    			buf.WriteString(str.Value.(string))
    			buf.WriteByte('"')
    
    			if p.PostingType == protos.Posting_VALUE_LANG {
    
    				buf.WriteByte('@')
    				buf.WriteString(string(p.Metadata))
    
    			} else if vID != types.DefaultID {
    
    				rdfType, ok := rdfTypeMap[vID]
    				x.AssertTruef(ok, "Didn't find RDF type for dgraph type: %+v", vID.Name())
    				buf.WriteString("^^<")
    				buf.WriteString(rdfType)
    
    Motakjuq's avatar
    Motakjuq committed
    				buf.WriteByte('>')
    
    		} else {
    
    Motakjuq's avatar
    Motakjuq committed
    			buf.WriteString(strconv.FormatUint(p.Uid, 16))
    			buf.WriteByte('>')
    
    		// Label
    		if len(p.Label) > 0 {
    			buf.WriteString(" <")
    			buf.WriteString(p.Label)
    			buf.WriteByte('>')
    		}
    		// Facets.
    
    		fcs := p.Facets
    		if len(fcs) != 0 {
    
    Motakjuq's avatar
    Motakjuq committed
    			buf.WriteString(" (")
    
    			for i, f := range fcs {
    
    Motakjuq's avatar
    Motakjuq committed
    				if i != 0 {
    					buf.WriteByte(',')
    				}
    				buf.WriteString(f.Key)
    				buf.WriteByte('=')
    
    				fVal := &types.Val{Tid: types.StringID}
    
    				x.Check(types.Marshal(facets.ValFor(f), fVal))
    
    				if facets.TypeIDFor(f) == types.StringID {
    					buf.WriteByte('"')
    					buf.WriteString(fVal.Value.(string))
    					buf.WriteByte('"')
    				} else {
    					buf.WriteString(fVal.Value.(string))
    				}
    
    Motakjuq's avatar
    Motakjuq committed
    			buf.WriteByte(')')
    
    Motakjuq's avatar
    Motakjuq committed
    		buf.WriteString(" .\n")
    
    // toSchema appends one schema line for the predicate described by s to buf.
    //
    // The line is built from, in order: the predicate name, a ':', the name of the
    // predicate's value type, an optional directive (" @reverse" or
    // " @index(tok1,tok2,...)"), and the terminator " . \n". buf is only appended to.
    func toSchema(buf *bytes.Buffer, s *skv) {
    
    	// A predicate name that itself contains ':' is wrapped in angle brackets;
    	// any other name is written bare.
    	if strings.ContainsRune(s.attr, ':') {
    		buf.WriteRune('<')
    		buf.WriteString(s.attr)
    		buf.WriteRune('>')
    	} else {
    		buf.WriteString(s.attr)
    	}
    
    	// Separator followed by the value type's name.
    	buf.WriteByte(':')
    	buf.WriteString(types.TypeID(s.schema.ValueType).Name())
    
    	// At most one directive is written: @reverse, or @index with its tokenizers.
    	if s.schema.Directive == protos.SchemaUpdate_REVERSE {
    
    		buf.WriteString(" @reverse")
    
    	} else if s.schema.Directive == protos.SchemaUpdate_INDEX && len(s.schema.Tokenizer) > 0 {
    
    		// An INDEX directive with no tokenizers writes nothing at all.
    		buf.WriteString(" @index(")
    		buf.WriteString(strings.Join(s.schema.Tokenizer, ","))
    		buf.WriteByte(')')
    	}
    
    	// Line terminator; note the space between '.' and the newline.
    	buf.WriteString(" . \n")
    
    // writeToFile creates (or truncates) the file at fpath and writes every chunk
    // received on ch to it as a single gzip stream at gzip.BestCompression. It
    // returns only after ch has been closed.
    //
    // The first error encountered (creating the file, compressing, flushing or
    // closing) is returned. Even on error, ch is still consumed until it is closed:
    // senders block once the channel is full, so abandoning it early would stall
    // them, and anything waiting on them, forever.
    func writeToFile(fpath string, ch chan []byte) (err error) {
    	// Registered first so that it runs last, after the file has been closed
    	// and err holds the final result.
    	defer func() {
    		if err != nil {
    			// Discard whatever is still being produced so senders can finish.
    			for range ch {
    			}
    		}
    	}()
    
    	f, err := os.Create(fpath)
    	if err != nil {
    		return err
    	}
    	defer func() {
    		// A failed Close can mean buffered data never reached the disk, so
    		// report it unless an earlier error is already being returned.
    		if cerr := f.Close(); err == nil {
    			err = cerr
    		}
    	}()
    
    	// Layering: gzip writer -> 1 MB buffered writer -> file.
    	w := bufio.NewWriterSize(f, 1000000)
    	gw, err := gzip.NewWriterLevel(w, gzip.BestCompression)
    	if err != nil {
    		return err
    	}
    
    	for buf := range ch {
    		if _, err = gw.Write(buf); err != nil {
    			return err
    		}
    	}
    
    	// Close flushes pending compressed data and writes the gzip footer, so no
    	// separate Flush on the gzip writer is needed.
    	if err = gw.Close(); err != nil {
    		return err
    	}
    	// Push what is still sitting in the buffered writer down to the file.
    	return w.Flush()
    }
    
    
    // export creates an export of the data for group gid by writing it, and its
    // schema, as gzipped RDF files under bdir.
    func export(gid uint32, bdir string) error {
    
    	// Use a goroutine to write to file.
    	err := os.MkdirAll(bdir, 0700)
    	if err != nil {
    		return err
    	}
    	fpath := path.Join(bdir, fmt.Sprintf("dgraph-%d-%s.rdf.gz", gid,
    		time.Now().Format("2006-01-02-15-04")))
    
    	fspath := path.Join(bdir, fmt.Sprintf("dgraph-schema-%d-%s.rdf.gz", gid,
    		time.Now().Format("2006-01-02-15-04")))
    
    	x.Printf("Exporting to: %v, schema at %v\n", fpath, fspath)
    
    	chb := make(chan []byte, 1000)
    
    	errChan := make(chan error, 2)
    
    	go func() {
    		errChan <- writeToFile(fpath, chb)
    	}()
    
    	chsb := make(chan []byte, 1000)
    	go func() {
    		errChan <- writeToFile(fspath, chsb)
    	}()
    
    	// Use a bunch of goroutines to convert to RDF format.
    	chkv := make(chan kv, 1000)
    	var wg sync.WaitGroup
    
    	wg.Add(numExportRoutines)
    	for i := 0; i < numExportRoutines; i++ {
    
    		go func(i int) {
    
    			buf := new(bytes.Buffer)
    			buf.Grow(50000)
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			for item := range chkv {
    
    				toRDF(buf, item)
    				if buf.Len() >= 40000 {
    
    					tmp := make([]byte, buf.Len())
    					copy(tmp, buf.Bytes())
    					chb <- tmp
    
    					buf.Reset()
    				}
    			}
    			if buf.Len() > 0 {
    
    				tmp := make([]byte, buf.Len())
    				copy(tmp, buf.Bytes())
    				chb <- tmp
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			}
    			wg.Done()
    
    	// Use a goroutine to convert protos.Schema to string
    
    	chs := make(chan *skv, 1000)
    	wg.Add(1)
    	go func() {
    		buf := new(bytes.Buffer)
    		buf.Grow(50000)
    		for item := range chs {
    			toSchema(buf, item)
    			if buf.Len() >= 40000 {
    				tmp := make([]byte, buf.Len())
    				copy(tmp, buf.Bytes())
    				chsb <- tmp
    				buf.Reset()
    			}
    		}
    		if buf.Len() > 0 {
    			tmp := make([]byte, buf.Len())
    			copy(tmp, buf.Bytes())
    			chsb <- tmp
    		}
    		wg.Done()
    	}()
    
    
    	// Iterate over the badger store.
    
    	it := pstore.NewIterator(badger.DefaultIteratorOptions)
    
    	defer it.Close()
    	var lastPred string
    
    Motakjuq's avatar
    Motakjuq committed
    	prefix := new(bytes.Buffer)
    	prefix.Grow(100)
    
    	var debugCount int
    	for it.Rewind(); it.Valid(); debugCount++ {
    		item := it.Item()
    		key := item.Key()
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			// Seek to the end of index keys.
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			continue
    		}
    
    		if pk.IsReverse() {
    			// Seek to the end of reverse keys.
    			it.Seek(pk.SkipRangeOfSameType())
    			continue
    		}
    
    		if pk.Attr == "_uid_" || pk.Attr == "_predicate_" ||
    			pk.Attr == "_lease_" {
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			// Skip the UID mappings.
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			continue
    		}
    
    		if pk.IsSchema() {
    
    			if group.BelongsTo(pk.Attr) == gid {
    
    				x.Check(s.Unmarshal(item.Value()))
    
    				chs <- &skv{
    					attr:   pk.Attr,
    					schema: s,
    				}
    			}
    			// skip predicate
    			it.Next()
    
    		x.AssertTrue(pk.IsData())
    		pred, uid := pk.Attr, pk.Uid
    
    		if pred != lastPred && group.BelongsTo(pred) != gid {
    
    Motakjuq's avatar
    Motakjuq committed
    		prefix.WriteString(strconv.FormatUint(uid, 16))
    		prefix.WriteString("> <")
    		prefix.WriteString(pred)
    		prefix.WriteString("> ")
    
    		x.Check(pl.Unmarshal(item.Value()))
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    		chkv <- kv{
    
    Motakjuq's avatar
    Motakjuq committed
    			prefix: prefix.String(),
    
    Motakjuq's avatar
    Motakjuq committed
    		prefix.Reset()
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    		lastPred = pred
    		it.Next()
    	}
    
    
    	close(chkv) // We have stopped output to chkv.
    
    	close(chs)  // we have stopped output to chs (schema)
    
    	wg.Wait()   // Wait for numExportRoutines to finish.
    
    	close(chb)  // We have stopped output to chb.
    
    	close(chsb) // We have stopped output to chsb (schema).
    
    	err = <-errChan
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	return err
    }
    
    
    func handleExportForGroup(ctx context.Context, reqId uint64, gid uint32) *protos.ExportPayload {
    
    	n := groups().Node(gid)
    	if n.AmLeader() {
    
    		if tr, ok := trace.FromContext(ctx); ok {
    
    			tr.LazyPrintf("Leader of group: %d. Running export.", gid)
    
    		if err := export(gid, Config.ExportPath); err != nil {
    
    			if tr, ok := trace.FromContext(ctx); ok {
    				tr.LazyPrintf(err.Error())
    			}
    
    				Status: protos.ExportPayload_FAILED,
    
    		if tr, ok := trace.FromContext(ctx); ok {
    
    			tr.LazyPrintf("Export done for group: %d.", gid)
    
    			Status:  protos.ExportPayload_SUCCESS,
    
    	// I'm not the leader. Relay to someone who I think is.
    	var addrs []string
    	{
    		// Try in order: leader of given group, any server from given group, leader of group zero.
    		_, addr := groups().Leader(gid)
    		addrs = append(addrs, addr)
    		addrs = append(addrs, groups().AnyServer(gid))
    		_, addr = groups().Leader(0)
    		addrs = append(addrs, addr)
    	}
    
    	var conn *grpc.ClientConn
    	for _, addr := range addrs {
    
    		pl, err := pools().get(addr)
    		if err != nil {
    
    			if tr, ok := trace.FromContext(ctx); ok {
    
    				tr.LazyPrintf(err.Error())
    
    		if tr, ok := trace.FromContext(ctx); ok {
    
    			tr.LazyPrintf("Relaying export request for group %d to %q", gid, pl.Addr)
    
    	// Unable to find any connection to any of these servers. This should be exceedingly rare.
    
    	// But probably not worthy of crashing the server. We can just skip the export.
    
    		if tr, ok := trace.FromContext(ctx); ok {
    
    			tr.LazyPrintf("Unable to find a server to export group: %d", gid)
    
    			Status:  protos.ExportPayload_FAILED,
    
    	defer pools().release(pl)
    
    	c := protos.NewWorkerClient(conn)
    
    		ReqId:   reqId,
    		GroupId: gid,
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	if err != nil {
    
    		if tr, ok := trace.FromContext(ctx); ok {
    			tr.LazyPrintf(err.Error())
    		}
    
    			Status:  protos.ExportPayload_FAILED,
    
    // Export handles a request to export the group given by req.GroupId.
    // If a server receives a request to export a group that it doesn't handle, the
    // request is relayed to the server that it thinks should handle it.
    //
    // The returned payload carries req.ReqId. If ctx is done before the export
    // finishes, a payload with status FAILED is returned together with ctx.Err().
    func (w *grpcWorker) Export(ctx context.Context, req *protos.ExportPayload) (*protos.ExportPayload, error) {
    	reply := &protos.ExportPayload{ReqId: req.ReqId}
    	reply.Status = protos.ExportPayload_FAILED // Set by default.
    
    
    	// Don't start any work if the caller has already gone away.
    	if ctx.Err() != nil {
    		return reply, ctx.Err()
    	}
    	// NOTE(review): presumably addIfNotPresent reports false when this ReqId
    	// has been seen before; the rest of this branch is not visible here - confirm.
    	if !w.addIfNotPresent(req.ReqId) {
    
    		reply.Status = protos.ExportPayload_DUPLICATE
    
    	// Buffered with capacity 1 so the send below can complete even when nobody
    	// receives from chb any more (the select below may return on ctx.Done()).
    	chb := make(chan *protos.ExportPayload, 1)
    
    		chb <- handleExportForGroup(ctx, req.ReqId, req.GroupId)
    
    	}()
    
    	// Return whichever happens first: the export result or the end of ctx.
    	select {
    	case rep := <-chb:
    		return rep, nil
    	case <-ctx.Done():
    		return reply, ctx.Err()
    	}
    }
    
    
    func ExportOverNetwork(ctx context.Context) error {
    	// If we haven't even had a single membership update, don't run export.
    
    	if err := x.HealthCheck(); err != nil {
    
    		if tr, ok := trace.FromContext(ctx); ok {
    
    			tr.LazyPrintf("Request rejected %v", err)
    
    	// Let's first collect all groups.
    	gids := groups().KnownGroups()
    
    	for i, gid := range gids {
    
    			gids[i] = gids[len(gids)-1]
    			gids = gids[:len(gids)-1]
    
    	ch := make(chan *protos.ExportPayload, len(gids))
    
    	for _, gid := range gids {
    
    		go func(group uint32) {
    			reqId := uint64(rand.Int63())
    
    			ch <- handleExportForGroup(ctx, reqId, group)
    
    		}(gid)
    	}
    
    	for i := 0; i < len(gids); i++ {
    		bp := <-ch
    
    		if bp.Status != protos.ExportPayload_SUCCESS {
    
    			if tr, ok := trace.FromContext(ctx); ok {
    
    				tr.LazyPrintf("Export status: %v for group id: %d", bp.Status, bp.GroupId)
    
    			return fmt.Errorf("Export status: %v for group id: %d", bp.Status, bp.GroupId)
    
    		if tr, ok := trace.FromContext(ctx); ok {
    
    			tr.LazyPrintf("Export successful for group: %v", bp.GroupId)
    
    		}
    	}
    	if tr, ok := trace.FromContext(ctx); ok {