Skip to content

Commit 0c63d15

Browse files
committed
mongo
1 parent e2a1a4c commit 0c63d15

File tree

5 files changed

+77
-70
lines changed

5 files changed

+77
-70
lines changed

keeper/keeper.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package keeper
2+
3+
import (
4+
"gopkg.in/mgo.v2"
5+
)
6+
7+
const SubscribersDatabase string = "subscribers"
8+
9+
type Keeper struct {
10+
mongo *mgo.Session
11+
databaseName string
12+
}
13+
14+
func NewKeeper(session *mgo.Session) *Keeper {
15+
return &Keeper{session, SubscribersDatabase}
16+
}
17+
18+
func (k *Keeper) SetCustomDatabaseName(name string) {
19+
k.databaseName = name
20+
}
21+
22+
func (k *Keeper) StoreSubscriber(pluginName string, data interface{}) error {
23+
return k.mongo.DB(k.databaseName).C("stored_" + pluginName).Insert(data)
24+
}
25+
26+
func (k *Keeper) RemoveSubscriber(pluginName string, data interface{}) error {
27+
return k.mongo.DB(k.databaseName).C("stored_" + pluginName).Remove(data)
28+
}
29+
30+
func (k *Keeper) GetSubscribers(pluginName string, data interface{}) error {
31+
return k.mongo.DB(k.databaseName).C("stored_" + pluginName).Find(nil).All(data)
32+
}

keeper/master.go

Lines changed: 0 additions & 15 deletions
This file was deleted.

main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ func main() {
3939
panic(err)
4040
}
4141

42-
master := keeper.NewMongoMaster(msession)
42+
kpr := keeper.NewKeeper(msession)
4343

4444
pluginStorage := plugins.NewPluginStorage()
45-
pluginStorage.Register(web.NewPluginWeb(master.GetCollection("web")))
45+
pluginStorage.Register(web.NewPluginWeb(kpr))
4646
pluginStorage.Register(logPlugin.NewLog())
4747

4848
eventService := event.NewEventService(eventRmq)

plugins/web/plugin.go

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,33 +10,32 @@ import (
1010

1111
log "github.com/Sirupsen/logrus"
1212
"github.com/qa-dev/universe/event"
13+
"github.com/qa-dev/universe/keeper"
1314
"github.com/qa-dev/universe/plugins"
14-
"gopkg.in/mgo.v2"
15-
"gopkg.in/mgo.v2/bson"
1615
)
1716

18-
const PLUGIN_TAG string = "web"
17+
const PluginTag string = "web"
1918

2019
type PluginWeb struct {
21-
client HttpRequester
22-
collection *mgo.Collection
20+
client HttpRequester
21+
keeper *keeper.Keeper
2322
}
2423

2524
type HttpRequester interface {
2625
Do(req *http.Request) (*http.Response, error)
2726
}
2827

29-
func NewPluginWeb(collection *mgo.Collection) *PluginWeb {
28+
func NewPluginWeb(kpr *keeper.Keeper) *PluginWeb {
3029
httpClient := &http.Client{
3130
Timeout: time.Second * 10,
3231
}
33-
return &PluginWeb{httpClient, collection}
32+
return &PluginWeb{httpClient, kpr}
3433
}
3534

3635
func (p *PluginWeb) GetPluginInfo() *plugins.PluginInfo {
3736
return &plugins.PluginInfo{
3837
Name: "Web",
39-
Tag: PLUGIN_TAG,
38+
Tag: PluginTag,
4039
Version: 1,
4140
}
4241
}
@@ -49,7 +48,7 @@ func (p *PluginWeb) Subscribe(input []byte) error {
4948
errorText := fmt.Sprintf("Invalid input data: %s", string(input))
5049
return errors.New(errorText)
5150
}
52-
return p.collection.Insert(subscribeData)
51+
return p.keeper.StoreSubscriber(PluginTag, subscribeData)
5352
}
5453

5554
func (p *PluginWeb) Unsubscribe(input []byte) error {
@@ -60,12 +59,12 @@ func (p *PluginWeb) Unsubscribe(input []byte) error {
6059
errorText := fmt.Sprintf("Invalid input data: %s", string(input))
6160
return errors.New(errorText)
6261
}
63-
return p.collection.Remove(unsubscribeData)
62+
return p.keeper.RemoveSubscriber(PluginTag, unsubscribeData)
6463
}
6564

