Skip to content

Commit 214837e

Browse files
Jobs script check and upgrade (#1248)
* feat: Add dbjobs_launcher script for cleanup and job execution - Modified `dbjobs_launcher.sh` to clean up old `.run` directories before continuously executing `dbjobs_new.sh`. - Enhanced `dbjobs_new.sh` with improved encryption functions, allowing for custom passwords and better API request handling. - Added generic functions for sending encrypted data to APIs with retry logic. - Implemented job check and upgrade functions to manage job execution and script updates. - Updated logging mechanisms for better traceability and error handling. fix: Trim whitespace in checksum generation - Modified `GenerateChecksum` function to ignore comments more robustly by trimming whitespace before checking for comment lines. * feat: Add API swagger for jobs upgrade and job checks on servers * refactor: Remove verbose logging for server needs check * Refactor jobsCheck and jobsUpgrade functions for improved process handling - Ensured environment variables LOG_DIR, CHECKPOINT_DIR, and LOCK_DIR are exported for subshells. - Reintroduced jobsCheck and jobsUpgrade calls in the main execution flow for better clarity and structure. * Enhance database table validation and version comparison logic - Updated the `table_exists` function to perform a more comprehensive check on the 'jobs' table structure in the 'replication_manager_schema' database, ensuring all required columns are present with correct types and constraints. - Introduced a new `create_jobs_table` function that interacts with an API to create the jobs table if it does not exist, improving the robustness of the table creation process. - Added a `check_jobs_table` function to streamline the process of checking for the jobs table and creating it if necessary. - Modified the version parsing logic in `version.go` to handle case-insensitive flavor detection for MariaDB, PostgreSQL, and Percona. - Implemented a new `Compare` method for version comparison, allowing for more precise version checks between different database flavors. 
- Added a `HasVersionChanged` function to determine if the database version has changed, considering flavor differences and providing error handling for nil versions. * Update API endpoints for refreshing app templates and checking cluster/server error states * Implement rolling jobs upgrade functionality and related API endpoints * Add support for rolling jobs upgrade in API and Redux slice * Implement rolling jobs upgrade functionality and update related state management * Refactor RollingJobsUpgrade to return error on timeout and improve logging * Add timeout error handling for jobs upgrade in RollingJobsUpgrade
1 parent 23b3fe4 commit 214837e

32 files changed

+2877
-383
lines changed

cluster/cluster.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ func (cluster *Cluster) InitFromConf() {
583583
//cluster.Conf.PrintConf()
584584
cluster.initScheduler()
585585
cluster.CheckDefaultUser(true)
586-
cluster.SetToolVersions()
586+
cluster.RefreshToolVersions()
587587
cluster.StartResticRepo()
588588

589589
cluster.Conf.TopologyTarget = cluster.GetTopologyFromConf()
@@ -642,6 +642,22 @@ func (cluster *Cluster) initScheduler() {
642642

643643
}
644644

645+
// pstates30 lists the state keys that the Run loop preserves on the
// heartbeats where the periodic maintenance checks are skipped, so
// these warnings do not flap off and back on between full passes.
var pstates30 = []string{
	"WARN0084", // Variables diff
	"ERR00090", "WARN0102", // Config related
	"WARN0093", "WARN0095", "WARN0134", "WARN0145", // Restic related
	"WARN0101", "WARN0111", "WARN0112", // Backup related
	"WARN0139", "WARN0140", "WARN0141", "WARN0142", "WARN0143", "WARN0150", "WARN0151", // Thresholds
	"WARN0147", "WARN0148", "WARN0153", "WARN0154", // Jobs related
	"CREDIT01", // Credit related
}

// pstates3600 lists the state keys preserved on heartbeats where the
// hourly (heartbeats % 3600) refresh pass is skipped, covering tools
// that are only re-probed once per hour.
var pstates3600 = []string{
	"WARN0094", // Restic
	"WARN0132", "WARN0137", // App templates
	"WARN0117", "WARN0118", "WARN0119", "WARN0120", "WARN0121", // Tools versions
}
660+
645661
func (cluster *Cluster) Run() {
646662
defer cluster.LogPanicToFile("cluster")
647663
interval := time.Second
@@ -739,23 +755,25 @@ func (cluster *Cluster) Run() {
739755
cluster.CheckAllBackupFreeSpace()
740756
cluster.CheckAvailableCredit()
741757
cluster.CheckOpenSVCTresholds()
758+
cluster.CheckJobsVersion()
759+
cluster.JobsCheckSchedulerTable()
742760
} else {
743-
cluster.StateMachine.PreserveState("WARN0093", "WARN0084", "WARN0095", "WARN0101", "WARN0111", "WARN0112", "ERR00090", "WARN0102", "WARN0134", "WARN0139", "WARN0140", "WARN0141", "WARN0142", "WARN0143", "WARN0145", "WARN0150", "WARN0151", "CREDIT01")
761+
cluster.StateMachine.PreserveState(pstates30...)
744762
}
745763
if !cluster.CanInitNodes {
746764
cluster.SetState("ERR00082", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["ERR00082"], cluster.errorInitNodes), ErrFrom: "OPENSVC"})
747765
}
748766
if !cluster.CanConnectVault {
749767
cluster.SetState("ERR00089", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["ERR00089"], cluster.errorConnectVault), ErrFrom: "OPENSVC"})
750768
}
751-
if cluster.StateMachine.GetHeartbeats()%36000 == 0 {
769+
if cluster.StateMachine.GetHeartbeats()%3600 == 0 {
752770
// Set in parallel since it will wait for fetch to finish
753771
go cluster.ResticPurgeRepo()
754772
go cluster.RefreshAllAppTemplateMD5()
773+
cluster.RefreshToolVersions()
755774
} else {
756775
// Preserve tools if not installed or has problem
757-
cluster.StateMachine.PreserveState("WARN0094", "WARN0117", "WARN0118", "WARN0119", "WARN0120", "WARN0121")
758-
cluster.StateMachine.PreserveState("WARN0132", "WARN0137")
776+
cluster.StateMachine.PreserveState(pstates3600...)
759777
}
760778
if cluster.SlavesOldestMasterFile.Suffix == 0 {
761779
go cluster.CheckSlavesReplicationsPurge()
@@ -910,6 +928,13 @@ func (cluster *Cluster) StateProcessing() {
910928
}
911929
}
912930
}
931+
if s.ErrKey == "WARN0148" && servertoreseed != nil {
932+
go servertoreseed.UpgradeJobsScript()
933+
}
934+
935+
if s.ErrKey == "WARN0155" {
936+
go cluster.RollingJobsUpgrade()
937+
}
913938

