Skip to content

Commit 096316f

Browse files
committed
Merge branch 'reinvent-rabbit' into mongo+rabbit
# Conflicts: # .travis.yml # main.go # plugins/plugin.go # plugins/storage_test.go # plugins/web/plugin.go
2 parents 0c63d15 + 823ffdf commit 096316f

File tree

18 files changed

+249
-156
lines changed

18 files changed

+249
-156
lines changed

dispatcher/dispatcher.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,45 +4,56 @@ import (
44
"encoding/json"
55
"net/http"
66

7+
log "github.com/Sirupsen/logrus"
78
"github.com/qa-dev/universe/event"
89
"github.com/qa-dev/universe/plugins"
9-
"github.com/qa-dev/universe/rabbitmq"
10+
"github.com/qa-dev/universe/queue"
1011
)
1112

1213
type ClientInterface interface {
1314
Do(req *http.Request) (*http.Response, error)
1415
}
1516

1617
type Dispatcher struct {
17-
rmq *rabbitmq.RabbitMQ
18+
queue *queue.Queue
1819
pluginStorage *plugins.PluginStorage
1920
}
2021

21-
func NewDispatcher(rmq *rabbitmq.RabbitMQ, storage *plugins.PluginStorage) *Dispatcher {
22-
return &Dispatcher{rmq, storage}
22+
func NewDispatcher(queue *queue.Queue, storage *plugins.PluginStorage) *Dispatcher {
23+
return &Dispatcher{queue, storage}
2324
}
2425

2526
func (d *Dispatcher) Run() {
2627
go d.worker()
2728
}
2829

2930
func (d *Dispatcher) worker() {
30-
consumeObj, err := d.rmq.Consume("consumer")
31+
msgs, err := d.queue.GetConsumer("consumer")
3132
if err != nil {
32-
panic(err)
33+
log.Error("Error get consumer in event dispatcher worker")
3334
}
34-
3535
for {
36-
rawData := <-consumeObj
37-
var e event.Event
38-
err = json.Unmarshal(rawData.Body(), &e)
36+
if d.queue.IsOnline() == false {
37+
log.Info("Worker lost connection to queue. Waiting...")
38+
backOnlineChan := make(chan *error)
39+
d.queue.NotifyReconnect(backOnlineChan)
40+
_ = <-backOnlineChan
41+
newMsgs, err := d.queue.GetConsumer("consumer")
42+
if err != nil {
43+
log.Error("Error get consumer in event dispatcher worker")
44+
}
45+
msgs = newMsgs
46+
log.Info("Worker established connection to queue")
47+
}
48+
data := <-msgs
49+
var ev event.Event
50+
err = json.Unmarshal(data.Body(), &ev)
3951
if err != nil {
40-
// TODO log?
41-
panic(err)
52+
data.Reject()
53+
log.Error("Error unmarchal event in event dispatcher worker")
54+
continue
4255
}
43-
44-
d.pluginStorage.ProcessEvent(e)
45-
46-
rawData.Ack(false)
56+
d.pluginStorage.ProcessEvent(&ev)
57+
data.Ack()
4758
}
4859
}

dispatcher/dispatcher_test.go

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ import (
88
"net/http"
99
"os"
1010
"testing"
11-
"time"
1211

13-
log "github.com/Sirupsen/logrus"
1412
"github.com/qa-dev/universe/event"
1513
"github.com/qa-dev/universe/plugins"
14+
"github.com/qa-dev/universe/queue"
1615
"github.com/qa-dev/universe/rabbitmq"
1716
"github.com/qa-dev/universe/subscribe"
1817
"github.com/stretchr/testify/assert"
@@ -23,7 +22,7 @@ var amqpUri string
2322
func init() {
2423
amqpUri = os.Getenv("AMQP_URI")
2524
if amqpUri == "" {
26-
log.Fatal("AMQP_URI is required to run rabbitmq tests")
25+
amqpUri = "amqp://guest:guest@127.0.0.1:5672/"
2726
}
2827
}
2928

@@ -57,30 +56,26 @@ func (c *FakePostClient) Do(r *http.Request) (*http.Response, error) {
5756
}
5857

5958
func TestNewDispatcher(t *testing.T) {
60-
rmq := rabbitmq.NewRabbitMQ(amqpUri, "test_event_service_push_event_queue")
61-
defer rmq.Close()
59+
rmq := rabbitmq.NewRabbitMQ(amqpUri, "test_new_dispatcher")
60+
q := queue.NewQueue(rmq)
6261
storage := plugins.NewPluginStorage()
63-
// Даем время на подключение
64-
time.Sleep(5 * time.Second)
65-
dsp := NewDispatcher(rmq, storage)
66-
assert.Equal(t, fmt.Sprintf("%p", rmq), fmt.Sprintf("%p", dsp.rmq))
62+
dsp := NewDispatcher(q, storage)
63+
assert.Equal(t, fmt.Sprintf("%p", q), fmt.Sprintf("%p", dsp.queue))
6764
}
6865

6966
func TestDispatcher_Run(t *testing.T) {
70-
rmq := rabbitmq.NewRabbitMQ(amqpUri, "test_event_service_push_event_queue")
71-
defer rmq.Close()
67+
rmq := rabbitmq.NewRabbitMQ(amqpUri, "test_new_dispatcher")
68+
q := queue.NewQueue(rmq)
7269
storage := plugins.NewPluginStorage()
73-
// Даем время на подключение
74-
time.Sleep(5 * time.Second)
7570
requestData := []byte(`{"test": "test"}`)
7671
subscrService := subscribe.NewSubscribeService(storage)
77-
eventService := event.NewEventService(rmq)
72+
eventService := event.NewEventService(q)
7873
subscribeData := []byte(`{"test": "hello"}`)
7974
subscrService.ProcessSubscribe("log", subscribeData)
80-
dsp := NewDispatcher(rmq, storage)
75+
dsp := NewDispatcher(q, storage)
8176
assert.NotNil(t, dsp)
8277
dsp.Run()
83-
err := eventService.Publish(event.Event{"test.event", requestData})
78+
err := eventService.Publish(&event.Event{"test.event", requestData})
8479
assert.NoError(t, err)
8580
// TODO: assert log
8681
}

event/service.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,23 @@ import (
44
"errors"
55

66
log "github.com/Sirupsen/logrus"
7-
"github.com/qa-dev/universe/rabbitmq"
7+
"github.com/qa-dev/universe/queue"
88
)
99

1010
type EventService struct {
11-
rmq *rabbitmq.RabbitMQ
11+
queue *queue.Queue
1212
}
1313

14-
func NewEventService(rmq *rabbitmq.RabbitMQ) *EventService {
15-
return &EventService{rmq}
14+
func NewEventService(queue *queue.Queue) *EventService {
15+
return &EventService{queue}
1616
}
1717

18-
func (e *EventService) Publish(ev Event) error {
18+
func (e *EventService) Publish(ev *Event) error {
1919
if ev.Name == "" {
2020
log.Println("Got blank event name")
2121
return errors.New("BLANK EVENT NAME")
2222
}
2323
log.Println("Got event name", ev.Name)
24-
e.rmq.PublishWithPriority(ev, 1)
24+
e.queue.SendEvent(ev)
2525
return nil
2626
}

event/service_test.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"testing"
88
"time"
99

10-
log "github.com/Sirupsen/logrus"
10+
"github.com/qa-dev/universe/queue"
1111
"github.com/qa-dev/universe/rabbitmq"
1212
"github.com/stretchr/testify/assert"
1313
)
@@ -17,47 +17,47 @@ var amqpUri string
1717
func init() {
1818
amqpUri = os.Getenv("AMQP_URI")
1919
if amqpUri == "" {
20-
log.Fatal("AMQP_URI is required to run rabbitmq tests")
20+
amqpUri = "amqp://guest:guest@127.0.0.1:5672/"
2121
}
2222
}
2323

2424
func TestNewEventService(t *testing.T) {
25-
rmq := &rabbitmq.RabbitMQ{}
26-
es := NewEventService(rmq)
27-
assert.Equal(t, fmt.Sprintf("%p", rmq), fmt.Sprintf("%p", es.rmq))
25+
rmq := rabbitmq.NewRabbitMQ(amqpUri, "test_new_dispatcher")
26+
time.Sleep(2 * time.Second)
27+
q := queue.NewQueue(rmq)
28+
es := NewEventService(q)
29+
assert.Equal(t, fmt.Sprintf("%p", q), fmt.Sprintf("%p", es.queue))
2830
}
2931

3032
func TestEventService_PushEvent(t *testing.T) {
31-
rmq := rabbitmq.NewRabbitMQ(amqpUri, "test_event_service_push_event_queue")
32-
defer rmq.Close()
33-
// Даем время на подключение
34-
time.Sleep(5 * time.Second)
35-
es := NewEventService(rmq)
36-
37-
consumeObj, err := rmq.Consume("test_consumer")
38-
assert.NoError(t, err)
33+
rmq := rabbitmq.NewRabbitMQ(amqpUri, "test_new_dispatcher")
34+
time.Sleep(2 * time.Second)
35+
q := queue.NewQueue(rmq)
36+
es := NewEventService(q)
3937

4038
go func() {
41-
raw := <-consumeObj
39+
msgs, err := q.GetConsumer("test_consumer")
40+
assert.NoError(t, err)
41+
data := <-msgs
4242
var e Event
43-
err = json.Unmarshal(raw.Body(), &e)
43+
err = json.Unmarshal(data.Body(), &e)
44+
data.Ack()
4445
assert.NoError(t, err)
4546
assert.Equal(t, "test.event", e.Name, "Wrong event name generated")
4647
}()
4748

48-
err = es.Publish(Event{"test.event", []byte("test")})
49+
err := es.Publish(&Event{"test.event", []byte("test")})
4950
assert.NoError(t, err)
5051
time.Sleep(1 * time.Second)
5152
}
5253

5354
func TestEventService_PushEvent_Blank(t *testing.T) {
54-
rmq := rabbitmq.NewRabbitMQ(amqpUri, "test_event_service_push_event_queue")
55-
defer rmq.Close()
56-
// Даем время на подключение
57-
time.Sleep(5 * time.Second)
58-
es := NewEventService(rmq)
55+
rmq := rabbitmq.NewRabbitMQ(amqpUri, "test_new_dispatcher")
56+
time.Sleep(2 * time.Second)
57+
q := queue.NewQueue(rmq)
58+
es := NewEventService(q)
5959

60-
err := es.Publish(Event{"", []byte("test")})
60+
err := es.Publish(&Event{"", []byte("test")})
6161

6262
assert.Error(t, err)
6363
assert.Equal(t, "BLANK EVENT NAME", err.Error())

handlers/event.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type EventHandler struct {
1414
}
1515

1616
type EventPublisher interface {
17-
Publish(event.Event) error
17+
Publish(*event.Event) error
1818
}
1919

2020
func NewEventHandler(eventService EventPublisher) *EventHandler {
@@ -36,7 +36,7 @@ func (h *EventHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
3636
}
3737

3838
e := event.Event{eventName, payload}
39-
err = h.eventService.Publish(e)
39+
err = h.eventService.Publish(&e)
4040
if err != nil {
4141
resp.WriteHeader(http.StatusInternalServerError)
4242
resp.Write([]byte(`{"error": ` + strconv.Quote(err.Error()) + `}`))

main.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"flag"
55
"fmt"
66
"net/http"
7-
"time"
87

98
log "github.com/Sirupsen/logrus"
109
"github.com/qa-dev/universe/config"
@@ -15,9 +14,10 @@ import (
1514
"github.com/qa-dev/universe/plugins"
1615
logPlugin "github.com/qa-dev/universe/plugins/log"
1716
"github.com/qa-dev/universe/plugins/web"
18-
"github.com/qa-dev/universe/rabbitmq"
1917
"github.com/qa-dev/universe/subscribe"
20-
"gopkg.in/mgo.v2"
18+
"github.com/qa-dev/universe/rabbitmq"
19+
"time"
20+
"github.com/qa-dev/universe/queue"
2121
)
2222

2323
func main() {
@@ -34,6 +34,8 @@ func main() {
3434
time.Sleep(2 * time.Second)
3535
defer eventRmq.Close()
3636

37+
eventQueue := queue.NewQueue(eventRmq)
38+
3739
msession, err := mgo.Dial(cfg.Mongo.Host + ":" + cfg.Mongo.Port)
3840
if err != nil {
3941
panic(err)
@@ -45,9 +47,9 @@ func main() {
4547
pluginStorage.Register(web.NewPluginWeb(kpr))
4648
pluginStorage.Register(logPlugin.NewLog())
4749

48-
eventService := event.NewEventService(eventRmq)
50+
eventService := event.NewEventService(eventQueue)
4951
subscribeService := subscribe.NewSubscribeService(pluginStorage)
50-
dispatcherService := dispatcher.NewDispatcher(eventRmq, pluginStorage)
52+
dispatcherService := dispatcher.NewDispatcher(eventQueue, pluginStorage)
5153
dispatcherService.Run()
5254

5355
for _, plg := range pluginStorage.GetPlugins() {

plugins/log/log.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func (l Log) GetPluginInfo() *plugins.PluginInfo {
2222
}
2323
}
2424

25-
func (l Log) ProcessEvent(eventData event.Event) {
25+
func (l Log) ProcessEvent(eventData *event.Event) {
2626
l.logger.Info(eventData)
2727
}
2828

plugins/log/log_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func TestLog_ProcessEvent(t *testing.T) {
1818
l.logger.Out = &b
1919
o.Register(l)
2020

21-
o.ProcessEvent(event.Event{"Event", []byte(`{"hello": "test"}`)})
21+
o.ProcessEvent(&event.Event{"Event", []byte(`{"hello": "test"}`)})
2222
time.Sleep(200 * time.Millisecond)
2323

2424
t.Log(b.String())

plugins/plugin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ type Plugin interface {
88
GetPluginInfo() *PluginInfo
99
Subscribe(input []byte) error
1010
Unsubscribe(input []byte) error
11-
ProcessEvent(eventData event.Event)
11+
ProcessEvent(eventData *event.Event)
1212
Loaded()
1313
}
1414

plugins/storage.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@ func (o *PluginStorage) Register(v Plugin) {
1919
o.plugins = append(o.plugins, v)
2020
}
2121

22-
func (o *PluginStorage) ProcessEvent(eventData event.Event) {
22+
func (o *PluginStorage) ProcessEvent(eventData *event.Event) {
2323
for _, ob := range o.plugins {
24-
go func(o Plugin) {
25-
o.ProcessEvent(eventData)
26-
}(ob)
24+
ob.ProcessEvent(eventData)
2725
}
2826
}
2927

0 commit comments

Comments
 (0)