Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions conf/gbe_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package conf
import (
"encoding/json"
"io/ioutil"
"sync"
)

type GbeConfig struct {
Expand Down Expand Up @@ -56,20 +55,16 @@ type RestServerConfig struct {
Addr string `json:"addr"`
}

var config GbeConfig
var configOnce sync.Once
var Config *GbeConfig

func GetConfig() *GbeConfig {
configOnce.Do(func() {
bytes, err := ioutil.ReadFile("conf.json")
if err != nil {
panic(err)
}

err = json.Unmarshal(bytes, &config)
if err != nil {
panic(err)
}
})
return &config
func Init() {
bytes, err := ioutil.ReadFile("conf.json")
if err != nil {
panic(err)
}
err = json.Unmarshal(bytes, &Config)
if err != nil {
panic(err)
}
}

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,6 @@ require (
go.uber.org/zap v1.10.0 // indirect
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/stretchr/testify.v1 v1.2.2 // indirect
sigs.k8s.io/yaml v1.1.0 // indirect
)
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2 h1:wBORZD4gvEKK0tG
github.com/pingcap/kvproto v0.0.0-20190904075355-9a1bd6a31da2/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825 h1:U9Kdnknj4n2v76Mg7wazevZ5N9U1OIaMwSNRVLEcLX0=
github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v2.1.18+incompatible h1:ytgHQQlEwvuYWN6gKjivUq8QOL7TIeaPfb5kapaaFNo=
github.com/pingcap/tidb v2.0.11+incompatible h1:Shz+ry1DzQNsPk1QAejnM+5tgjbwZuzPnIER5aCjQ6c=
github.com/pingcap/tidb v2.0.11+incompatible/go.mod h1:I8C6jrPINP2rrVunTRd7C9fRRhQrtR43S1/CL5ix/yQ=
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330 h1:rRMLMjIMFulCX9sGKZ1hoov/iROMsKyC8Snc02nSukw=
Expand Down Expand Up @@ -348,6 +349,8 @@ gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3M
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/stretchr/testify.v1 v1.2.2 h1:yhQC6Uy5CqibAIlk1wlusa/MJ3iAN49/BsR/dCCKz3M=
gopkg.in/stretchr/testify.v1 v1.2.2/go.mod h1:QI5V/q6UbPmuhtm10CaFZxED9NreB8PnFYN9JcR6TxU=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
Expand Down
14 changes: 8 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package main

import (
"net/http"
_ "net/http/pprof"

"github.com/gitbitex/gitbitex-spot/conf"
"github.com/gitbitex/gitbitex-spot/matching"
"github.com/gitbitex/gitbitex-spot/models"
Expand All @@ -23,12 +26,11 @@ import (
"github.com/gitbitex/gitbitex-spot/service"
"github.com/gitbitex/gitbitex-spot/worker"
"github.com/prometheus/common/log"
"net/http"
_ "net/http/pprof"
)

func main() {
gbeConfig := conf.GetConfig()
// gbeConfig := conf.GetConfig()
conf.Init()

go func() {
log.Info(http.ListenAndServe("localhost:6060", nil))
Expand All @@ -47,9 +49,9 @@ func main() {
panic(err)
}
for _, product := range products {
worker.NewTickMaker(product.Id, matching.NewKafkaLogReader("tickMaker", product.Id, gbeConfig.Kafka.Brokers)).Start()
worker.NewFillMaker(matching.NewKafkaLogReader("fillMaker", product.Id, gbeConfig.Kafka.Brokers)).Start()
worker.NewTradeMaker(matching.NewKafkaLogReader("tradeMaker", product.Id, gbeConfig.Kafka.Brokers)).Start()
worker.NewTickMaker(product.Id, matching.NewKafkaLogReader("tickMaker", product.Id, conf.Config.Kafka.Brokers)).Start()
worker.NewFillMaker(matching.NewKafkaLogReader("fillMaker", product.Id, conf.Config.Kafka.Brokers)).Start()
worker.NewTradeMaker(matching.NewKafkaLogReader("tradeMaker", product.Id, conf.Config.Kafka.Brokers)).Start()
}

rest.StartServer()
Expand Down
6 changes: 2 additions & 4 deletions matching/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ import (
)

func StartEngine() {
gbeConfig := conf.GetConfig()

products, err := service.GetProducts()
if err != nil {
panic(err)
}
for _, product := range products {
orderReader := NewKafkaOrderReader(product.Id, gbeConfig.Kafka.Brokers)
orderReader := NewKafkaOrderReader(product.Id, conf.Config.Kafka.Brokers)
snapshotStore := NewRedisSnapshotStore(product.Id)
logStore := NewKafkaLogStore(product.Id, gbeConfig.Kafka.Brokers)
logStore := NewKafkaLogStore(product.Id, conf.Config.Kafka.Brokers)
matchEngine := NewEngine(product, orderReader, logStore, snapshotStore)
matchEngine.Start()
}
Expand Down
5 changes: 2 additions & 3 deletions matching/redis_snapshot_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ type RedisSnapshotStore struct {
}

func NewRedisSnapshotStore(productId string) SnapshotStore {
gbeConfig := conf.GetConfig()

redisClient := redis.NewClient(&redis.Options{
Addr: gbeConfig.Redis.Addr,
Password: gbeConfig.Redis.Password,
Addr: conf.Config.Redis.Addr,
Password: conf.Config.Redis.Password,
DB: 0,
})

Expand Down
17 changes: 7 additions & 10 deletions models/binlog_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ type BinLogStream struct {
}

func NewBinLogStream() *BinLogStream {
gbeConfig := conf.GetConfig()

redisClient := redis.NewClient(&redis.Options{
Addr: gbeConfig.Redis.Addr,
Password: gbeConfig.Redis.Password,
Addr: conf.Config.Redis.Addr,
Password: conf.Config.Redis.Password,
DB: 0,
})

Expand Down Expand Up @@ -158,16 +156,15 @@ func (s *BinLogStream) getColumnIndexByName(e *canal.RowsEvent, name string) int
}

func (s *BinLogStream) Start() {
gbeConfig := conf.GetConfig()

cfg := canal.NewDefaultConfig()
cfg.Addr = gbeConfig.DataSource.Addr
cfg.User = gbeConfig.DataSource.User
cfg.Password = gbeConfig.DataSource.Password
cfg.Addr = conf.Config.DataSource.Addr
cfg.User = conf.Config.DataSource.User
cfg.Password = conf.Config.DataSource.Password
cfg.Dump.ExecutionPath = ""
cfg.Dump.TableDB = gbeConfig.DataSource.Database
cfg.Dump.TableDB = conf.Config.DataSource.Database
cfg.ParseTime = true
cfg.IncludeTableRegex = []string{gbeConfig.DataSource.Database + "\\..*"}
cfg.IncludeTableRegex = []string{conf.Config.DataSource.Database + "\\..*"}
cfg.ExcludeTableRegex = []string{"mysql\\..*"}
c, err := canal.NewCanal(cfg)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion models/mysql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewStore(db *gorm.DB) *Store {
}

func initDb() error {
cfg := conf.GetConfig()
cfg := conf.Config

url := fmt.Sprintf("%v:%v@tcp(%v)/%v?charset=utf8&parseTime=True&loc=Local",
cfg.DataSource.User, cfg.DataSource.Password, cfg.DataSource.Addr, cfg.DataSource.Database)
Expand Down
9 changes: 4 additions & 5 deletions pushing/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
)

func StartServer() {
gbeConfig := conf.GetConfig()

sub := newSubscription()

Expand All @@ -33,12 +32,12 @@ func StartServer() {
panic(err)
}
for _, product := range products {
newTickerStream(product.Id, sub, matching.NewKafkaLogReader("tickerStream", product.Id, gbeConfig.Kafka.Brokers)).Start()
newMatchStream(product.Id, sub, matching.NewKafkaLogReader("matchStream", product.Id, gbeConfig.Kafka.Brokers)).Start()
newOrderBookStream(product.Id, sub, matching.NewKafkaLogReader("orderBookStream", product.Id, gbeConfig.Kafka.Brokers)).Start()
newTickerStream(product.Id, sub, matching.NewKafkaLogReader("tickerStream", product.Id, conf.Config.Kafka.Brokers)).Start()
newMatchStream(product.Id, sub, matching.NewKafkaLogReader("matchStream", product.Id, conf.Config.Kafka.Brokers)).Start()
newOrderBookStream(product.Id, sub, matching.NewKafkaLogReader("orderBookStream", product.Id, conf.Config.Kafka.Brokers)).Start()
}

go NewServer(gbeConfig.PushServer.Addr, gbeConfig.PushServer.Path, sub).Run()
go NewServer(conf.Config.PushServer.Addr, conf.Config.PushServer.Path, sub).Run()

log.Info("websocket server ok")
}
5 changes: 2 additions & 3 deletions pushing/order_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,10 @@ var onceStore sync.Once

func sharedSnapshotStore() *redisSnapshotStore {
onceStore.Do(func() {
gbeConfig := conf.GetConfig()

redisClient := redis.NewClient(&redis.Options{
Addr: gbeConfig.Redis.Addr,
Password: gbeConfig.Redis.Password,
Addr: conf.Config.Redis.Addr,
Password: conf.Config.Redis.Password,
DB: 0,
})

Expand Down
5 changes: 2 additions & 3 deletions pushing/redis_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ func newRedisStream(sub *subscription) *redisStream {
}

func (s *redisStream) Start() {
gbeConfig := conf.GetConfig()

redisClient := redis.NewClient(&redis.Options{
Addr: gbeConfig.Redis.Addr,
Password: gbeConfig.Redis.Password,
Addr: conf.Config.Redis.Addr,
Password: conf.Config.Redis.Password,
DB: 0,
})
_, err := redisClient.Ping().Result()
Expand Down
3 changes: 1 addition & 2 deletions rest/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ import (
)

func StartServer() {
gbeConfig := conf.GetConfig()

httpServer := NewHttpServer(gbeConfig.RestServer.Addr)
httpServer := NewHttpServer(conf.Config.RestServer.Addr)
go httpServer.Start()

log.Info("rest server ok")
Expand Down
4 changes: 1 addition & 3 deletions rest/order_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ func getWriter(productId string) *kafka.Writer {
return writer.(*kafka.Writer)
}

gbeConfig := conf.GetConfig()

newWriter := kafka.NewWriter(kafka.WriterConfig{
Brokers: gbeConfig.Kafka.Brokers,
Brokers: conf.Config.Kafka.Brokers,
Topic: matching.TopicOrderPrefix + productId,
Balancer: &kafka.LeastBytes{},
BatchTimeout: 5 * time.Millisecond,
Expand Down
4 changes: 2 additions & 2 deletions service/user_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ func RefreshAccessToken(email, password string) (string, error) {
"expiredAt": time.Now().Unix(),
}
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claim)
return token.SignedString([]byte(conf.GetConfig().JwtSecret))
return token.SignedString([]byte(conf.Config.JwtSecret))
}

func CheckToken(tokenStr string) (*models.User, error) {
token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (interface{}, error) {
return []byte(conf.GetConfig().JwtSecret), nil
return []byte(conf.Config.JwtSecret), nil
})
if err != nil {
return nil, err
Expand Down
6 changes: 2 additions & 4 deletions worker/bill_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,9 @@ func (s *BillExecutor) Start() {
}

func (s *BillExecutor) runMqListener() {
gbeConfig := conf.GetConfig()

redisClient := redis.NewClient(&redis.Options{
Addr: gbeConfig.Redis.Addr,
Password: gbeConfig.Redis.Password,
Addr: conf.Config.Redis.Addr,
Password: conf.Config.Redis.Password,
DB: 0,
})

Expand Down
5 changes: 2 additions & 3 deletions worker/fill_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,10 @@ func (s *FillExecutor) Start() {

// 监听消息队列通知
func (s *FillExecutor) runMqListener() {
gbeConfig := conf.GetConfig()

redisClient := redis.NewClient(&redis.Options{
Addr: gbeConfig.Redis.Addr,
Password: gbeConfig.Redis.Password,
Addr: conf.Config.Redis.Addr,
Password: conf.Config.Redis.Password,
DB: 0,
})

Expand Down