Skip to content

Commit 5361d9e

Browse files
committed
Move router setup to pkg, add some cleanups
1 parent 7a54282 commit 5361d9e

File tree

3 files changed

+144
-138
lines changed

3 files changed

+144
-138
lines changed

cmd/main.go

Lines changed: 2 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,11 @@
11
package main
22

33
import (
4-
"math/rand"
5-
"time"
6-
7-
"github.com/m110/webhooks/pkg"
4+
"github.com/ThreeDotsLabs/event-driven-example/pkg"
85

96
"github.com/ThreeDotsLabs/watermill"
107
"github.com/ThreeDotsLabs/watermill/components/metrics"
118
"github.com/ThreeDotsLabs/watermill/message"
12-
"github.com/ThreeDotsLabs/watermill/message/infrastructure/amqp"
13-
"github.com/ThreeDotsLabs/watermill/message/infrastructure/http"
14-
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka"
159
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
1610
)
1711

@@ -31,7 +25,7 @@ func main() {
3125
metricsBuilder := metrics.NewPrometheusMetricsBuilder(prometheusRegistry, "", "")
3226
metricsBuilder.AddPrometheusRouterMetrics(router)
3327

34-
err = setupRouter(router, config, logger)
28+
err = pkg.SetupRouter(router, config, logger)
3529
if err != nil {
3630
panic(err)
3731
}
@@ -41,132 +35,3 @@ func main() {
4135
panic(err)
4236
}
4337
}
44-
45-
func setupRouter(router *message.Router, c pkg.Config, logger watermill.LoggerAdapter) error {
46-
amqpPublisher, err := amqp.NewPublisher(amqp.NewDurableQueueConfig(c.AMQPURI), logger)
47-
if err != nil {
48-
return err
49-
}
50-
51-
amqpConfig := amqp.NewDurableQueueConfig(c.AMQPURI)
52-
amqpSubscriber, err := amqp.NewSubscriber(amqpConfig, logger)
53-
if err != nil {
54-
return err
55-
}
56-
57-
httpConfig := http.SubscriberConfig{
58-
UnmarshalMessageFunc: http.DefaultUnmarshalMessageFunc,
59-
}
60-
httpSubscriber, err := http.NewSubscriber(c.BindAddr, httpConfig, logger)
61-
if err != nil {
62-
return err
63-
}
64-
65-
kafkaPublisher, err := kafka.NewPublisher(
66-
c.KafkaBrokers,
67-
kafka.DefaultMarshaler{},
68-
nil,
69-
logger)
70-
if err != nil {
71-
return err
72-
}
73-
74-
kafkaConfig := kafka.SubscriberConfig{
75-
Brokers: c.KafkaBrokers,
76-
}
77-
kafkaSubscriber, err := kafka.NewSubscriber(
78-
kafkaConfig,
79-
nil,
80-
kafka.DefaultMarshaler{},
81-
logger)
82-
if err != nil {
83-
return err
84-
}
85-
86-
grafanaPublisher, err := http.NewPublisher(
87-
http.PublisherConfig{
88-
MarshalMessageFunc: pkg.GrafanaMarshaller(c.GrafanaCredentials),
89-
}, logger)
90-
if err != nil {
91-
return err
92-
}
93-
94-
router.AddHandler(
95-
"http-to-kafka",
96-
"/",
97-
httpSubscriber,
98-
c.KafkaTopic,
99-
kafkaPublisher,
100-
pkg.GithubWebhookHandler,
101-
)
102-
103-
router.AddHandler(
104-
"rabbitmq-to-kafka",
105-
c.AMQPQueue,
106-
amqpSubscriber,
107-
c.KafkaTopic,
108-
kafkaPublisher,
109-
pkg.AMQPHandler,
110-
)
111-
112-
router.AddHandler(
113-
"kafka-to-grafana",
114-
c.KafkaTopic,
115-
kafkaSubscriber,
116-
c.GrafanaURL+"/api/annotations",
117-
grafanaPublisher,
118-
pkg.GrafanaHandler,
119-
)
120-
121-
// Simulate commit deploys with delays
122-
stagingDelay := time.Second * time.Duration(rand.Intn(60)+30)
123-
productionDelay := stagingDelay + time.Second*time.Duration(rand.Intn(120)+60)
124-
125-
router.AddHandler(
126-
"deploy-staging-simulator",
127-
c.KafkaTopic,
128-
kafkaSubscriber,
129-
c.AMQPQueue,
130-
amqpPublisher,
131-
pkg.DeploySimulator{"staging", stagingDelay}.Handle,
132-
)
133-
134-
router.AddHandler(
135-
"deploy-production-simulator",
136-
c.KafkaTopic,
137-
kafkaSubscriber,
138-
c.AMQPQueue,
139-
amqpPublisher,
140-
pkg.DeploySimulator{"production", productionDelay}.Handle,
141-
)
142-
143-
if c.SlackWebhookURL != "" {
144-
slackPublisher, err := http.NewPublisher(
145-
http.PublisherConfig{
146-
MarshalMessageFunc: pkg.SlackMarshaller,
147-
}, logger)
148-
if err != nil {
149-
return err
150-
}
151-
152-
router.AddHandler(
153-
"kafka-to-slack",
154-
c.KafkaTopic,
155-
kafkaSubscriber,
156-
c.SlackWebhookURL,
157-
slackPublisher,
158-
pkg.SlackHandler,
159-
)
160-
}
161-
162-
go func() {
163-
// Start HTTP server only after the router is running
164-
<-router.Running()
165-
err = httpSubscriber.StartHTTPServer()
166-
if err != nil {
167-
panic(err)
168-
}
169-
}()
170-
171-
return nil
172-
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module github.com/m110/webhooks
1+
module github.com/ThreeDotsLabs/event-driven-example
22

33
require (
44
github.com/DataDog/zstd v1.3.5 // indirect

pkg/router.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package pkg
2+
3+
import (
4+
"math/rand"
5+
"time"
6+
7+
"github.com/ThreeDotsLabs/watermill"
8+
"github.com/ThreeDotsLabs/watermill/message"
9+
"github.com/ThreeDotsLabs/watermill/message/infrastructure/amqp"
10+
"github.com/ThreeDotsLabs/watermill/message/infrastructure/http"
11+
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka"
12+
)
13+
14+
func SetupRouter(router *message.Router, c Config, logger watermill.LoggerAdapter) error {
15+
amqpPublisher, err := amqp.NewPublisher(amqp.NewDurableQueueConfig(c.AMQPURI), logger)
16+
if err != nil {
17+
return err
18+
}
19+
20+
amqpConfig := amqp.NewDurableQueueConfig(c.AMQPURI)
21+
amqpSubscriber, err := amqp.NewSubscriber(amqpConfig, logger)
22+
if err != nil {
23+
return err
24+
}
25+
26+
httpConfig := http.SubscriberConfig{
27+
UnmarshalMessageFunc: http.DefaultUnmarshalMessageFunc,
28+
}
29+
httpSubscriber, err := http.NewSubscriber(c.BindAddr, httpConfig, logger)
30+
if err != nil {
31+
return err
32+
}
33+
34+
kafkaPublisher, err := kafka.NewPublisher(
35+
c.KafkaBrokers,
36+
kafka.DefaultMarshaler{},
37+
nil,
38+
logger)
39+
if err != nil {
40+
return err
41+
}
42+
43+
kafkaConfig := kafka.SubscriberConfig{
44+
Brokers: c.KafkaBrokers,
45+
}
46+
kafkaSubscriber, err := kafka.NewSubscriber(
47+
kafkaConfig,
48+
nil,
49+
kafka.DefaultMarshaler{},
50+
logger)
51+
if err != nil {
52+
return err
53+
}
54+
55+
grafanaPublisher, err := http.NewPublisher(
56+
http.PublisherConfig{
57+
MarshalMessageFunc: GrafanaMarshaller(c.GrafanaCredentials),
58+
}, logger)
59+
if err != nil {
60+
return err
61+
}
62+
63+
router.AddHandler(
64+
"http-to-kafka",
65+
"/",
66+
httpSubscriber,
67+
c.KafkaTopic,
68+
kafkaPublisher,
69+
GithubWebhookHandler,
70+
)
71+
72+
router.AddHandler(
73+
"rabbitmq-to-kafka",
74+
c.AMQPQueue,
75+
amqpSubscriber,
76+
c.KafkaTopic,
77+
kafkaPublisher,
78+
AMQPHandler,
79+
)
80+
81+
router.AddHandler(
82+
"kafka-to-grafana",
83+
c.KafkaTopic,
84+
kafkaSubscriber,
85+
c.GrafanaURL+"/api/annotations",
86+
grafanaPublisher,
87+
GrafanaHandler,
88+
)
89+
90+
if c.SlackWebhookURL != "" {
91+
slackPublisher, err := http.NewPublisher(
92+
http.PublisherConfig{
93+
MarshalMessageFunc: SlackMarshaller,
94+
}, logger)
95+
if err != nil {
96+
return err
97+
}
98+
99+
router.AddHandler(
100+
"kafka-to-slack",
101+
c.KafkaTopic,
102+
kafkaSubscriber,
103+
c.SlackWebhookURL,
104+
slackPublisher,
105+
SlackHandler,
106+
)
107+
}
108+
109+
// Simulate commit deploys with delays
110+
stagingDelay := time.Second * time.Duration(rand.Intn(60)+30)
111+
productionDelay := stagingDelay + time.Second*time.Duration(rand.Intn(120)+60)
112+
113+
router.AddHandler(
114+
"deploy-staging-simulator",
115+
c.KafkaTopic,
116+
kafkaSubscriber,
117+
c.AMQPQueue,
118+
amqpPublisher,
119+
DeploySimulator{"staging", stagingDelay}.Handle,
120+
)
121+
122+
router.AddHandler(
123+
"deploy-production-simulator",
124+
c.KafkaTopic,
125+
kafkaSubscriber,
126+
c.AMQPQueue,
127+
amqpPublisher,
128+
DeploySimulator{"production", productionDelay}.Handle,
129+
)
130+
131+
go func() {
132+
// Start HTTP server only after the router is running
133+
<-router.Running()
134+
err = httpSubscriber.StartHTTPServer()
135+
if err != nil {
136+
panic(err)
137+
}
138+
}()
139+
140+
return nil
141+
}

0 commit comments

Comments
 (0)