914939
// cluster.statecloseChan <- s
915940
cluster.CheckAlert(s, true)

cluster/cluster_acl.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ func (cluster *Cluster) IsURLPassDatabasesACL(strUser string, URL string) bool {
413413
return true
414414
}
415415
}
416-
if cluster.APIUsers[strUser].Grants[config.GrantProxyConfigFlag] {
416+
if cluster.APIUsers[strUser].Grants[config.GrantDBConfigFlag] {
417417
if strings.Contains(URL, "/config") {
418418
return true
419419
}
@@ -507,6 +507,9 @@ func (cluster *Cluster) IsURLPassDatabasesACL(strUser string, URL string) bool {
507507
if strings.Contains(URL, "/actions/wait-innodb-purge") {
508508
return true
509509
}
510+
if strings.Contains(URL, "/actions/jobs-upgrade") {
511+
return true
512+
}
510513
}
511514
/* if cluster.APIUsers[strUser].Grants[config.GrantDBConfigCreate] {
512515
if strings.Contains(URL, "/kill") {
@@ -796,6 +799,9 @@ func (cluster *Cluster) IsURLPassACL(strUser string, URL string, errorPrint bool
796799
if strings.Contains(URL, "/api/clusters/"+cluster.Name+"/actions/cancel-rolling-reprov") {
797800
return true
798801
}
802+
if strings.Contains(URL, "/api/clusters/"+cluster.Name+"/actions/jobs-upgrade") {
803+
return true
804+
}
799805
}
800806
if cluster.APIUsers[strUser].Grants[config.GrantClusterRotatePasswords] {
801807
if strings.Contains(URL, "/api/clusters/"+cluster.Name+"/actions/rotate-passwords") {

cluster/cluster_bck.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (cluster *Cluster) ResticGetEnv() []string {
4444

4545
func (cluster *Cluster) CheckResticInstallation() {
4646
if cluster.Conf.BackupRestic && cluster.VersionsMap.Get("restic") == nil {
47-
if err := cluster.SetResticVersion(); err != nil {
47+
if err := cluster.RefreshResticVersion(); err != nil {
4848
cluster.SetState("WARN0121", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0121"], err), ErrFrom: "CLUSTER"})
4949
} else {
5050
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Restic version: %s", cluster.VersionsMap.Get("restic").ToString())

cluster/cluster_chk.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,3 +1044,20 @@ func (cluster *Cluster) CheckDBCredentials() {
10441044
cluster.SetState("ERR00101", state.State{ErrType: "ERROR", ErrDesc: config.ClusterError["ERR00101"], ErrFrom: "CLUSTER"})
10451045
}
10461046
}
1047+
1048+
func (cluster *Cluster) CheckJobsVersion() {
1049+
for _, server := range cluster.Servers {
1050+
server.CheckJobsVersion()
1051+
}
1052+
}
1053+
1054+
func (cluster *Cluster) JobsCheckSchedulerTable() {
1055+
for _, server := range cluster.Servers {
1056+
ok, err := server.JobsCheckSchedulerTable()
1057+
if err != nil {
1058+
cluster.SetState("WARN0153", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0153"], server.URL), ErrFrom: "CLUSTER"})
1059+
} else if !ok {
1060+
cluster.SetState("WARN0154", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0154"], server.URL, ""), ErrFrom: "CLUSTER"})
1061+
}
1062+
}
1063+
}

cluster/cluster_get.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/signal18/replication-manager/utils/misc"
3333
"github.com/signal18/replication-manager/utils/state"
3434
"github.com/signal18/replication-manager/utils/tty"
35+
"github.com/signal18/replication-manager/utils/version"
3536
)
3637

3738
func (cluster *Cluster) GetCrcTable() *crc64.Table {
@@ -1677,3 +1678,8 @@ func (cluster *Cluster) GetStagingServerFromConfig() *ServerMonitor {
16771678

16781679
return nil
16791680
}
1681+
1682+
func (cluster *Cluster) GetToolsVersion(name string) (*version.Version, bool) {
1683+
toolVer, ok := cluster.VersionsMap.CheckAndGet(name)
1684+
return toolVer, ok
1685+
}

cluster/cluster_has.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,3 +606,13 @@ func (cluster *Cluster) HasDiscoverTopologyReachTarget() bool {
606606
func (cluster *Cluster) IsTopologyTargetEqual(target string) bool {
607607
return cluster.Conf.TopologyTarget == target
608608
}
609+
610+
func (cluster *Cluster) IsInErrorState(key, serverURL string) bool {
611+
if key == "" {
612+
return false
613+
} else if serverURL == "" {
614+
return cluster.StateMachine.IsInState(key)
615+
} else {
616+
return cluster.StateMachine.IsInState(fmt.Sprintf("%s@%s", key, serverURL))
617+
}
618+
}

cluster/cluster_job.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
"github.com/signal18/replication-manager/config"
2222
"github.com/signal18/replication-manager/utils/dbhelper"
23+
"github.com/signal18/replication-manager/utils/state"
2324
)
2425

2526
func (cluster *Cluster) JobAnalyzeSQL(persistent bool) error {
@@ -271,3 +272,7 @@ func (cluster *Cluster) JobParseMyDumperMetaOld(dir string) (config.MyDumperMeta
271272

272273
return m, nil
273274
}
275+
276+
func (cluster *Cluster) SetJobsUpgradeSender(server *ServerMonitor) {
277+
cluster.SetState("WARN0148", state.State{ErrDesc: fmt.Sprintf(config.ClusterError["WARN0148"], server.URL), ErrType: config.LvlWarn, ErrFrom: "JOBS", ServerUrl: server.URL})
278+
}

cluster/cluster_roll.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,3 +185,46 @@ func (cluster *Cluster) RollingOptimize() {
185185
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Optimize job id %d on %s ", jobid, s.URL)
186186
}
187187
}
188+
189+
func (cluster *Cluster) RollingJobsUpgrade() error {
190+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Rolling jobs upgrade")
191+
var ts time.Time
192+
193+
for _, s := range cluster.slaves {
194+
ts = time.Now()
195+
s.SetWaitJobsUpgradeCookie()
196+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Set jobs upgrade cookie on %s ", s.URL)
197+
198+
// Wait for the server to clear the cookie
199+
for s.HasRollingJobsUpgradeCookie() {
200+
if time.Since(ts) > 5*time.Minute {
201+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Timeout waiting for jobs upgrade on %s ", s.URL)
202+
return errors.New("Timeout waiting for jobs upgrade")
203+
}
204+
205+
time.Sleep(2 * time.Second)
206+
}
207+
208+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Jobs upgrade completed on %s ", s.URL)
209+
}
210+
211+
ts = time.Now()
212+
cluster.master.SetWaitJobsUpgradeCookie()
213+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Set jobs upgrade cookie on master %s ", cluster.master.URL)
214+
215+
// Wait for the server to clear the cookie
216+
for cluster.master.HasRollingJobsUpgradeCookie() {
217+
if time.Since(ts) > 5*time.Minute {
218+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Timeout waiting for jobs upgrade on master %s ", cluster.master.URL)
219+
return errors.New("Timeout waiting for jobs upgrade on master")
220+
}
221+
222+
time.Sleep(2 * time.Second)
223+
}
224+
225+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Jobs upgrade completed on master %s ", cluster.master.URL)
226+
227+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Rolling jobs upgrade completed")
228+
229+
return nil
230+
}

cluster/cluster_sec.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,18 @@ package cluster
99
import (
1010
"context"
1111
"crypto/tls"
12+
"encoding/json"
1213
"errors"
1314
"fmt"
15+
"io"
1416
"net"
1517
"net/smtp"
1618
"strings"
1719

1820
vault "github.com/hashicorp/vault/api"
1921
"github.com/jordan-wright/email"
2022
"github.com/signal18/replication-manager/config"
23+
"github.com/signal18/replication-manager/utils/crypto"
2124
"github.com/signal18/replication-manager/utils/dbhelper"
2225
"github.com/signal18/replication-manager/utils/misc"
2326
"github.com/sirupsen/logrus"
@@ -583,3 +586,46 @@ func (cluster *Cluster) RevokeDBUserGrants(user string) error {
583586
}
584587
return nil
585588
}
589+
590+
// DecodedData is the JSON payload posted to the secret-login endpoint;
// Data carries the encrypted secret as a string.
type DecodedData struct {
	Data string `json:"data"`
}
593+
594+
// SecretLoginCheck authenticates an encrypted-secret login request.
// It reads a DecodedData JSON payload from rbody, resolves the target
// server from the request vars ("serverName", optional "serverPort"),
// decrypts the payload with a key/IV derived from that server's
// password, and accepts the login only when the decrypted value matches
// the cluster database password.
//
// It returns the resolved server, an error describing any failure, and
// an HTTP status code: 500 for read/lookup/decrypt failures, 400 for a
// malformed body, 401 for a secret mismatch, 200 on success.
func (cluster *Cluster) SecretLoginCheck(vars map[string]string, rbody io.ReadCloser) (*ServerMonitor, error, int) {
	var decodedData DecodedData
	body, err := io.ReadAll(rbody)
	if err != nil {
		return nil, fmt.Errorf("Decode reading body :%s", err.Error()), 500
	}

	err = json.Unmarshal(body, &decodedData)
	if err != nil {
		return nil, fmt.Errorf("Decode body :%s. Err: %s", string(body), err.Error()), 400
	}

	// Without a port the name alone identifies the server; with a port
	// the lookup uses the full "name:port" URL form.
	var node *ServerMonitor
	if vars["serverPort"] == "" {
		node = cluster.GetServerFromName(vars["serverName"])
	} else {
		node = cluster.GetServerFromURL(vars["serverName"] + ":" + vars["serverPort"])
	}
	if node == nil {
		return nil, fmt.Errorf("No server"), 500
	}
	// Decrypt the encrypted data: key is the SHA-256 hash and IV the
	// MD5 hash of the node password, so the sender must derive both
	// the same way for decryption to succeed.
	key := crypto.GetSHA256Hash(node.Pass)
	iv := crypto.GetMD5Hash(node.Pass)

	// Debug-level only: logs the still-encrypted payload, never the
	// decrypted secret.
	cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlDbg, "Received login with encrypted secret %s", decodedData.Data)

	decrypted, err := node.DecodeSecret(decodedData.Data, key, iv)
	if err != nil {
		return nil, fmt.Errorf("Error decrypting data : %s", err.Error()), 500
	}

	if decrypted != cluster.GetDbPass() {
		return nil, fmt.Errorf("Invalid secret"), 401
	}

	return node, nil, 200
}

0 commit comments

Comments
 (0)