Skip to content

Commit f7c06ee

Browse files
committed
fix mysql
1 parent 3ed7660 commit f7c06ee

File tree

1 file changed

+60
-41
lines changed

1 file changed

+60
-41
lines changed

reader/sql/sql.go

Lines changed: 60 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,7 @@ func checkMagic(rawSql string) (valid bool) {
719719
}
720720

721721
func (r *Reader) Name() string {
722-
return strings.ToUpper(r.dbtype) + "_Reader:" + r.database + "_" + Hash(r.rawsqls)
722+
return strings.ToUpper(r.dbtype) + "_Reader:" + r.rawDatabase + "_" + Hash(r.rawsqls)
723723
}
724724

725725
func (r *Reader) setStatsError(err string) {
@@ -889,7 +889,7 @@ func (r *Reader) run() {
889889
}
890890
// 开始work逻辑
891891
for {
892-
if atomic.LoadInt32(&r.status) == reader.StatusStopping {
892+
if atomic.LoadInt32(&r.status) == reader.StatusStopping || atomic.LoadInt32(&r.status) == reader.StatusStopped {
893893
log.Warnf("Runner[%v] %v stopped from running", r.meta.RunnerName, r.Name())
894894
return
895895
}
@@ -1015,7 +1015,8 @@ func (r *Reader) exec(connectStr string) (err error) {
10151015
}
10161016
err = r.execReadDB(currentDB, now, recordTablesDone)
10171017
if err != nil {
1018-
log.Errorf("Runner[%v] %v exect read db: %v error: %v", r.meta.RunnerName, currentDB, currentDB, err)
1018+
log.Errorf("Runner[%v] %v exect read db: %v error: %v,will retry read it", r.meta.RunnerName, currentDB, currentDB, err)
1019+
return err
10191020
}
10201021
if atomic.LoadInt32(&r.status) == reader.StatusStopping || atomic.LoadInt32(&r.status) == reader.StatusStopped {
10211022
log.Warnf("Runner[%v] %v stopped from running", r.meta.RunnerName, currentDB)
@@ -1049,24 +1050,15 @@ func (r *Reader) execCountDB(curDB string, now time.Time, recordTablesDone Table
10491050
if err != nil {
10501051
return err
10511052
}
1052-
db, err := openSql(r.dbtype, connectStr, curDB)
1053-
if err != nil {
1054-
return err
1055-
}
1056-
defer func() {
1057-
db.Close()
1058-
}()
1059-
if err = db.Ping(); err != nil {
1060-
return err
1061-
}
1053+
10621054
log.Infof("Runner[%v] prepare %v change database success, current database is: %v", r.meta.RunnerName, r.dbtype, curDB)
10631055

10641056
//更新sqls
10651057
var tables []string
10661058
var sqls string
10671059
if r.rawsqls == "" {
10681060
// 获取符合条件的数据表,并且将计算表中记录数的query语句赋给 r.rawsqls
1069-
tables, sqls, err = r.getDatas(db, curDB, r.rawTable, now, COUNT)
1061+
tables, sqls, err = r.getDatas(connectStr, curDB, r.rawTable, now, COUNT)
10701062
if err != nil {
10711063
return err
10721064
}
@@ -1096,7 +1088,7 @@ func (r *Reader) execCountDB(curDB string, now time.Time, recordTablesDone Table
10961088

10971089
// 每张表的记录数
10981090
var tableSize int64
1099-
tableSize, err = r.execTableCount(db, idx, curDB, rawSql)
1091+
tableSize, err = r.execTableCount(connectStr, idx, curDB, rawSql)
11001092
if err != nil {
11011093
return err
11021094
}
@@ -1118,16 +1110,7 @@ func (r *Reader) execReadDB(curDB string, now time.Time, recordTablesDone TableR
11181110
if err != nil {
11191111
return err
11201112
}
1121-
db, err := openSql(r.dbtype, connectStr, r.Name())
1122-
if err != nil {
1123-
return err
1124-
}
1125-
defer func() {
1126-
db.Close()
1127-
}()
1128-
if err = db.Ping(); err != nil {
1129-
return err
1130-
}
1113+
11311114
log.Infof("Runner[%v] %v prepare %v change database success", r.meta.RunnerName, curDB, r.dbtype)
11321115
r.database = curDB
11331116

@@ -1136,7 +1119,7 @@ func (r *Reader) execReadDB(curDB string, now time.Time, recordTablesDone TableR
11361119
var sqls string
11371120
if r.rawsqls == "" {
11381121
// 获取符合条件的数据表,并且将获取表中所有记录的语句赋给 r.rawsqls
1139-
tables, sqls, err = r.getDatas(db, curDB, r.rawTable, now, TABLE)
1122+
tables, sqls, err = r.getDatas(connectStr, curDB, r.rawTable, now, TABLE)
11401123
if err != nil {
11411124
log.Errorf("Runner[%v] %v rawTable: %v get tables and sqls error %v", r.meta.RunnerName, r.Name(), r.rawTable, r.rawsqls, err)
11421125
if len(tables) == 0 && sqls == "" {
@@ -1181,8 +1164,10 @@ func (r *Reader) execReadDB(curDB string, now time.Time, recordTablesDone TableR
11811164
}
11821165
}
11831166
// 执行每条 sql 语句
1184-
exit, isRawSql, readSize = r.execReadSql(db, idx, rawSql, tables)
1185-
1167+
exit, isRawSql, readSize, err = r.execReadSql(connectStr, curDB, idx, rawSql, tables)
1168+
if err != nil {
1169+
return err
1170+
}
11861171
if r.rawsqls == "" {
11871172
tmpTablesRecords.SetTableInfo(tableName, TableInfo{size: readSize, offset: -1})
11881173
r.syncRecords.SetTableRecords(curDB, tmpTablesRecords)
@@ -1640,14 +1625,25 @@ type DataQuery struct {
16401625
sqls string
16411626
}
16421627

1643-
func (r *Reader) getValidData(db *sql.DB, curDB, matchData, matchStr string,
1628+
func (r *Reader) getValidData(connectStr, curDB, matchData, matchStr string,
16441629
startIndex, endIndex, timeIndex []int, queryType int) (validData []string, sqls string, err error) {
16451630
// get all databases and check validate database
16461631
query, err := r.getQuery(queryType, curDB)
16471632
if err != nil {
16481633
return validData, sqls, err
16491634
}
16501635

1636+
db, err := openSql(r.dbtype, connectStr, r.Name())
1637+
if err != nil {
1638+
return nil, "", err
1639+
}
1640+
defer func() {
1641+
db.Close()
1642+
}()
1643+
if err = db.Ping(); err != nil {
1644+
return nil, "", err
1645+
}
1646+
16511647
rowsDBs, err := db.Query(query)
16521648
if err != nil {
16531649
log.Errorf("Runner[%v] %v prepare %v <%v> query error %v", r.meta.RunnerName, curDB, r.dbtype, query, err)
@@ -1785,7 +1781,7 @@ func getDefaultSql(database, dbtype string) (defaultSql string, err error) {
17851781

17861782
// 根据queryType获取符合要求的数据和需要执行的原始sql语句mr.rawsqls
17871783
// queryType 可以为TABLE DATABASE COUNT
1788-
func (r *Reader) getDatas(db *sql.DB, curDB, rawData string, now time.Time, queryType int) (datas []string, rawsqls string, err error) {
1784+
func (r *Reader) getDatas(connectStr, curDB, rawData string, now time.Time, queryType int) (datas []string, rawsqls string, err error) {
17891785
var startIndex, endIndex, timeIndex []int
17901786
var matchData string
17911787

@@ -1796,7 +1792,7 @@ func (r *Reader) getDatas(db *sql.DB, curDB, rawData string, now time.Time, quer
17961792
}
17971793
if checkAll {
17981794
// 导入所有数据
1799-
datas, rawsqls, err = r.getAllDatas(db, curDB, queryType)
1795+
datas, rawsqls, err = r.getAllDatas(connectStr, curDB, queryType)
18001796
if err != nil {
18011797
return datas, rawsqls, err
18021798
}
@@ -1819,7 +1815,7 @@ func (r *Reader) getDatas(db *sql.DB, curDB, rawData string, now time.Time, quer
18191815
}
18201816

18211817
matchStr := getRemainStr(matchData, timeIndex)
1822-
datas, rawsqls, err = r.getValidData(db, curDB, matchData, matchStr, startIndex, endIndex, timeIndex, queryType)
1818+
datas, rawsqls, err = r.getValidData(connectStr, curDB, matchData, matchStr, startIndex, endIndex, timeIndex, queryType)
18231819
if err != nil {
18241820
return datas, rawsqls, err
18251821
}
@@ -1875,13 +1871,25 @@ func (r *Reader) getQuery(queryType int, curDB string) (query string, err error)
18751871
}
18761872

18771873
// 计算每个table的记录条数
1878-
func (r *Reader) execTableCount(db *sql.DB, idx int, curDB, rawSql string) (tableSize int64, err error) {
1874+
func (r *Reader) execTableCount(connectStr string, idx int, curDB, rawSql string) (tableSize int64, err error) {
18791875
execSQL, err := r.getSQL(idx, rawSql)
18801876
if err != nil {
18811877
log.Errorf("Runner[%v] get SQL error %v, use raw SQL", r.meta.RunnerName, err)
18821878
execSQL = rawSql
18831879
}
18841880
log.Infof("Runner[%v] reader <%v> exec sql <%v>", r.meta.RunnerName, curDB, execSQL)
1881+
1882+
db, err := openSql(r.dbtype, connectStr, curDB)
1883+
if err != nil {
1884+
return 0, err
1885+
}
1886+
defer func() {
1887+
db.Close()
1888+
}()
1889+
if err = db.Ping(); err != nil {
1890+
return 0, err
1891+
}
1892+
18851893
rows, err := db.Query(execSQL)
18861894
if err != nil {
18871895
log.Errorf("Runner[%v] %v prepare %v <%v> query error %v", r.meta.RunnerName, curDB, r.dbtype, execSQL, err)
@@ -1909,7 +1917,7 @@ func (r *Reader) execTableCount(db *sql.DB, idx int, curDB, rawSql string) (tabl
19091917
}
19101918

19111919
// 执行每条 sql 语句
1912-
func (r *Reader) execReadSql(db *sql.DB, idx int, rawSql string, tables []string) (exit bool, isRawSql bool, readSize int64) {
1920+
func (r *Reader) execReadSql(connectStr, curDB string, idx int, rawSql string, tables []string) (exit bool, isRawSql bool, readSize int64, err error) {
19131921
exit = true
19141922

19151923
execSQL, err := r.getSQL(idx, r.syncSQLs[idx])
@@ -1922,13 +1930,24 @@ func (r *Reader) execReadSql(db *sql.DB, idx int, rawSql string, tables []string
19221930
isRawSql = true
19231931
}
19241932

1933+
db, err := openSql(r.dbtype, connectStr, curDB)
1934+
if err != nil {
1935+
return exit, isRawSql, 0, err
1936+
}
1937+
defer func() {
1938+
db.Close()
1939+
}()
1940+
if err = db.Ping(); err != nil {
1941+
return exit, isRawSql, 0, err
1942+
}
1943+
19251944
log.Infof("Runner[%v] reader <%v> exec sql <%v>", r.meta.RunnerName, r.Name(), execSQL)
19261945
rows, err := db.Query(execSQL)
19271946
if err != nil {
19281947
err = fmt.Errorf("runner[%v] %v prepare %v <%v> query error %v", r.meta.RunnerName, r.Name(), r.dbtype, execSQL, err)
19291948
log.Error(err)
19301949
r.sendError(err)
1931-
return exit, isRawSql, readSize
1950+
return exit, isRawSql, readSize, err
19321951
}
19331952
defer rows.Close()
19341953
// Get column names
@@ -1937,7 +1956,7 @@ func (r *Reader) execReadSql(db *sql.DB, idx int, rawSql string, tables []string
19371956
err = fmt.Errorf("runner[%v] %v prepare %v <%v> columns error %v", r.meta.RunnerName, r.Name(), r.dbtype, execSQL, err)
19381957
log.Error(err)
19391958
r.sendError(err)
1940-
return exit, isRawSql, readSize
1959+
return exit, isRawSql, readSize, err
19411960
}
19421961
log.Infof("Runner[%v] SQL :<%v>, schemas: <%v>", r.meta.RunnerName, execSQL, strings.Join(columns, ", "))
19431962
scanArgs, nochiced := r.getInitScans(len(columns), rows, r.dbtype)
@@ -2053,7 +2072,7 @@ func (r *Reader) execReadSql(db *sql.DB, idx int, rawSql string, tables []string
20532072
}
20542073
if atomic.LoadInt32(&r.status) == reader.StatusStopping || atomic.LoadInt32(&r.status) == reader.StatusStopped {
20552074
log.Warnf("Runner[%v] %v stopped from running", r.meta.RunnerName, r.Name())
2056-
return exit, isRawSql, readSize
2075+
return exit, isRawSql, readSize, nil
20572076
}
20582077
r.readChan <- readInfo{data, totalBytes}
20592078
r.CurrentCount++
@@ -2082,12 +2101,12 @@ func (r *Reader) execReadSql(db *sql.DB, idx int, rawSql string, tables []string
20822101
}
20832102
}
20842103

2085-
return exit, isRawSql, readSize
2104+
return exit, isRawSql, readSize, rows.Err()
20862105
}
20872106

2088-
func (r *Reader) getAllDatas(db *sql.DB, curDB string, queryType int) (datas []string, sqls string, err error) {
2107+
func (r *Reader) getAllDatas(connectStr, curDB string, queryType int) (datas []string, sqls string, err error) {
20892108
// 拿到数据库中所有表及对应的sql语句
2090-
datas, sqls, err = r.getValidData(db, curDB, "", "", []int{}, []int{}, []int{}, queryType)
2109+
datas, sqls, err = r.getValidData(connectStr, curDB, "", "", []int{}, []int{}, []int{}, queryType)
20912110
if err != nil {
20922111
return datas, sqls, err
20932112
}
@@ -2185,7 +2204,7 @@ func (r *Reader) getDBs(connectStr string, now time.Time) ([]string, error) {
21852204
if err = db.Ping(); err != nil {
21862205
return nil, err
21872206
}
2188-
dbsAll, _, err := r.getDatas(db, "", r.rawDatabase, now, DATABASE)
2207+
dbsAll, _, err := r.getDatas(connectStr, "", r.rawDatabase, now, DATABASE)
21892208
if err != nil {
21902209
return dbsAll, err
21912210
}

0 commit comments

Comments
 (0)