Skip to content
Snippets Groups Projects
Commit 87955a74 authored by Pawan Rawal's avatar Pawan Rawal
Browse files

Refactoring

1. Renamed GetResponse to Query in DGraph service.
2. Client now accepts the ip of the server as a flag instead of just the port.
3. Removed ToProtocolBuffer method, and added PreTraverse as a method for a Subgraph.
4. Renamed runServerForClient to runGrpcServer.
parent 2a18c933
No related branches found
No related tags found
No related merge requests found
......@@ -28,13 +28,13 @@ import (
)
var glog = x.Log("client")
var port = flag.String("port", "9090", "Port to communicate with server")
var ip = flag.String("ip", "127.0.0.1:9090", "Port to communicate with server")
var query = flag.String("query", "", "Query sent to the server")
func main() {
flag.Parse()
// TODO(pawan): Pick address for server from config
conn, err := grpc.Dial("127.0.0.1:"+*port, grpc.WithInsecure())
conn, err := grpc.Dial(*ip, grpc.WithInsecure())
if err != nil {
x.Err(glog, err).Fatal("DialTCPConnection")
}
......@@ -42,7 +42,7 @@ func main() {
c := pb.NewDGraphClient(conn)
r, err := c.GetResponse(context.Background(), &pb.GraphRequest{Query: *query})
r, err := c.Query(context.Background(), &pb.GraphRequest{Query: *query})
if err != nil {
x.Err(glog, err).Fatal("Error in getting response from server")
}
......
......@@ -112,7 +112,7 @@ const _ = grpc.SupportPackageIsVersion2
// Client API for DGraph service
type DGraphClient interface {
GetResponse(ctx context.Context, in *GraphRequest, opts ...grpc.CallOption) (*GraphResponse, error)
Query(ctx context.Context, in *GraphRequest, opts ...grpc.CallOption) (*GraphResponse, error)
}
type dGraphClient struct {
......@@ -123,9 +123,9 @@ func NewDGraphClient(cc *grpc.ClientConn) DGraphClient {
return &dGraphClient{cc}
}
func (c *dGraphClient) GetResponse(ctx context.Context, in *GraphRequest, opts ...grpc.CallOption) (*GraphResponse, error) {
func (c *dGraphClient) Query(ctx context.Context, in *GraphRequest, opts ...grpc.CallOption) (*GraphResponse, error) {
out := new(GraphResponse)
err := grpc.Invoke(ctx, "/pb.DGraph/GetResponse", in, out, c.cc, opts...)
err := grpc.Invoke(ctx, "/pb.DGraph/Query", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
......@@ -135,27 +135,27 @@ func (c *dGraphClient) GetResponse(ctx context.Context, in *GraphRequest, opts .
// Server API for DGraph service
type DGraphServer interface {
GetResponse(context.Context, *GraphRequest) (*GraphResponse, error)
Query(context.Context, *GraphRequest) (*GraphResponse, error)
}
func RegisterDGraphServer(s *grpc.Server, srv DGraphServer) {
s.RegisterService(&_DGraph_serviceDesc, srv)
}
func _DGraph_GetResponse_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _DGraph_Query_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GraphRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DGraphServer).GetResponse(ctx, in)
return srv.(DGraphServer).Query(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pb.DGraph/GetResponse",
FullMethod: "/pb.DGraph/Query",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DGraphServer).GetResponse(ctx, req.(*GraphRequest))
return srv.(DGraphServer).Query(ctx, req.(*GraphRequest))
}
return interceptor(ctx, in, info, handler)
}
......@@ -165,29 +165,29 @@ var _DGraph_serviceDesc = grpc.ServiceDesc{
HandlerType: (*DGraphServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "GetResponse",
Handler: _DGraph_GetResponse_Handler,
MethodName: "Query",
Handler: _DGraph_Query_Handler,
},
},
Streams: []grpc.StreamDesc{},
}
var fileDescriptor0 = []byte{
// 256 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x64, 0x90, 0x41, 0x4b, 0xc4, 0x30,
0x10, 0x85, 0xed, 0xee, 0x1a, 0xed, 0x74, 0x05, 0x1d, 0x45, 0x8a, 0x28, 0x48, 0xf0, 0xb0, 0x1e,
0xec, 0x61, 0xf5, 0xec, 0x49, 0xd8, 0x83, 0x9e, 0x02, 0xfe, 0x80, 0xd6, 0x06, 0x37, 0x50, 0xb7,
0x31, 0x99, 0x8a, 0xde, 0xfc, 0xe9, 0x4e, 0xd3, 0xb0, 0x55, 0xbc, 0xcd, 0xf4, 0xbd, 0x79, 0xef,
0x6b, 0xe0, 0xf8, 0xd5, 0x95, 0x76, 0xed, 0xb4, 0xb7, 0xed, 0xc6, 0xeb, 0xc2, 0xba, 0x96, 0x5a,
0x9c, 0xd8, 0x4a, 0x5e, 0xc0, 0xde, 0xb3, 0xa9, 0x9f, 0x8c, 0x27, 0x44, 0x98, 0x75, 0xa6, 0xf6,
0x79, 0x72, 0x39, 0x5d, 0xcc, 0x54, 0x98, 0xe5, 0x23, 0x08, 0xa5, 0x7d, 0xd7, 0x10, 0x9e, 0x82,
0xf8, 0x28, 0x9b, 0x4e, 0x0f, 0xfa, 0x5c, 0xc5, 0x0d, 0xaf, 0x21, 0x65, 0xe7, 0x5b, 0x49, 0xce,
0x7c, 0xe6, 0x13, 0x96, 0xb2, 0x65, 0x56, 0xd8, 0xaa, 0x88, 0xa9, 0x6a, 0x54, 0xe5, 0x15, 0xcc,
0x57, 0x3d, 0x86, 0xd2, 0xef, 0x7c, 0x49, 0x78, 0x02, 0xbb, 0x3c, 0xb8, 0x2f, 0x4e, 0x4c, 0x16,
0xa9, 0x1a, 0x16, 0xf9, 0x9d, 0xc0, 0x41, 0xb4, 0x0d, 0xb4, 0x78, 0x0e, 0x69, 0x49, 0x9c, 0x50,
0x75, 0xa4, 0xa3, 0x77, 0xfc, 0x80, 0x12, 0x84, 0x0b, 0x88, 0xdc, 0x9e, 0x70, 0x3b, 0xf4, 0xed,
0x03, 0xb4, 0x8a, 0x0a, 0xde, 0xc0, 0xfe, 0xcb, 0xda, 0x34, 0xb5, 0xd3, 0x9b, 0x7c, 0x1a, 0x18,
0x8f, 0x7a, 0xd7, 0x9f, 0x1a, 0xb5, 0xb5, 0x2c, 0xef, 0x41, 0x3c, 0x04, 0x0d, 0xef, 0x20, 0x5b,
0x69, 0xda, 0x92, 0x1c, 0xfe, 0xba, 0x0a, 0xff, 0x70, 0xf6, 0x3f, 0x47, 0xee, 0x54, 0x22, 0xbc,
0xef, 0xed, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xb2, 0xab, 0xec, 0x88, 0x76, 0x01, 0x00, 0x00,
// 255 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x64, 0x90, 0x4f, 0x4b, 0xc4, 0x30,
0x10, 0xc5, 0xed, 0xfe, 0x89, 0x76, 0x76, 0x05, 0x1d, 0x45, 0x8a, 0x28, 0x48, 0xf0, 0xb0, 0x1e,
0xec, 0x61, 0xbd, 0xf8, 0x01, 0x04, 0x0f, 0x7a, 0x71, 0xc0, 0x0f, 0xd0, 0xda, 0xe0, 0x06, 0xea,
0x36, 0x26, 0xa9, 0xe8, 0xcd, 0x8f, 0xee, 0x34, 0x09, 0xae, 0xb2, 0xb7, 0x99, 0xbc, 0x97, 0xf7,
0x7e, 0x0c, 0x1c, 0xbd, 0xda, 0xca, 0xac, 0xac, 0x72, 0xa6, 0x5b, 0x3b, 0x55, 0x1a, 0xdb, 0xf9,
0x0e, 0x47, 0xa6, 0x96, 0xe7, 0xb0, 0xfb, 0xac, 0x9b, 0x47, 0xed, 0x3c, 0x22, 0x4c, 0x7a, 0xdd,
0xb8, 0x22, 0xbb, 0x18, 0x2f, 0x26, 0x14, 0x66, 0xf9, 0x00, 0x82, 0x94, 0xeb, 0x5b, 0x8f, 0x27,
0x20, 0x3e, 0xaa, 0xb6, 0x57, 0x51, 0x9f, 0x53, 0xda, 0xf0, 0x0a, 0x72, 0x76, 0xbe, 0x55, 0xde,
0xea, 0xcf, 0x62, 0xc4, 0xd2, 0x6c, 0x39, 0x2b, 0x4d, 0x5d, 0xa6, 0x54, 0xda, 0xa8, 0xf2, 0x12,
0xe6, 0xf7, 0x03, 0x06, 0xa9, 0x77, 0xfe, 0xe9, 0xf1, 0x18, 0xa6, 0x3c, 0xd8, 0x2f, 0x4e, 0xcc,
0x16, 0x39, 0xc5, 0x45, 0x7e, 0x67, 0xb0, 0x9f, 0x6c, 0x91, 0x16, 0xcf, 0x20, 0xaf, 0x3c, 0x27,
0xd4, 0xbd, 0x57, 0xc9, 0xbb, 0x79, 0x40, 0x09, 0xc2, 0x06, 0x44, 0x6e, 0xcf, 0xb8, 0x1d, 0x86,
0xf6, 0x08, 0x4d, 0x49, 0xc1, 0x6b, 0xd8, 0x7b, 0x59, 0xe9, 0xb6, 0xb1, 0x6a, 0x5d, 0x8c, 0x03,
0xe3, 0xe1, 0xe0, 0xfa, 0x57, 0x43, 0xbf, 0x96, 0xe5, 0x2d, 0x88, 0xbb, 0xa0, 0x61, 0x09, 0xd3,
0xa7, 0x81, 0x0a, 0x0f, 0xfe, 0xf8, 0x03, 0xfd, 0xe9, 0x76, 0x82, 0xdc, 0xa9, 0x45, 0xb8, 0xec,
0xcd, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xd9, 0x42, 0x7b, 0x6e, 0x70, 0x01, 0x00, 0x00,
}
......@@ -2,7 +2,7 @@ syntax="proto3";
package pb;
service DGraph {
rpc GetResponse (GraphRequest) returns (GraphResponse) {}
rpc Query (GraphRequest) returns (GraphResponse) {}
}
message UidList {
......
......@@ -252,22 +252,14 @@ func (g *SubGraph) ToJson(l *Latency) (js []byte, rerr error) {
return json.Marshal(r)
}
func preTraverse(g *SubGraph) (gr *pb.GraphResponse, rerr error) {
gr = &pb.GraphResponse{}
if len(g.query) == 0 {
return gr, nil
}
gr.Attribute = g.Attr
ro := flatbuffers.GetUOffsetT(g.result)
r := new(task.Result)
r.Init(g.result, ro)
// This method take in a flatbuffer result, extracts values and uids from it
// and converts it to a protocol buffer result
func extract(r *task.Result) (*pb.Result, error) {
var result = &pb.Result{}
var ul task.UidList
result := &pb.Result{}
for i := 0; i < r.UidmatrixLength(); i++ {
if ok := r.Uidmatrix(&ul, i); !ok {
return gr, fmt.Errorf("While parsing UidList")
return result, fmt.Errorf("While parsing UidList")
}
uidList := &pb.UidList{}
......@@ -281,12 +273,12 @@ func preTraverse(g *SubGraph) (gr *pb.GraphResponse, rerr error) {
var tv task.Value
for i := 0; i < r.ValuesLength(); i++ {
if ok := r.Values(&tv, i); !ok {
return gr, fmt.Errorf("While parsing value")
return result, fmt.Errorf("While parsing value")
}
var ival interface{}
if err := posting.ParseValue(&ival, tv.ValBytes()); err != nil {
return gr, err
return result, err
}
if ival == nil {
......@@ -294,11 +286,29 @@ func preTraverse(g *SubGraph) (gr *pb.GraphResponse, rerr error) {
}
result.Values = append(result.Values, []byte(ival.(string)))
}
return result, nil
}
func (g *SubGraph) PreTraverse() (gr *pb.GraphResponse, rerr error) {
gr = &pb.GraphResponse{}
if len(g.query) == 0 {
return gr, nil
}
gr.Attribute = g.Attr
ro := flatbuffers.GetUOffsetT(g.result)
r := new(task.Result)
r.Init(g.result, ro)
result, err := extract(r)
if err != nil {
return gr, err
}
gr.Result = result
for _, child := range g.Children {
childPb, err := preTraverse(child)
childPb, err := child.PreTraverse()
if err != nil {
x.Err(glog, err).Error("Error while traversal")
return gr, err
......@@ -309,16 +319,6 @@ func preTraverse(g *SubGraph) (gr *pb.GraphResponse, rerr error) {
return gr, nil
}
func (g *SubGraph) ToProtocolBuffer() (gr *pb.GraphResponse, rerr error) {
gr, err := preTraverse(g)
if err != nil {
x.Err(glog, err).Error("Error while traversal")
return gr, err
}
return gr, nil
}
func treeCopy(gq *gql.GraphQuery, sg *SubGraph) {
for _, gchild := range gq.Children {
dst := new(SubGraph)
......
......@@ -320,7 +320,7 @@ func TestToJson(t *testing.T) {
fmt.Printf(string(js))
}
func TestToProtocolBuffer(t *testing.T) {
func TestPreTraverse(t *testing.T) {
dir, _ := populateGraph(t)
defer os.RemoveAll(dir)
......@@ -353,7 +353,7 @@ func TestToProtocolBuffer(t *testing.T) {
t.Error(err)
}
ugr, err := sg.ToProtocolBuffer()
ugr, err := sg.PreTraverse()
if err != nil {
t.Error(err)
}
......
......@@ -205,17 +205,16 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
// server is used to implement pb.DGraphServer
type server struct{}
func (s *server) GetResponse(ctx context.Context, r *pb.GraphRequest) (gr *pb.GraphResponse, rerr error) {
q := []byte(r.Query)
if len(q) == 0 {
func (s *server) Query(ctx context.Context, r *pb.GraphRequest) (gr *pb.GraphResponse, rerr error) {
if len(r.Query) == 0 {
glog.Error("While reading query")
return gr, fmt.Errorf("Empty query")
}
// TODO(pawan): Refactor query parsing and graph processing code to a common
// function used by pbQueryHandler and queryHandler
glog.WithField("q", string(q)).Debug("Query received.")
gq, _, err := gql.Parse(string(q))
// function used by Query and queryHandler
glog.WithField("q", r.Query).Debug("Query received.")
gq, _, err := gql.Parse(r.Query)
if err != nil {
x.Err(glog, err).Error("While parsing query")
return gr, err
......@@ -226,7 +225,7 @@ func (s *server) GetResponse(ctx context.Context, r *pb.GraphRequest) (gr *pb.Gr
x.Err(glog, err).Error("While conversion to internal format")
return gr, err
}
glog.WithField("q", string(q)).Debug("Query parsed.")
glog.WithField("q", r.Query).Debug("Query parsed.")
rch := make(chan error)
go query.ProcessGraph(sg, rch)
......@@ -236,17 +235,17 @@ func (s *server) GetResponse(ctx context.Context, r *pb.GraphRequest) (gr *pb.Gr
return gr, err
}
glog.WithField("q", string(q)).Debug("Graph processed.")
gr, err = sg.ToProtocolBuffer()
glog.WithField("q", r.Query).Debug("Graph processed.")
gr, err = sg.PreTraverse()
if err != nil {
x.Err(glog, err).Error("While converting to Json.")
x.Err(glog, err).Error("While converting to protocol buffer.")
return gr, err
}
return gr, err
}
func runServerForClient(address string) error {
func runGrpcServer(address string) error {
ln, err := net.Listen("tcp", address)
if err != nil {
glog.Fatalf("While running server for client: %v", err)
......@@ -306,7 +305,7 @@ func main() {
worker.Connect(addrs)
runServerForClient(":" + *clientPort)
runGrpcServer(":" + *clientPort)
http.HandleFunc("/query", queryHandler)
glog.WithField("port", *port).Info("Listening for requests...")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment