Skip to content
Snippets Groups Projects
backup.go 10.8 KiB
Newer Older
  • Learn to ignore specific revisions
    /*
     * Copyright (C) 2017 Dgraph Labs, Inc. and Contributors
     *
     * This program is free software: you can redistribute it and/or modify
     * it under the terms of the GNU Affero General Public License as published by
     * the Free Software Foundation, either version 3 of the License, or
     * (at your option) any later version.
     *
     * This program is distributed in the hope that it will be useful,
     * but WITHOUT ANY WARRANTY; without even the implied warranty of
     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     * GNU Affero General Public License for more details.
     *
     * You should have received a copy of the GNU Affero General Public License
     * along with this program.  If not, see <http://www.gnu.org/licenses/>.
     */
    
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    package worker
    
    import (
    	"bufio"
    	"bytes"
    	"compress/gzip"
    	"fmt"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	"os"
    	"path"
    
    Motakjuq's avatar
    Motakjuq committed
    	"strconv"
    
    	"strings"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	"sync"
    	"time"
    
    
    	"github.com/dgraph-io/dgraph/group"
    
    	"github.com/dgraph-io/dgraph/protos/typesp"
    	"github.com/dgraph-io/dgraph/protos/workerp"
    
    Manish R Jain's avatar
    Manish R Jain committed
    	"github.com/dgraph-io/dgraph/types"
    
    	"github.com/dgraph-io/dgraph/types/facets"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	"github.com/dgraph-io/dgraph/x"
    
    	"golang.org/x/net/context"
    	"google.golang.org/grpc"
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    )
    
    // numBackupRoutines is the number of goroutines backup starts to convert
    // data posting lists to RDF concurrently.
    const (
    	numBackupRoutines = 10
    )
    
    // kv is one unit of work passed from backup's key iteration to the RDF
    // converter goroutines. It holds the posting list stored under one data key,
    // together with the "<0x..> <pred> " subject/predicate text that toRDF writes
    // at the start of every line for that list.
    // NOTE(review): this view of the struct is incomplete. The `prefix` field that
    // toRDF reads (item.prefix) and backup sets (prefix: ...) is not shown, and
    // neither is the closing brace; confirm against the real file.
    type kv struct {
    
    	list   *typesp.PostingList
    
    // skv pairs a predicate name with its stored schema entry. backup sends these
    // to a single goroutine that renders them with toSchema.
    type skv struct {
    	attr   string
    	schema *typesp.Schema
    }
    
    
    // toRDF appends one RDF line to buf for every posting in item's posting list.
    // Each line is built as follows:
    //   - item.prefix (the "<0x..> <pred> " subject/predicate text);
    //   - the object;
    //   - an optional " <label>";
    //   - an optional " (key=value,...)" facet list;
    //   - " .\n".
    // The object is either the posting's value converted to a string and wrapped
    // in double quotes, followed by a geo/password/xs type suffix or an
    // "@" + p.Metadata tag, or the target uid written as <0x..>.
    // NOTE(review): several lines of this function are missing from this view,
    // including the definition of `pl` (presumably item.list) and some closing
    // braces.
    func toRDF(buf *bytes.Buffer, item kv) {
    
    	for _, p := range pl.Postings {
    
    Motakjuq's avatar
    Motakjuq committed
    		buf.WriteString(item.prefix)
    
    
    		// bytes.Equal treats nil and empty slices as equal, so a posting with an
    		// empty Value is written as a uid edge by the else branch below.
    		if !bytes.Equal(p.Value, nil) {
    
    			// Value posting
    			// Convert to appropriate type
    
    Manish R Jain's avatar
    Manish R Jain committed
    			vID := types.TypeID(p.ValType)
    			src := types.ValueForType(vID)
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			src.Value = p.Value
    
    			str, err := types.Convert(src, types.StringID)
    			x.Check(err)
    
    Motakjuq's avatar
    Motakjuq committed
    			// NOTE(review): str is written between quotes without any visible
    			// escaping. Unless types.Convert escapes '"' and '\', values that
    			// contain them produce malformed RDF. Confirm.
    			buf.WriteByte('"')
    			buf.WriteString(str.Value.(string))
    			buf.WriteByte('"')
    			if vID == types.GeoID {
    				buf.WriteString("^^<geo:geojson> ")
    			} else if vID == types.PasswordID {
    				buf.WriteString("^^<pwd:")
    				buf.WriteString(vID.Name())
    				buf.WriteByte('>')
    
    			} else if p.PostingType == typesp.Posting_VALUE_LANG {
    
    				buf.WriteByte('@')
    				buf.WriteString(string(p.Metadata))
    
    Motakjuq's avatar
    Motakjuq committed
    			} else if vID != types.BinaryID &&
    				vID != types.DefaultID {
    				buf.WriteString("^^<xs:")
    				buf.WriteString(vID.Name())
    				buf.WriteByte('>')
    
    		} else {
    
    Motakjuq's avatar
    Motakjuq committed
    			// Uid edge: the object is the target uid in hexadecimal.
    			buf.WriteString("<0x")
    			buf.WriteString(strconv.FormatUint(p.Uid, 16))
    			buf.WriteByte('>')
    
    		// Label
    		if len(p.Label) > 0 {
    			buf.WriteString(" <")
    			buf.WriteString(p.Label)
    			buf.WriteByte('>')
    		}
    		// Facets.
    
    		fcs := p.Facets
    		if len(fcs) != 0 {
    
    Motakjuq's avatar
    Motakjuq committed
    			buf.WriteString(" (")
    
    			for i, f := range fcs {
    
    Motakjuq's avatar
    Motakjuq committed
    				if i != 0 {
    					buf.WriteByte(',')
    				}
    				buf.WriteString(f.Key)
    				buf.WriteByte('=')
    
    				// Each facet value is rendered by marshalling it into a string Val.
    				fVal := &types.Val{Tid: types.StringID}
    
    				x.Check(types.Marshal(facets.ValFor(f), fVal))
    
    				buf.WriteString(fVal.Value.(string))
    
    Motakjuq's avatar
    Motakjuq committed
    			buf.WriteByte(')')
    
    Motakjuq's avatar
    Motakjuq committed
    		buf.WriteString(" .\n")
    
    // toSchema appends the schema line for s to buf. The line contains:
    //   - the attribute name, wrapped in <...> when it contains a ':';
    //   - a ':' followed by the name of the value type;
    //   - " @reverse" for reverse predicates, or " @index(tok1,tok2,...)" for
    //     indexed predicates that list at least one tokenizer;
    //   - a trailing newline.
    func toSchema(buf *bytes.Buffer, s *skv) {
    	name := s.attr
    	if strings.ContainsRune(name, ':') {
    		name = "<" + name + ">"
    	}
    	buf.WriteString(name)
    	buf.WriteByte(':')
    	buf.WriteString(types.TypeID(s.schema.ValueType).Name())
    
    	sch := s.schema
    	switch {
    	case sch.Directive == typesp.Schema_REVERSE:
    		buf.WriteString(" @reverse")
    	case sch.Directive == typesp.Schema_INDEX && len(sch.Tokenizer) > 0:
    		buf.WriteString(" @index(")
    		buf.WriteString(strings.Join(sch.Tokenizer, ","))
    		buf.WriteByte(')')
    	}
    	buf.WriteString("\n")
    }
    
    
    // writeToFile gzip-compresses (at gzip.BestCompression) every byte slice
    // received on ch, in arrival order, into a newly created file at fpath. It
    // returns once ch has been closed and the file has been finalized.
    //
    // ch is always drained until it is closed, even after an error. This keeps
    // the goroutines sending into ch from blocking forever on a writer that has
    // already given up. The returned error is the first failure among:
    //   - creating the file;
    //   - compressing;
    //   - finishing the gzip stream;
    //   - flushing the buffered writer;
    //   - closing the file.
    func writeToFile(fpath string, ch chan []byte) (rerr error) {
    	// This defer is registered first, so it runs last. It discards anything
    	// still queued on ch. After the normal range loop below, ch is already
    	// closed and empty, so this returns immediately.
    	defer func() {
    		for range ch {
    		}
    	}()
    
    	f, err := os.Create(fpath)
    	if err != nil {
    		return err
    	}
    	// Close errors are reported too: for a file that was just written, a
    	// failing Close must not be mistaken for a successful backup.
    	defer func() {
    		if cerr := f.Close(); rerr == nil {
    			rerr = cerr
    		}
    	}()
    
    	w := bufio.NewWriterSize(f, 1000000)
    	gw, err := gzip.NewWriterLevel(w, gzip.BestCompression)
    	if err != nil {
    		return err
    	}
    
    	for buf := range ch {
    		if _, err := gw.Write(buf); err != nil {
    			return err
    		}
    	}
    	// Close flushes any pending compressed data and writes the gzip footer into
    	// w. It does not close w or f, and a separate gw.Flush beforehand is
    	// redundant.
    	if err := gw.Close(); err != nil {
    		return err
    	}
    	return w.Flush()
    }
    
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    // backup exports the data and schema of group gid into two gzip-compressed
    // files in bdir, creating bdir with mode 0700 if needed:
    //   - dgraph-<gid>-<time>.rdf.gz receives the RDF lines built by toRDF;
    //   - dgraph-schema-<gid>-<time>.rdf.gz receives the lines built by toSchema;
    // where <time> is time.Now() formatted as "2006-01-02-15-04".
    //
    // Keys are read from pstore's iterator. Converter goroutines receive the
    // schema entries and, judging by the visible checks, the data posting lists
    // of predicates for which group.BelongsTo returns gid. There is one converter
    // for schema and numBackupRoutines converters for data. Each converter
    // forwards its output in chunks to a per-file writeToFile goroutine. A chunk
    // is sent once the converter's buffer holds at least 40000 bytes, with one
    // final partial chunk at the end.
    //
    // It returns the error from os.MkdirAll, or otherwise the first result that
    // either writer reports on errChan.
    //
    // NOTE(review): some lines of this function are missing from this view. They
    // include the code that parses `key` into `pk`, the conditions of the index
    // and UID-mapping skip branches, and several closing braces.
    func backup(gid uint32, bdir string) error {
    	// Create the backup directory; both output files are created inside it.
    	err := os.MkdirAll(bdir, 0700)
    	if err != nil {
    		return err
    	}
    	// NOTE(review): time.Now() is called separately for each file name, so the
    	// two files can carry different minutes if the calls straddle a minute
    	// boundary.
    	fpath := path.Join(bdir, fmt.Sprintf("dgraph-%d-%s.rdf.gz", gid,
    		time.Now().Format("2006-01-02-15-04")))
    
    	fspath := path.Join(bdir, fmt.Sprintf("dgraph-schema-%d-%s.rdf.gz", gid,
    		time.Now().Format("2006-01-02-15-04")))
    	fmt.Printf("Backing up at: %v, schema at %v\n", fpath, fspath)
    
    	// One writer goroutine per output file. chb carries RDF chunks and chsb
    	// carries schema chunks; each writer sends its result on errChan.
    	chb := make(chan []byte, 1000)
    
    	errChan := make(chan error, 2)
    
    	go func() {
    		errChan <- writeToFile(fpath, chb)
    	}()
    
    	chsb := make(chan []byte, 1000)
    	go func() {
    		errChan <- writeToFile(fspath, chsb)
    	}()
    
    	// Use a bunch of goroutines to convert to RDF format.
    	chkv := make(chan kv, 1000)
    	var wg sync.WaitGroup
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	wg.Add(numBackupRoutines)
    	for i := 0; i < numBackupRoutines; i++ {
    		go func() {
    
    			buf := new(bytes.Buffer)
    			buf.Grow(50000)
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			for item := range chkv {
    
    				toRDF(buf, item)
    				if buf.Len() >= 40000 {
    
    					// Copy out before Reset, because buf's storage is reused for
    					// the next chunk while the writer may still hold this one.
    					tmp := make([]byte, buf.Len())
    					copy(tmp, buf.Bytes())
    					chb <- tmp
    
    					buf.Reset()
    				}
    			}
    			if buf.Len() > 0 {
    
    				tmp := make([]byte, buf.Len())
    				copy(tmp, buf.Bytes())
    				chb <- tmp
    
    	// Use a goroutine to convert typesp.Schema to string
    	chs := make(chan *skv, 1000)
    	wg.Add(1)
    	go func() {
    		buf := new(bytes.Buffer)
    		buf.Grow(50000)
    		for item := range chs {
    			toSchema(buf, item)
    			if buf.Len() >= 40000 {
    				tmp := make([]byte, buf.Len())
    				copy(tmp, buf.Bytes())
    				chsb <- tmp
    				buf.Reset()
    			}
    		}
    		if buf.Len() > 0 {
    			tmp := make([]byte, buf.Len())
    			copy(tmp, buf.Bytes())
    			chsb <- tmp
    		}
    		wg.Done()
    	}()
    
    
    	// Iterate over rocksdb.
    	it := pstore.NewIterator()
    	defer it.Close()
    	var lastPred string
    
    Motakjuq's avatar
    Motakjuq committed
    	// prefix holds the "<0x..> <pred> " text for the current data key. It is
    	// reset and reused for every key.
    	prefix := new(bytes.Buffer)
    	prefix.Grow(100)
    
    	for it.SeekToFirst(); it.Valid(); {
    		key := it.Key().Data()
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			// Seek to the end of index keys.
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			continue
    		}
    
    		if pk.IsReverse() {
    			// Seek to the end of reverse keys.
    			it.Seek(pk.SkipRangeOfSameType())
    			continue
    		}
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			// Skip the UID mappings.
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			continue
    		}
    
    		if pk.IsSchema() {
    
    			if group.BelongsTo(pk.Attr) == gid {
    				s := &typesp.Schema{}
    				x.Check(s.Unmarshal(it.Value().Data()))
    				chs <- &skv{
    					attr:   pk.Attr,
    					schema: s,
    				}
    			}
    			// skip predicate
    			it.Next()
    
    
    		x.AssertTrue(pk.IsData())
    		pred, uid := pk.Attr, pk.Uid
    
    		// NOTE(review): the body of this check is not visible here. Presumably it
    		// skips predicates that do not belong to gid, and lastPred lets
    		// consecutive keys of an already-accepted predicate bypass BelongsTo.
    		if pred != lastPred && group.BelongsTo(pred) != gid {
    
    Motakjuq's avatar
    Motakjuq committed
    		prefix.WriteString("<0x")
    		prefix.WriteString(strconv.FormatUint(uid, 16))
    		prefix.WriteString("> <")
    		prefix.WriteString(pred)
    		prefix.WriteString("> ")
    
    		pl := &typesp.PostingList{}
    
    		x.Check(pl.Unmarshal(it.Value().Data()))
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    		chkv <- kv{
    
    Motakjuq's avatar
    Motakjuq committed
    			prefix: prefix.String(),
    
    Motakjuq's avatar
    Motakjuq committed
    		prefix.Reset()
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    		lastPred = pred
    		it.Next()
    	}
    
    
    	close(chkv) // We have stopped output to chkv.
    
    	close(chs)  // we have stopped output to chs (schema)
    
    	wg.Wait()   // Wait for the RDF and schema converter goroutines to finish.
    	close(chb)  // We have stopped output to chb.
    
    	close(chsb) // We have stopped output to chsb (schema).
    
    	// NOTE(review): two writer goroutines report on errChan, but only the first
    	// result is received here. An error from the other writer is dropped, and
    	// backup can return before that file is fully written and closed.
    	err = <-errChan
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	return err
    }
    
    
    // handleBackupForGroup backs up group gid and returns the outcome as a
    // BackupPayload.
    //
    // If this server's node for gid is the leader, it runs backup into
    // *backupPath locally. Otherwise it relays a BackupPayload{ReqId, GroupId}
    // over gRPC and returns that server's reply. The target is the first of these
    // addresses for which a pooled connection can be obtained: gid's leader, any
    // server of gid, then group zero's leader.
    //
    // Failures are traced on ctx and returned as status FAILED rather than as a
    // Go error. These include a local backup error, finding no connection, and a
    // failed RPC.
    // NOTE(review): some lines of this function (closing braces and payload
    // fields) are missing from this view.
    func handleBackupForGroup(ctx context.Context, reqId uint64, gid uint32) *workerp.BackupPayload {
    
    	n := groups().Node(gid)
    	if n.AmLeader() {
    		x.Trace(ctx, "Leader of group: %d. Running backup.", gid)
    		if err := backup(gid, *backupPath); err != nil {
    			x.TraceError(ctx, err)
    
    			return &workerp.BackupPayload{
    
    				Status: workerp.BackupPayload_FAILED,
    
    			}
    		}
    		x.Trace(ctx, "Backup done for group: %d.", gid)
    
    		return &workerp.BackupPayload{
    
    			Status:  workerp.BackupPayload_SUCCESS,
    
    	// I'm not the leader. Relay to someone who I think is.
    	var addrs []string
    	{
    		// Try in order: leader of given group, any server from given group, leader of group zero.
    		_, addr := groups().Leader(gid)
    		addrs = append(addrs, addr)
    		addrs = append(addrs, groups().AnyServer(gid))
    		_, addr = groups().Leader(0)
    		addrs = append(addrs, addr)
    	}
    
    	var conn *grpc.ClientConn
    	for _, addr := range addrs {
    		pl := pools().get(addr)
    		var err error
    		conn, err = pl.Get()
    		if err == nil {
    			x.Trace(ctx, "Relaying backup request for group %d to %q", gid, pl.Addr)
    			// The connection goes back to the pool when this function returns,
    			// i.e. after the relayed RPC below has completed.
    			defer pl.Put(conn)
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    			break
    		}
    
    	// Unable to find any connection to any of these servers. This should be exceedingly rare.
    	// But probably not worthy of crashing the server. We can just skip the backup.
    	if conn == nil {
    		x.Trace(ctx, fmt.Sprintf("Unable to find a server to backup group: %d", gid))
    
    		return &workerp.BackupPayload{
    
    			Status:  workerp.BackupPayload_FAILED,
    
    	c := workerp.NewWorkerClient(conn)
    	nr := &workerp.BackupPayload{
    
    		ReqId:   reqId,
    		GroupId: gid,
    
    	nrep, err := c.Backup(ctx, nr)
    
    Ashwin Ramesh's avatar
    Ashwin Ramesh committed
    	if err != nil {
    
    		x.TraceError(ctx, err)
    
    		return &workerp.BackupPayload{
    
    			Status:  workerp.BackupPayload_FAILED,
    
    	return nrep
    }
    
    // Backup is the gRPC handler for a request to back up group req.GroupId.
    //
    // If ctx is already done, or becomes done before the backup finishes, it
    // returns a FAILED reply together with ctx's error. Otherwise it returns the
    // payload produced by handleBackupForGroup, which either runs the backup
    // locally or relays it to a server that should handle the group. When
    // w.addIfNotPresent(req.ReqId) returns false, the reply status is set to
    // DUPLICATE.
    func (w *grpcWorker) Backup(ctx context.Context, req *workerp.BackupPayload) (*workerp.BackupPayload, error) {
    	reply := &workerp.BackupPayload{ReqId: req.ReqId}
    	reply.Status = workerp.BackupPayload_FAILED // Set by default.
    
    
    	if ctx.Err() != nil {
    		return reply, ctx.Err()
    	}
    	// NOTE(review): presumably addIfNotPresent records ReqId and reports false
    	// if it was already recorded. The lines that return the DUPLICATE reply and
    	// close this if are missing from this view.
    	if !w.addIfNotPresent(req.ReqId) {
    
    		reply.Status = workerp.BackupPayload_DUPLICATE
    
    	// Buffered so the goroutine can deliver its result and exit even if ctx is
    	// done first. A local backup (backup() takes no ctx) keeps running after
    	// cancellation.
    	chb := make(chan *workerp.BackupPayload, 1)
    
    	go func() {
    		chb <- handleBackupForGroup(ctx, req.ReqId, req.GroupId)
    	}()
    
    	select {
    	case rep := <-chb:
    		return rep, nil
    	case <-ctx.Done():
    		return reply, ctx.Err()
    	}
    }
    
    
    func BackupOverNetwork(ctx context.Context) error {
    
    	// If we haven't even had a single membership update, don't run backup.
    
    	if len(*peerAddr) > 0 && groups().LastUpdate() == 0 {
    
    		x.Trace(ctx, "This server hasn't yet been fully initiated. Please retry later.")
    
    		return x.Errorf("Uninitiated server. Please retry later")
    
    	// Let's first collect all groups.
    	gids := groups().KnownGroups()
    
    	ch := make(chan *workerp.BackupPayload, len(gids))
    
    	for _, gid := range gids {
    
    		// Nothing is stored in group zero
    		if gid == 0 {
    			continue
    		}
    
    		go func(group uint32) {
    			reqId := uint64(rand.Int63())
    			ch <- handleBackupForGroup(ctx, reqId, group)
    		}(gid)
    	}
    
    	for i := 0; i < len(gids); i++ {
    		bp := <-ch
    
    		if bp.Status != workerp.BackupPayload_SUCCESS {
    
    			x.Trace(ctx, "Backup status: %v for group id: %d", bp.Status, bp.GroupId)
    
    			return fmt.Errorf("Backup status: %v for group id: %d", bp.Status, bp.GroupId)
    
    		x.Trace(ctx, "Backup successful for group: %v", bp.GroupId)
    
    	x.Trace(ctx, "DONE backup")