diff --git a/test/client.go b/test/client.go new file mode 100644 index 0000000000000000000000000000000000000000..49fc877330239e48d9b371fff4f47f7c3c51f698 --- /dev/null +++ b/test/client.go @@ -0,0 +1,102 @@ +package main + +import ( + "bufio" + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "log" + "net/rpc" +) + +type ccodec struct { + rwc io.ReadWriteCloser + ebuf *bufio.Writer + payloadLen int32 +} + +func writeHeader(rwc io.ReadWriteCloser, seq uint64, + method string, data []byte) error { + + var bh bytes.Buffer + var rerr error + + setError(&rerr, binary.Write(&bh, binary.LittleEndian, seq)) + setError(&rerr, binary.Write(&bh, binary.LittleEndian, int32(len(method)))) + setError(&rerr, binary.Write(&bh, binary.LittleEndian, int32(len(data)))) + _, err := bh.Write([]byte(method)) + setError(&rerr, err) + if rerr != nil { + return rerr + } + _, err = rwc.Write(bh.Bytes()) + return err +} + +func parseHeader(rwc io.ReadWriteCloser, seq *uint64, method *string, plen *int32) error { + var err error + var sz int32 + setError(&err, binary.Read(rwc, binary.LittleEndian, seq)) + setError(&err, binary.Read(rwc, binary.LittleEndian, &sz)) + setError(&err, binary.Read(rwc, binary.LittleEndian, plen)) + if err != nil { + return err + } + buf := make([]byte, sz) + n, err := rwc.Read(buf) + if err != nil { + return err + } + if n != int(sz) { + return fmt.Errorf("Expected: %v. Got: %v\n", sz, n) + } + *method = string(buf) + return nil +} + +func (c *ccodec) WriteRequest(r *rpc.Request, body interface{}) error { + if body == nil { + return errors.New("Nil body") + } + + query := body.(*Query) + if err := writeHeader(c.rwc, r.Seq, r.ServiceMethod, query.d); err != nil { + return err + } + + n, err := c.rwc.Write(query.d) + if n != len(query.d) { + return errors.New("Unable to write payload.") + } + return err +} + +func (c *ccodec) ReadResponseHeader(r *rpc.Response) error { + if len(r.Error) > 0 { + log.Fatal("client got response error: " + r.Error) + } + if err := parseHeader(c.rwc, &r.Seq, + &r.ServiceMethod, &c.payloadLen); err != nil { + return err + } + fmt.Println("Client got response:", r.Seq) + fmt.Println("Client got response:", r.ServiceMethod) + return nil +} + +func (c *ccodec) ReadResponseBody(body interface{}) error { + buf := make([]byte, c.payloadLen) + n, err := c.rwc.Read(buf) + if n != int(c.payloadLen) { + return fmt.Errorf("Client expected: %d. Got: %d\n", c.payloadLen, n) + } + reply := body.(*Reply) + reply.d = buf + return err +} + +func (c *ccodec) Close() error { + return c.rwc.Close() +} diff --git a/test/main.go b/test/main.go new file mode 100644 index 0000000000000000000000000000000000000000..31dda1b8580f137d936abd41b6f1cefa4017d16d --- /dev/null +++ b/test/main.go @@ -0,0 +1,115 @@ +package main + +import ( + "bufio" + "fmt" + "io" + "log" + "math/rand" + "net" + "net/rpc" +) + +type Query struct { + d []byte +} + +type Reply struct { + d []byte +} + +func setError(prev *error, n error) { + if prev == nil { + prev = &n + } +} + +type Worker struct { +} + +func serveIt(conn io.ReadWriteCloser) { + for { + srv := &scodec{ + rwc: conn, + ebuf: bufio.NewWriter(conn), + } + rpc.ServeRequest(srv) + } +} + +func (w *Worker) Receive(query *Query, reply *Reply) error { + fmt.Printf("Worker received: [%s]\n", string(query.d)) + reply.d = []byte("abcdefghij-Hello World!") + return nil +} + +func runServer(address string) error { + w := new(Worker) + if err := rpc.Register(w); err != nil { + return err + } + + ln, err := net.Listen("tcp", address) + if err != nil { + fmt.Printf("listen(%q): %s\n", address, err) + return err + } + fmt.Printf("Worker listening on %s\n", ln.Addr()) + go func() { + for { + cxn, err := ln.Accept() + if err != nil { + log.Fatalf("listen(%q): %s\n", address, err) + return + } + log.Printf("Worker accepted connection to %s from %s\n", + cxn.LocalAddr(), cxn.RemoteAddr()) + go serveIt(cxn) + } + }() + return nil +} + +func main() { + addresses := map[int]string{ + 1: "127.0.0.1:10000", + // 2: "127.0.0.1:10001", + // 3: "127.0.0.1:10002", + } + + for _, address := range addresses { + runServer(address) + } + + clients := make(map[int]*rpc.Client) + for id, address := range addresses { + conn, err := net.Dial("tcp", address) + if err != nil { + log.Fatal("dial", err) + } + cc := &ccodec{ + rwc: conn, + ebuf: bufio.NewWriter(conn), + } + clients[id] = rpc.NewClientWithCodec(cc) + } + + for i := 0; i < 1; i++ { + client := clients[1] + if client == nil { + log.Fatal("Worker is nil") + } + + id := 0 + // for id, server := range servers { + query := new(Query) + query.d = []byte(fmt.Sprintf("id:%d Rand: %d", id, rand.Int())) + reply := new(Reply) + if err := client.Call("Worker.Receive", query, reply); err != nil { + log.Fatal("call", err) + } + + fmt.Printf("Returned: %s\n", string(reply.d)) + // } + } +} diff --git a/test/server.go b/test/server.go new file mode 100644 index 0000000000000000000000000000000000000000..b4ff9255e942e5b20efc82db5966ad01287fd04e --- /dev/null +++ b/test/server.go @@ -0,0 +1,79 @@ +package main + +import ( + "bufio" + "errors" + "fmt" + "io" + "log" + "net/rpc" + "reflect" +) + +type scodec struct { + rwc io.ReadWriteCloser + ebuf *bufio.Writer + payloadLen int32 +} + +func (c *scodec) ReadRequestHeader(r *rpc.Request) error { + var err error + if err = parseHeader(c.rwc, &r.Seq, + &r.ServiceMethod, &c.payloadLen); err != nil { + return err + } + + fmt.Println("server using custom codec to read header") + fmt.Println("server method called:", r.ServiceMethod) + fmt.Println("server method called:", r.Seq) + return nil +} + +func (c *scodec) ReadRequestBody(data interface{}) error { + if data == nil { + log.Fatal("Why is data nil here?") + } + value := reflect.ValueOf(data) + if value.Type().Kind() != reflect.Ptr { + log.Fatal("Should of of type pointer") + } + + b := make([]byte, c.payloadLen) + n, err := c.rwc.Read(b) + fmt.Printf("Worker read n bytes: %v %s\n", n, string(b)) + if err != nil { + log.Fatal("server", err) + } + if n != int(c.payloadLen) { + return errors.New("Server unable to read request.") + } + + query := data.(*Query) + query.d = b + return nil +} + +func (c *scodec) WriteResponse(resp *rpc.Response, data interface{}) error { + if len(resp.Error) > 0 { + log.Fatal("Response has error: " + resp.Error) + } + if data == nil { + log.Fatal("Worker write response data is nil") + } + reply, ok := data.(*Reply) + if !ok { + log.Fatal("Unable to convert to reply") + } + + if err := writeHeader(c.rwc, resp.Seq, + resp.ServiceMethod, reply.d); err != nil { + return err + } + + _, err := c.rwc.Write(reply.d) + return err +} + +func (c *scodec) Close() error { + return c.rwc.Close() +}