6665
func (p *PluginWeb) ProcessEvent(eventData event.Event) {
6766
var result []SubscribeData
68-
p.collection.Find(bson.M{"eventname": eventData.Name}).All(&result)
67+
p.keeper.GetSubscribers(PluginTag, &result)
6968
for _, data := range result {
7069
go p.sendRequest(data.Url, eventData.Payload)
7170
}
@@ -82,16 +81,4 @@ func (p *PluginWeb) sendRequest(url string, payload []byte) {
8281
p.client.Do(req)
8382
}
8483

85-
func (p *PluginWeb) Loaded() {
86-
index := mgo.Index{
87-
Key: []string{"eventname", "url"},
88-
Unique: true,
89-
DropDups: true,
90-
Background: true,
91-
Sparse: true,
92-
}
93-
err := p.collection.EnsureIndex(index)
94-
if err != nil {
95-
panic(err)
96-
}
97-
}
84+
func (p *PluginWeb) Loaded() {}

plugins/web/plugin_test.go

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ import (
1010
"time"
1111

1212
"github.com/qa-dev/universe/event"
13+
"github.com/qa-dev/universe/keeper"
1314
"github.com/stretchr/testify/assert"
1415
"gopkg.in/mgo.v2"
15-
"gopkg.in/mgo.v2/bson"
1616
)
1717

1818
type FakeClosingBuffer struct {
@@ -44,6 +44,7 @@ func (c FakePostClient) Do(r *http.Request) (*http.Response, error) {
4444
return response, nil
4545
}
4646

47+
var kpr *keeper.Keeper
4748
var database *mgo.Database
4849

4950
func init() {
@@ -53,85 +54,87 @@ func init() {
5354
panic("cant connect mongo")
5455
}
5556
database = session.DB("test_plugin_web")
56-
database.DropDatabase()
57+
kpr = keeper.NewKeeper(session)
58+
kpr.SetCustomDatabaseName("test_plugin_web")
5759
}
5860

5961
func TestNewPluginWeb(t *testing.T) {
60-
c := database.C("test_new_plugin_web")
61-
p := NewPluginWeb(c)
62-
assert.NotNil(t, p.collection)
62+
p := NewPluginWeb(kpr)
63+
assert.NotNil(t, p.keeper)
6364
}
6465

6566
func TestPluginWeb_GetPluginInfo(t *testing.T) {
66-
c := database.C("test_plugin_web_get_plugin_info")
67-
p := NewPluginWeb(c)
67+
p := NewPluginWeb(kpr)
6868
assert.Equal(t, "web", p.GetPluginInfo().Tag)
6969
assert.Equal(t, "Web", p.GetPluginInfo().Name)
7070
}
7171

7272
func TestPluginWeb_Subscribe(t *testing.T) {
73-
c := database.C("test_plugin_web_subscribe")
74-
p := NewPluginWeb(c)
73+
database.DropDatabase()
74+
p := NewPluginWeb(kpr)
7575
inJson := []byte(`{"event_name": "test", "url": "hello"}`)
7676
err := p.Subscribe(inJson)
7777
assert.NoError(t, err)
78-
resCount, err := c.Find(bson.M{"eventname": "test"}).Count()
78+
var result []SubscribeData
79+
err = kpr.GetSubscribers(PluginTag, &result)
7980
assert.NoError(t, err)
80-
assert.Equal(t, 1, resCount)
81+
assert.Equal(t, 1, len(result))
8182
}
8283

8384
func TestPluginWeb_Subscribe_WrongInput(t *testing.T) {
84-
c := database.C("test_plugin_web_wrong_input")
85-
p := NewPluginWeb(c)
85+
p := NewPluginWeb(kpr)
8686
inJson := []byte("{}")
8787
err := p.Subscribe(inJson)
8888
assert.Error(t, err)
8989
}
9090

9191
func TestPluginWeb_Unsubscribe(t *testing.T) {
92-
c := database.C("test_plugin_web_unsubscribe")
93-
p := NewPluginWeb(c)
92+
database.DropDatabase()
93+
p := NewPluginWeb(kpr)
9494
inJson := []byte(`{"event_name": "test", "url": "hello"}`)
9595
err := p.Subscribe(inJson)
9696
assert.NoError(t, err)
97-
resCount, err := c.Find(bson.M{"eventname": "test"}).Count()
97+
var result []SubscribeData
98+
err = kpr.GetSubscribers(PluginTag, &result)
9899
assert.NoError(t, err)
99-
assert.Equal(t, 1, resCount)
100+
assert.Equal(t, 1, len(result))
100101
err = p.Unsubscribe(inJson)
101102
assert.NoError(t, err)
102-
resCount, err = c.Find(bson.M{"eventname": "test"}).Count()
103+
result = nil
104+
err = kpr.GetSubscribers(PluginTag, &result)
103105
assert.NoError(t, err)
104-
assert.Equal(t, 0, resCount)
106+
assert.Equal(t, 0, len(result))
105107
}
106108

107109
func TestPluginWeb_Unsubscribe_WrongInput(t *testing.T) {
108-
c := database.C("test_plugin_web_unsubscribe_wrong_input")
109-
p := NewPluginWeb(c)
110+
p := NewPluginWeb(kpr)
110111
inJson := []byte("{}")
111112
err := p.Unsubscribe(inJson)
112113
assert.Error(t, err)
113114
}
114115

115116
func TestPluginWeb_Unsubscribe_NonExistentSubscriber(t *testing.T) {
116-
c := database.C("test_plugin_web_non_existent_subscriber")
117-
p := NewPluginWeb(c)
117+
database.DropDatabase()
118+
p := NewPluginWeb(kpr)
118119
subscribeJson := []byte(`{"event_name": "test", "url": "hello"}`)
119120
err := p.Subscribe(subscribeJson)
120121
assert.NoError(t, err)
121-
resCount, err := c.Find(bson.M{"eventname": "test"}).Count()
122+
var result []SubscribeData
123+
err = kpr.GetSubscribers(PluginTag, &result)
122124
assert.NoError(t, err)
123-
assert.Equal(t, 1, resCount)
125+
assert.Equal(t, 1, len(result))
124126
unsubscribeJson := []byte(`{"event_name": "test", "url": "bye"}`)
125127
err = p.Unsubscribe(unsubscribeJson)
126128
assert.Error(t, err)
127-
resCount, err = c.Find(bson.M{"eventname": "test"}).Count()
129+
result = nil
130+
err = kpr.GetSubscribers(PluginTag, &result)
128131
assert.NoError(t, err)
129-
assert.Equal(t, 1, resCount)
132+
assert.Equal(t, 1, len(result))
130133
}
131134

132135
func TestPluginWeb_ProcessEvent(t *testing.T) {
133-
c := database.C("test_plugin_web_process_event")
134-
p := NewPluginWeb(c)
136+
database.DropDatabase()
137+
p := NewPluginWeb(kpr)
135138
expectedUrl := "test_url"
136139
expectedData := []byte(`{"hello": "world"}`)
137140
fakeClient := FakePostClient{t, expectedUrl, expectedData}

0 commit comments

Comments
 (0)