Skip to content
Snippets Groups Projects
Unverified Commit a9f9e63a authored by Janardhan Reddy's avatar Janardhan Reddy
Browse files

Fix export when leader is not present.

parent be6ac20d
No related branches found
No related tags found
No related merge requests found
...@@ -15,6 +15,8 @@ and this project will adhere to [Semantic Versioning](http://semver.org/spec/v2. ...@@ -15,6 +15,8 @@ and this project will adhere to [Semantic Versioning](http://semver.org/spec/v2.
### Changed ### Changed
* `DropAttr` now also removes the schema for the attribute (previously it just removed the edges). * `DropAttr` now also removes the schema for the attribute (previously it just removed the edges).
* Tablet metadata is removed from zero after deletion of predicate.
* LRU size is changed dynamically now based on `max_memory_mb`
## [0.9.3] - 2017-12-01 ## [0.9.3] - 2017-12-01
......
...@@ -351,25 +351,10 @@ func export(bdir string, readTs uint64) error { ...@@ -351,25 +351,10 @@ func export(bdir string, readTs uint64) error {
// TODO: How do we want to handle export for group, do we pause mutations, sync all and then export ? // TODO: How do we want to handle export for group, do we pause mutations, sync all and then export ?
// TODO: Should we move export logic to dgraphzero? // TODO: Should we move export logic to dgraphzero?
func handleExportForGroup(ctx context.Context, in *intern.ExportPayload) *intern.ExportPayload { func handleExportForGroupOverNetwork(ctx context.Context, in *intern.ExportPayload) *intern.ExportPayload {
n := groups().Node n := groups().Node
if in.GroupId == groups().groupId() && n != nil && n.AmLeader() { if in.GroupId == groups().groupId() && n != nil && n.AmLeader() {
n.applyAllMarks(n.ctx) return handleExportForGroup(ctx, in)
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Leader of group: %d. Running export.", in.GroupId)
}
if err := export(Config.ExportPath, in.ReadTs); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf(err.Error())
}
in.Status = intern.ExportPayload_FAILED
return in
}
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Export done for group: %d.", in.GroupId)
}
in.Status = intern.ExportPayload_SUCCESS
return in
} }
pl := groups().Leader(in.GroupId) pl := groups().Leader(in.GroupId)
...@@ -395,11 +380,38 @@ func handleExportForGroup(ctx context.Context, in *intern.ExportPayload) *intern ...@@ -395,11 +380,38 @@ func handleExportForGroup(ctx context.Context, in *intern.ExportPayload) *intern
return nrep return nrep
} }
func handleExportForGroup(ctx context.Context, in *intern.ExportPayload) *intern.ExportPayload {
n := groups().Node
if in.GroupId != groups().groupId() || !n.AmLeader() {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("I am not leader of group %d.", in.GroupId)
}
in.Status = intern.ExportPayload_FAILED
return in
}
n.applyAllMarks(n.ctx)
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Leader of group: %d. Running export.", in.GroupId)
}
if err := export(Config.ExportPath, in.ReadTs); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf(err.Error())
}
in.Status = intern.ExportPayload_FAILED
return in
}
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Export done for group: %d.", in.GroupId)
}
in.Status = intern.ExportPayload_SUCCESS
return in
}
// Export request is used to trigger exports for the request list of groups. // Export request is used to trigger exports for the request list of groups.
// If a server receives request to export a group that it doesn't handle, it would // If a server receives request to export a group that it doesn't handle, it would
// automatically relay that request to the server that it thinks should handle the request. // automatically relay that request to the server that it thinks should handle the request.
func (w *grpcWorker) Export(ctx context.Context, req *intern.ExportPayload) (*intern.ExportPayload, error) { func (w *grpcWorker) Export(ctx context.Context, req *intern.ExportPayload) (*intern.ExportPayload, error) {
reply := &intern.ExportPayload{ReqId: req.ReqId} reply := &intern.ExportPayload{ReqId: req.ReqId, GroupId: req.GroupId}
reply.Status = intern.ExportPayload_FAILED // Set by default. reply.Status = intern.ExportPayload_FAILED // Set by default.
if ctx.Err() != nil { if ctx.Err() != nil {
...@@ -442,13 +454,6 @@ func ExportOverNetwork(ctx context.Context) error { ...@@ -442,13 +454,6 @@ func ExportOverNetwork(ctx context.Context) error {
// Let's first collect all groups. // Let's first collect all groups.
gids := groups().KnownGroups() gids := groups().KnownGroups()
for i, gid := range gids {
if gid == 0 {
gids[i] = gids[len(gids)-1]
gids = gids[:len(gids)-1]
}
}
ch := make(chan *intern.ExportPayload, len(gids)) ch := make(chan *intern.ExportPayload, len(gids))
for _, gid := range gids { for _, gid := range gids {
go func(group uint32) { go func(group uint32) {
...@@ -457,7 +462,7 @@ func ExportOverNetwork(ctx context.Context) error { ...@@ -457,7 +462,7 @@ func ExportOverNetwork(ctx context.Context) error {
GroupId: group, GroupId: group,
ReadTs: readTs, ReadTs: readTs,
} }
ch <- handleExportForGroup(ctx, req) ch <- handleExportForGroupOverNetwork(ctx, req)
}(gid) }(gid)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment