Skip to content
Snippets Groups Projects
Commit 911d8b32 authored by Ashwin's avatar Ashwin
Browse files

Moved worker flag to server, removed numInstance flag

parent b19e1504
No related branches found
No related tags found
No related merge requests found
...@@ -103,7 +103,7 @@ func TestNewGraph(t *testing.T) { ...@@ -103,7 +103,7 @@ func TestNewGraph(t *testing.T) {
t.Error(err) t.Error(err)
} }
worker.Init(ps, nil) worker.Init(ps, nil, nil)
uo := flatbuffers.GetUOffsetT(sg.result) uo := flatbuffers.GetUOffsetT(sg.result)
r := new(task.Result) r := new(task.Result)
...@@ -134,7 +134,7 @@ func populateGraph(t *testing.T) (string, *store.Store) { ...@@ -134,7 +134,7 @@ func populateGraph(t *testing.T) (string, *store.Store) {
ps := new(store.Store) ps := new(store.Store)
ps.Init(dir) ps.Init(dir)
worker.Init(ps, nil) worker.Init(ps, nil, nil)
clog := commit.NewLogger(dir, "mutations", 50<<20) clog := commit.NewLogger(dir, "mutations", 50<<20)
clog.Init() clog.Init()
......
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"runtime" "runtime"
"strings"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
...@@ -45,8 +46,8 @@ var numcpu = flag.Int("numCpu", runtime.NumCPU(), ...@@ -45,8 +46,8 @@ var numcpu = flag.Int("numCpu", runtime.NumCPU(),
"Number of cores to be used by the process") "Number of cores to be used by the process")
var instanceIdx = flag.Uint64("instanceIdx", 0, var instanceIdx = flag.Uint64("instanceIdx", 0,
"serves only entities whose Fingerprint % numInstance == instanceIdx.") "serves only entities whose Fingerprint % numInstance == instanceIdx.")
var numInstances = flag.Uint64("numInstances", 1, var workers = flag.String("workers", "",
"Total number of server instances") "Comma separated list of IP addresses of workers")
func addCorsHeaders(w http.ResponseWriter) { func addCorsHeaders(w http.ResponseWriter) {
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
...@@ -142,16 +143,18 @@ func main() { ...@@ -142,16 +143,18 @@ func main() {
clog.Init() clog.Init()
defer clog.Close() defer clog.Close()
addrs := strings.Split(*workers, ",")
posting.Init(clog) posting.Init(clog)
if *instanceIdx != 0 { if *instanceIdx != 0 {
worker.Init(ps, nil) worker.Init(ps, nil, addrs)
uid.Init(nil) uid.Init(nil)
} else { } else {
uidStore := new(store.Store) uidStore := new(store.Store)
uidStore.Init(*uidDir) uidStore.Init(*uidDir)
defer uidStore.Close() defer uidStore.Close()
// Only server instance 0 will have uidStore // Only server instance 0 will have uidStore
worker.Init(ps, uidStore) worker.Init(ps, uidStore, addrs)
uid.Init(uidStore) uid.Init(uidStore)
} }
......
...@@ -62,7 +62,7 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er ...@@ -62,7 +62,7 @@ func prepare() (dir1, dir2 string, ps *store.Store, clog *commit.Logger, rerr er
clog.Init() clog.Init()
posting.Init(clog) posting.Init(clog)
worker.Init(ps, nil) worker.Init(ps, nil, nil)
uid.Init(ps) uid.Init(ps)
loader.Init(ps, ps) loader.Init(ps, ps)
......
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"io" "io"
"net" "net"
"net/rpc" "net/rpc"
"strings"
"github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/posting"
...@@ -15,18 +14,18 @@ import ( ...@@ -15,18 +14,18 @@ import (
"github.com/google/flatbuffers/go" "github.com/google/flatbuffers/go"
) )
var workers = flag.String("workers", "",
"Comma separated list of IP addresses of workers")
var workerPort = flag.String("workerport", ":12345", var workerPort = flag.String("workerport", ":12345",
"Port used by worker for internal communication.") "Port used by worker for internal communication.")
var glog = x.Log("worker") var glog = x.Log("worker")
var dataStore, xiduidStore *store.Store var dataStore, xiduidStore *store.Store
var pools []*conn.Pool var pools []*conn.Pool
var addrs []string
func Init(ps, xuStore *store.Store) { func Init(ps, xuStore *store.Store, workerList []string) {
dataStore = ps dataStore = ps
xiduidStore = xuStore xiduidStore = xuStore
addrs = workerList
} }
func Connect() { func Connect() {
...@@ -38,7 +37,6 @@ func Connect() { ...@@ -38,7 +37,6 @@ func Connect() {
glog.Fatal(err) glog.Fatal(err)
} }
addrs := strings.Split(*workers, ",")
for _, addr := range addrs { for _, addr := range addrs {
if len(addr) == 0 { if len(addr) == 0 {
continue continue
......
...@@ -58,7 +58,7 @@ func TestProcessTask(t *testing.T) { ...@@ -58,7 +58,7 @@ func TestProcessTask(t *testing.T) {
defer clog.Close() defer clog.Close()
posting.Init(clog) posting.Init(clog)
Init(ps, nil) Init(ps, nil, nil)
edge := x.DirectedEdge{ edge := x.DirectedEdge{
ValueId: 23, ValueId: 23,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment