diff --git a/conn/codec.go b/conn/codec.go
index e40864a02892aca7e242b0fca284f196a5975850..08954f2bb0dca91978b70c1f8ca1193fcb7fed76 100644
--- a/conn/codec.go
+++ b/conn/codec.go
@@ -15,6 +15,8 @@ type Query struct {
 
 type Reply struct {
 	Data []byte
+	// TODO(manishrjain): Add an error here.
+	// Error string
 }
 
 func writeHeader(rwc io.ReadWriteCloser, seq uint64,
diff --git a/conn/server.go b/conn/server.go
index 986783a5f49f396496fd4cb4569ecb5f967326ae..e3e7c4dba08316abf6c9a933b21c8751fd5a9147 100644
--- a/conn/server.go
+++ b/conn/server.go
@@ -35,7 +35,9 @@ func (c *ServerCodec) ReadRequestBody(data interface{}) error {
 	return nil
 }
 
-func (c *ServerCodec) WriteResponse(resp *rpc.Response, data interface{}) error {
+func (c *ServerCodec) WriteResponse(resp *rpc.Response,
+	data interface{}) error {
+
 	if len(resp.Error) > 0 {
 		log.Fatal("Response has error: " + resp.Error)
 	}
diff --git a/worker/mutation.go b/worker/mutation.go
index 01a9b7137adb80c628f5c7f3b583abc225711b80..8b204b78a542e3c62167aa945e4e57a1d2e407c0 100644
--- a/worker/mutation.go
+++ b/worker/mutation.go
@@ -3,8 +3,13 @@ package worker
 import (
 	"bytes"
 	"encoding/gob"
+	"fmt"
+	"sync"
 
+	"github.com/dgraph-io/dgraph/conn"
+	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgryski/go-farm"
 )
 
 type Mutations struct {
@@ -24,3 +29,100 @@ func (m *Mutations) Decode(data []byte) error {
 	dec := gob.NewDecoder(r)
 	return dec.Decode(m)
 }
+
+func mutate(m *Mutations, left *Mutations) error {
+	// For now, assume it's all only Set instructions.
+	for _, edge := range m.Set {
+		if farm.Fingerprint64(
+			[]byte(edge.Attribute))%numInstances != instanceIdx {
+
+			glog.WithField("instanceIdx", instanceIdx).
+				WithField("attr", edge.Attribute).
+				Info("Predicate fingerprint doesn't match instanceIdx")
+			return fmt.Errorf("predicate fingerprint doesn't match this instance.")
+		}
+
+		key := posting.Key(edge.Entity, edge.Attribute)
+		plist := posting.GetOrCreate(key, dataStore)
+		if err := plist.AddMutation(edge, posting.Set); err != nil {
+			left.Set = append(left.Set, edge)
+			glog.WithError(err).WithField("edge", edge).
+				Error("While adding mutation.")
+			continue
+		}
+	}
+	return nil
+}
+
+func runMutate(idx int, m *Mutations, wg *sync.WaitGroup,
+	replies chan *conn.Reply, che chan error) {
+
+	defer wg.Done()
+	left := new(Mutations)
+	if idx == int(instanceIdx) {
+		che <- mutate(m, left)
+		return
+	}
+
+	var err error
+	pool := pools[idx]
+	query := new(conn.Query)
+	query.Data, err = m.Encode()
+	if err != nil {
+		che <- err
+		return
+	}
+
+	reply := new(conn.Reply)
+	if err := pool.Call("Worker.Mutate", query, reply); err != nil {
+		glog.WithField("call", "Worker.Mutate").
+			WithField("addr", pool.Addr).
+			WithError(err).Error("While calling mutate")
+		che <- err
+		return
+	}
+	replies <- reply
+}
+
+func MutateOverNetwork(
+	edges []x.DirectedEdge) (left []x.DirectedEdge, rerr error) {
+
+	mutationArray := make([]*Mutations, numInstances)
+	for _, edge := range edges {
+		idx := farm.Fingerprint64([]byte(edge.Attribute)) % numInstances
+		mu := mutationArray[idx]
+		if mu == nil {
+			mu = new(Mutations)
+		}
+		mu.Set = append(mu.Set, edge)
+	}
+
+	var wg sync.WaitGroup
+	replies := make(chan *conn.Reply, numInstances)
+	errors := make(chan error, numInstances)
+	for idx, mu := range mutationArray {
+		if mu == nil || len(mu.Set) == 0 {
+			continue
+		}
+		wg.Add(1)
+		go runMutate(idx, mu, &wg, replies, errors)
+	}
+	wg.Wait()
+	close(replies)
+	close(errors)
+
+	for err := range errors {
+		if err != nil {
+			glog.WithError(err).Error("While running all mutations")
+			return left, err
+		}
+	}
+	for reply := range replies {
+		l := new(Mutations)
+		if err := l.Decode(reply.Data); err != nil {
+			return left, err
+		}
+		left = append(left, l.Set...)
+	}
+	return left, nil
+}
diff --git a/worker/task.go b/worker/task.go
new file mode 100644
index 0000000000000000000000000000000000000000..5ce45ff3ee0021810ba9285e29042896ebdef395
--- /dev/null
+++ b/worker/task.go
@@ -0,0 +1,93 @@
+package worker
+
+import (
+	"github.com/dgraph-io/dgraph/conn"
+	"github.com/dgraph-io/dgraph/posting"
+	"github.com/dgraph-io/dgraph/task"
+	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgryski/go-farm"
+	"github.com/google/flatbuffers/go"
+)
+
+func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) {
+	uo := flatbuffers.GetUOffsetT(qu)
+	q := new(task.Query)
+	q.Init(qu, uo)
+
+	attr := string(q.Attr())
+	idx := farm.Fingerprint64([]byte(attr)) % numInstances
+
+	var runHere bool
+	if attr == "_xid_" || attr == "_uid_" {
+		idx = 0
+		runHere = (instanceIdx == 0)
+	} else {
+		runHere = (instanceIdx == idx)
+	}
+
+	if runHere {
+		// No need for a network call, as this should be run from within
+		// this instance.
+		return processTask(qu)
+	}
+
+	pool := pools[idx]
+	addr := pool.Addr
+	query := new(conn.Query)
+	query.Data = qu
+	reply := new(conn.Reply)
+	if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
+		glog.WithField("call", "Worker.ServeTask").Fatal(err)
+	}
+	glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
+		Info("Got reply from server")
+	return reply.Data, nil
+}
+
+func processTask(query []byte) (result []byte, rerr error) {
+	uo := flatbuffers.GetUOffsetT(query)
+	q := new(task.Query)
+	q.Init(query, uo)
+	attr := string(q.Attr())
+
+	b := flatbuffers.NewBuilder(0)
+	voffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
+	uoffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
+
+	for i := 0; i < q.UidsLength(); i++ {
+		uid := q.Uids(i)
+		key := posting.Key(uid, attr)
+		pl := posting.GetOrCreate(key, dataStore)
+
+		var valoffset flatbuffers.UOffsetT
+		if val, err := pl.Value(); err != nil {
+			valoffset = b.CreateByteVector(x.Nilbyte)
+		} else {
+			valoffset = b.CreateByteVector(val)
+		}
+		task.ValueStart(b)
+		task.ValueAddVal(b, valoffset)
+		voffsets[i] = task.ValueEnd(b)
+
+		ulist := pl.GetUids()
+		uoffsets[i] = x.UidlistOffset(b, ulist)
+	}
+	task.ResultStartValuesVector(b, len(voffsets))
+	for i := len(voffsets) - 1; i >= 0; i-- {
+		b.PrependUOffsetT(voffsets[i])
+	}
+	valuesVent := b.EndVector(len(voffsets))
+
+	task.ResultStartUidmatrixVector(b, len(uoffsets))
+	for i := len(uoffsets) - 1; i >= 0; i-- {
+		b.PrependUOffsetT(uoffsets[i])
+	}
+	matrixVent := b.EndVector(len(uoffsets))
+
+	task.ResultStart(b)
+	task.ResultAddValues(b, valuesVent)
+	task.ResultAddUidmatrix(b, matrixVent)
+	rend := task.ResultEnd(b)
+	b.Finish(rend)
+	return b.Bytes[b.Head():], nil
+}
diff --git a/worker/worker.go b/worker/worker.go
index fca0b6309352bda5c728b6aa2788d352a528d824..c09afe8abc263ca7a9dadd48570cb63dffffeb55 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -2,13 +2,11 @@ package worker
 
 import (
 	"flag"
-	"fmt"
 	"io"
 	"net"
 	"net/rpc"
 
 	"github.com/dgraph-io/dgraph/conn"
-	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/store"
 	"github.com/dgraph-io/dgraph/task"
 	"github.com/dgraph-io/dgraph/x"
@@ -64,87 +62,6 @@ func Connect(workerList []string) {
 	glog.Info("Server started. Clients connected.")
 }
 
-func ProcessTaskOverNetwork(qu []byte) (result []byte, rerr error) {
-	uo := flatbuffers.GetUOffsetT(qu)
-	q := new(task.Query)
-	q.Init(qu, uo)
-
-	attr := string(q.Attr())
-	idx := farm.Fingerprint64([]byte(attr)) % numInstances
-
-	var runHere bool
-	if attr == "_xid_" || attr == "_uid_" {
-		idx = 0
-		runHere = (instanceIdx == 0)
-	} else {
-		runHere = (instanceIdx == idx)
-	}
-
-	if runHere {
-		return ProcessTask(qu)
-	}
-
-	pool := pools[idx]
-	addr := pool.Addr
-	query := new(conn.Query)
-	query.Data = qu
-	reply := new(conn.Reply)
-	if err := pool.Call("Worker.ServeTask", query, reply); err != nil {
-		glog.WithField("call", "Worker.ServeTask").Fatal(err)
-	}
-	glog.WithField("reply", string(reply.Data)).WithField("addr", addr).
-		Info("Got reply from server")
-	return reply.Data, nil
-}
-
-func ProcessTask(query []byte) (result []byte, rerr error) {
-	uo := flatbuffers.GetUOffsetT(query)
-	q := new(task.Query)
-	q.Init(query, uo)
-	attr := string(q.Attr())
-
-	b := flatbuffers.NewBuilder(0)
-	voffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
-	uoffsets := make([]flatbuffers.UOffsetT, q.UidsLength())
-
-	for i := 0; i < q.UidsLength(); i++ {
-		uid := q.Uids(i)
-		key := posting.Key(uid, attr)
-		pl := posting.GetOrCreate(key, dataStore)
-
-		var valoffset flatbuffers.UOffsetT
-		if val, err := pl.Value(); err != nil {
-			valoffset = b.CreateByteVector(x.Nilbyte)
-		} else {
-			valoffset = b.CreateByteVector(val)
-		}
-		task.ValueStart(b)
-		task.ValueAddVal(b, valoffset)
-		voffsets[i] = task.ValueEnd(b)
-
-		ulist := pl.GetUids()
-		uoffsets[i] = x.UidlistOffset(b, ulist)
-	}
-	task.ResultStartValuesVector(b, len(voffsets))
-	for i := len(voffsets) - 1; i >= 0; i-- {
-		b.PrependUOffsetT(voffsets[i])
-	}
-	valuesVent := b.EndVector(len(voffsets))
-
-	task.ResultStartUidmatrixVector(b, len(uoffsets))
-	for i := len(uoffsets) - 1; i >= 0; i-- {
-		b.PrependUOffsetT(uoffsets[i])
-	}
-	matrixVent := b.EndVector(len(uoffsets))
-
-	task.ResultStart(b)
-	task.ResultAddValues(b, valuesVent)
-	task.ResultAddUidmatrix(b, matrixVent)
-	rend := task.ResultEnd(b)
-	b.Finish(rend)
-	return b.Bytes[b.Head():], nil
-}
-
 func NewQuery(attr string, uids []uint64) []byte {
 	b := flatbuffers.NewBuilder(0)
 	task.QueryStartUidsVector(b, len(uids))
@@ -181,25 +98,8 @@ func (w *Worker) Mutate(query *conn.Query, reply *conn.Reply) (rerr error) {
 	}
 
 	left := new(Mutations)
-	// For now, assume it's all only Set instructions.
-	for _, edge := range m.Set {
-		if farm.Fingerprint64(
-			[]byte(edge.Attribute))%numInstances != instanceIdx {
-
-			glog.WithField("instanceIdx", instanceIdx).
-				WithField("attr", edge.Attribute).
-				Info("Predicate fingerprint doesn't match instanceIdx")
-			return fmt.Errorf("predicate fingerprint doesn't match this instance.")
-		}
-
-		key := posting.Key(edge.Entity, edge.Attribute)
-		plist := posting.GetOrCreate(key, dataStore)
-		if err := plist.AddMutation(edge, posting.Set); err != nil {
-			left.Set = append(left.Set, edge)
-			glog.WithError(err).WithField("edge", edge).
-				Error("While adding mutation.")
-			continue
-		}
+	if err := mutate(m, left); err != nil {
+		return err
 	}
 	reply.Data, rerr = left.Encode()
 	return
@@ -212,7 +112,7 @@ func (w *Worker) ServeTask(query *conn.Query, reply *conn.Reply) (rerr error) {
 	attr := string(q.Attr())
 
 	if farm.Fingerprint64([]byte(attr))%numInstances == instanceIdx {
-		reply.Data, rerr = ProcessTask(query.Data)
+		reply.Data, rerr = processTask(query.Data)
 	} else {
 		glog.WithField("attribute", attr).
 			WithField("instanceIdx", instanceIdx).