Skip to content
This repository was archived by the owner on Jun 8, 2021. It is now read-only.

Commit 9afcdf4

Browse files
committed
clean up
1 parent f6649ec commit 9afcdf4

File tree

3 files changed

+57
-23
lines changed

3 files changed

+57
-23
lines changed

proxy.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,18 @@ type ReverseProxy struct {
3030

3131
// ServeRedis satisfies the Handler interface.
3232
func (proxy *ReverseProxy) ServeRedis(w ResponseWriter, r *Request) {
33-
for _, cmd := range r.Cmds {
34-
switch cmd.Cmd {
33+
if len(r.Cmds) == 1 {
34+
switch r.Cmds[0].Cmd {
3535
case "SUBSCRIBE", "PSUBSCRIBE":
3636
var channels []string
3737
var channel string
3838

39-
for cmd.Args.Next(&channel) {
39+
for r.Cmds[0].Args.Next(&channel) {
4040
// TOOD: limit the number of channels read here
4141
channels = append(channels, channel)
4242
}
4343

44-
if err := cmd.Args.Close(); err != nil {
44+
if err := r.Cmds[0].Args.Close(); err != nil {
4545
proxy.log(err)
4646
return
4747
}
@@ -52,13 +52,11 @@ func (proxy *ReverseProxy) ServeRedis(w ResponseWriter, r *Request) {
5252
return
5353
}
5454

55-
proxy.servePubSub(conn, rw, cmd.Cmd, channels...)
56-
// TOD: figure out a way to pass the connection back in regular mode
57-
58-
default:
59-
proxy.serveRequest(w, r)
55+
proxy.servePubSub(conn, rw, r.Cmds[0].Cmd, channels...)
56+
// TOD0: figure out a way to pass the connection back in regular mode
6057
}
6158
}
59+
proxy.serveRequest(w, r)
6260
}
6361

6462
func (proxy *ReverseProxy) serveRequest(w ResponseWriter, req *Request) {

response.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package redis
22

33
import (
44
"io"
5+
"strings"
56

67
"github.com/segmentio/objconv/resp"
78
)
@@ -48,7 +49,7 @@ func (res *Response) Close() error {
4849
}
4950

5051
func newResponse(parser *resp.Parser, req *Request, done chan<- error) *Response {
51-
argsDone := make(chan error, 1)
52+
argsDone := make(chan error, len(req.Cmds))
5253

5354
args := make([]Args, len(req.Cmds))
5455
for i := 0; i < len(req.Cmds); i++ {
@@ -60,12 +61,15 @@ func newResponse(parser *resp.Parser, req *Request, done chan<- error) *Response
6061
for range req.Cmds {
6162
if e := <-argsDone; err == nil && e != nil {
6263
err = e
64+
if redisErr, ok := e.(*resp.Error); ok {
65+
if strings.HasPrefix(redisErr.Error(), "EXECABORT") {
66+
break
67+
}
68+
}
6369
}
6470

6571
}
66-
if done != nil {
67-
done <- err
68-
}
72+
done <- err
6973
}()
7074

7175
return &Response{

server.go

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -305,21 +305,41 @@ func (s *Server) serveConnection(ctx context.Context, c *serverConn, config serv
305305
}
306306

307307
func (s *Server) serveRequest(res *responseWriter, req *Request) (err error) {
308-
for _, cmd := range req.Cmds {
309-
switch cmd.Cmd {
308+
if len(req.Cmds) == 1 {
309+
switch req.Cmds[0].Cmd {
310310
case "PING":
311311
msg := "PONG"
312312
req.ParseArgs(&msg)
313313
res.Write(msg)
314-
315-
default:
316-
err = s.serveRedis(res, req)
317-
cmd.Args.Close()
314+
return
318315
}
316+
}
317+
//for _, cmd := range req.Cmds {
318+
// switch cmd.Cmd {
319+
// case "PING":
320+
// msg := "PONG"
321+
// req.ParseArgs(&msg)
322+
// res.Write(msg)
323+
324+
// default:
325+
// err = s.serveRedis(res, req)
326+
// cmd.Args.Close()
327+
// }
328+
329+
// if err == nil {
330+
// err = res.Flush()
331+
// }
332+
//}
333+
err = s.serveRedis(res, req)
334+
if err == nil {
335+
err = res.Flush()
336+
}
337+
for _, cmd := range req.Cmds {
338+
cmd.Args.Close()
339+
}
319340

320-
if err == nil {
321-
err = res.Flush()
322-
}
341+
if err == nil {
342+
err = res.Flush()
323343
}
324344

325345
return
@@ -504,9 +524,11 @@ func readRequest(ctx context.Context, conn *serverConn, done chan<- error) (*Req
504524

505525
cmds := []Command{cmd}
506526
if cmd.Cmd == "MULTI" {
527+
argsDone := make(chan error, 1)
528+
507529
for cmd.Cmd != "EXEC" {
508530
cmd.Args = nil
509-
args = newByteArgsReader(&conn.p, nil)
531+
args = newByteArgsReader(&conn.p, argsDone)
510532

511533
if !args.Next(&cmd.Cmd) {
512534
break
@@ -524,6 +546,16 @@ func readRequest(ctx context.Context, conn *serverConn, done chan<- error) (*Req
524546
cmd.Args = List(a...)
525547
cmds = append(cmds, cmd)
526548
}
549+
550+
go func() {
551+
var err error
552+
for range cmds {
553+
if e := <-argsDone; err == nil && e != nil {
554+
err = e
555+
}
556+
}
557+
done <- err
558+
}()
527559
}
528560

529561
req := &Request{

0 commit comments

Comments
 (0)