Skip to content

Commit cf13c8d

Browse files
author
filipe.xavier
committed
Added Consumer Groups on Consumer.
1 parent ab6fcd6 commit cf13c8d

File tree

6 files changed

+85
-16
lines changed

6 files changed

+85
-16
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.vscode

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
# go-redis-streams
2-
Demo using messageria with Redis Streams in Golang
2+
Demo using messaging with Redis Streams in Golang

consumer/handler/like.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package handler
22

33
import (
4+
"errors"
45
"fmt"
56

67
"github.com/felipeagger/go-redis-streams/packages/event"
@@ -21,6 +22,10 @@ func (h *likeHandler) Handle(e event.Event) error {
2122
return fmt.Errorf("incorrect event type")
2223
}
2324

25+
if like.UserID == 5 {
26+
return errors.New("Falhou")
27+
}
28+
2429
fmt.Printf("completed like %+v UserID: %v Extra:%v \n", like, like.UserID, like.Extra)
2530

2631
return nil

consumer/handler/view.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package handler
22

33
import (
4+
"errors"
45
"fmt"
56

67
"github.com/felipeagger/go-redis-streams/packages/event"
@@ -21,6 +22,10 @@ func (h *viewHandler) Handle(e event.Event) error {
2122
return fmt.Errorf("incorrect event type")
2223
}
2324

25+
if view.UserID == 5 {
26+
return errors.New("Falhou")
27+
}
28+
2429
fmt.Printf("completed view %+v UserID: %v Extra:%v \n", view, view.UserID, view.Extra)
2530

2631
return nil

consumer/main.go

Lines changed: 71 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@ package main
22

33
import (
44
"fmt"
5+
"log"
6+
"os"
7+
"os/signal"
8+
"strings"
59
"sync"
10+
"syscall"
611

712
"github.com/felipeagger/go-redis-streams/consumer/handler"
813
"github.com/felipeagger/go-redis-streams/packages/event"
@@ -11,12 +16,15 @@ import (
1116
)
1217

1318
const (
14-
streamName = "events"
19+
streamName = "events"
20+
consumerGroup = "consumersOne"
21+
consumerName = "consumerOne"
1522
)
1623

1724
var (
18-
mutex sync.Mutex
19-
start string = "-"
25+
waitGrp sync.WaitGroup
26+
mutex sync.Mutex
27+
start string = ">" //"0" //"0-0" //"-"
2028
)
2129

2230
func main() {
@@ -25,12 +33,30 @@ func main() {
2533
panic(err)
2634
}
2735

28-
fmt.Printf("Initializing consumer on Stream: %v ...\n", streamName)
36+
createConsumerGroup(client)
2937

38+
fmt.Printf("Initializing consumerGroup: %v on Stream: %v ...\n", consumerGroup, streamName)
3039
go consumeEvents(client)
3140

32-
quit := make(chan bool)
33-
<-quit
41+
//Gracefully end
42+
chanOS := make(chan os.Signal)
43+
signal.Notify(chanOS, syscall.SIGINT, syscall.SIGTERM)
44+
<-chanOS
45+
46+
waitGrp.Wait()
47+
client.Close()
48+
}
49+
50+
func createConsumerGroup(client *redis.Client) {
51+
52+
if _, err := client.XGroupCreateMkStream(streamName, consumerGroup, "0").Result(); err != nil {
53+
54+
if !strings.Contains(fmt.Sprint(err), "BUSYGROUP") {
55+
fmt.Printf("Error on create Consumer Group: %v ...\n", consumerGroup)
56+
panic(err)
57+
}
58+
59+
}
3460
}
3561

3662
// start consume events
@@ -39,13 +65,31 @@ func consumeEvents(client *redis.Client) {
3965
for {
4066
func() {
4167

42-
redisRange, err := client.XRange(streamName, start, "+").Result()
68+
/*
69+
redisRange, err := client.XRange(streamName, start, "+").Result()
70+
if err != nil {
71+
panic(err)
72+
}
73+
*/
74+
75+
streams, err := client.XReadGroup(&redis.XReadGroupArgs{
76+
Streams: []string{streamName, start},
77+
Group: consumerGroup,
78+
Consumer: consumerName,
79+
//NoAck: true,
80+
Count: 10,
81+
Block: 0, // Wait for new messages without a timeout.
82+
}).Result()
4383
if err != nil {
44-
panic(err)
84+
log.Printf("err: %+v\n", err)
85+
return
4586
}
4687

47-
for _, stream := range redisRange {
88+
fmt.Println("new round")
89+
90+
for _, stream := range streams[0].Messages {
4891

92+
waitGrp.Add(1)
4993
go processStream(stream, client, handler.HandlerFactory())
5094

5195
}
@@ -56,6 +100,7 @@ func consumeEvents(client *redis.Client) {
56100
}
57101

58102
func processStream(stream redis.XMessage, client *redis.Client, handlerFactory func(t event.Type) handler.Handler) {
103+
defer waitGrp.Done()
59104

60105
typeEvent := stream.Values["type"].(string)
61106
newEvent, _ := event.New(event.Type(typeEvent))
@@ -73,13 +118,25 @@ func processStream(stream redis.XMessage, client *redis.Client, handlerFactory f
73118
if err != nil {
74119
fmt.Printf("error on process event:%v\n", newEvent)
75120
fmt.Println(err)
121+
122+
client.XClaimJustID(&redis.XClaimArgs{
123+
Stream: streamName,
124+
Group: consumerGroup,
125+
Consumer: "consumerTwo",
126+
Messages: []string{stream.ID},
127+
//MinIdle: 5 * time.Second,
128+
})
76129
return
77130
}
78131

79132
//Delete stream from redis
80-
client.XDel(streamName, stream.ID)
81-
82-
mutex.Lock()
83-
start = stream.ID
84-
mutex.Unlock()
133+
//client.XDel(streamName, stream.ID)
134+
client.XAck(streamName, consumerGroup, stream.ID)
135+
136+
/*
137+
mutex.Lock()
138+
start = stream.ID
139+
//fmt.Printf("new start: %v \n", start)
140+
mutex.Unlock()
141+
*/
85142
}

producer/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ func main() {
2424
}
2525

2626
func generateEvent(client *redis.Client) {
27+
var userID uint64 = 0
2728
for i := 0; i < 10; i++ {
2829

2930
eventType := []event.Type{event.ViewType, event.LikeType}[rand.Intn(2)]
3031
extra := []string{"test", "gopher", "streams"}[rand.Intn(3)]
31-
userID := uint64(rand.Intn(1000))
32+
userID++ //uint64(rand.Intn(1000))
3233

3334
strCMD := client.XAdd(&redis.XAddArgs{
3435
Stream: streamName,

0 commit comments

Comments
 (0)