@@ -16,14 +16,37 @@ package util
1616*/
1717
1818import (
19+ "database/sql"
1920"errors"
21+ "fmt"
2022log "github.com/Sirupsen/logrus"
23+ crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
24+ msgs "github.com/crunchydata/postgres-operator/apiservermsgs"
2125"github.com/crunchydata/postgres-operator/kubeapi"
26+ _ "github.com/lib/pq"
2227"k8s.io/api/core/v1"
2328"k8s.io/api/extensions/v1beta1"
2429"k8s.io/client-go/kubernetes"
30+ "k8s.io/client-go/rest"
2531)
2632
33+ const (
34+ replInfoQueryFormat = "SELECT %s(%s(), '0/0')::bigint, %s(%s(), '0/0')::bigint"
35+
36+ recvV9 = "pg_last_xlog_receive_location"
37+ replayV9 = "pg_last_xlog_replay_location"
38+ locationDiffV9 = "pg_xlog_location_diff"
39+
40+ recvV10 = "pg_last_wal_receive_lsn"
41+ replayV10 = "pg_last_wal_replay_lsn"
42+ locationDiffV10 = "pg_wal_lsn_diff"
43+ )
44+
45+ type ReplicationInfo struct {
46+ ReceiveLocation uint64
47+ ReplayLocation uint64
48+ }
49+
2750// GetBestTarget
2851func GetBestTarget (clientset * kubernetes.Clientset , clusterName , namespace string ) (* v1.Pod , * v1beta1.Deployment , error ) {
2952
@@ -101,3 +124,139 @@ func GetPod(clientset *kubernetes.Clientset, deploymentName, namespace string) (
101124
102125return pod , err
103126}
127+
128+ func GetRepStatus (restclient * rest.RESTClient , clientset * kubernetes.Clientset , dep * v1beta1.Deployment , namespace string ) (uint64 , uint64 ) {
129+ var receiveLocation , replayLocation uint64
130+
131+ //get the pods for this deployment
132+ selector := "primary=false,replica-name=" + dep .Name
133+ podList , err := kubeapi .GetPods (clientset , selector , namespace )
134+ if err != nil {
135+ log .Error (err .Error ())
136+ return receiveLocation , replayLocation
137+ }
138+
139+ if len (podList .Items ) != 1 {
140+ log .Debug ("no replicas found for dep " + dep .Name )
141+ return receiveLocation , replayLocation
142+ }
143+
144+ pod := podList .Items [0 ]
145+
146+ //get the crd for this dep
147+ cluster := crv1.Pgcluster {}
148+ var clusterfound bool
149+ clusterfound , err = kubeapi .Getpgcluster (restclient , & cluster , dep .ObjectMeta .Labels [LABEL_PG_CLUSTER ], namespace )
150+ if err != nil || ! clusterfound {
151+ log .Error ("Getpgcluster error: " + err .Error ())
152+ return receiveLocation , replayLocation
153+ }
154+
155+ //get the postgres secret for this dep
156+ var secretInfo []msgs.ShowUserSecret
157+ secretInfo , err = GetSecrets (clientset , & cluster , namespace )
158+ var pgSecret msgs.ShowUserSecret
159+ var found bool
160+ for _ , si := range secretInfo {
161+ if si .Username == "postgres" {
162+ pgSecret = si
163+ found = true
164+ log .Debug ("postgres secret found" )
165+ }
166+ }
167+
168+ if ! found {
169+ log .Error ("postgres secret not found for " + dep .Name )
170+ return receiveLocation , replayLocation
171+ }
172+
173+ port := "5432"
174+ databaseName := "postgres"
175+ target := getSQLTarget (& pod , pgSecret .Username , pgSecret .Password , port , databaseName )
176+ var repInfo * ReplicationInfo
177+ repInfo , err = GetReplicationInfo (target )
178+ if err != nil {
179+ log .Error (err )
180+ return receiveLocation , replayLocation
181+ }
182+
183+ receiveLocation = repInfo .ReceiveLocation
184+ replayLocation = repInfo .ReplayLocation
185+ return receiveLocation , replayLocation
186+ }
187+
188+ func getSQLTarget (pod * v1.Pod , username , password , port , db string ) string {
189+ target := fmt .Sprintf (
190+ "postgresql://%s:%s@%s:%s/%s?sslmode=disable" ,
191+ username ,
192+ password ,
193+ pod .Status .PodIP ,
194+ port ,
195+ db ,
196+ )
197+ return target
198+
199+ }
200+ func GetReplicationInfo (target string ) (* ReplicationInfo , error ) {
201+ conn , err := sql .Open ("postgres" , target )
202+
203+ if err != nil {
204+ log .Errorf ("Could not connect to: %s" , target )
205+ return nil , err
206+ }
207+
208+ defer conn .Close ()
209+
210+ // Get PG version
211+ var version int
212+
213+ rows , err := conn .Query ("SELECT current_setting('server_version_num')" )
214+
215+ if err != nil {
216+ log .Errorf ("Could not perform query for version: %s" , target )
217+ return nil , err
218+ }
219+
220+ defer rows .Close ()
221+
222+ for rows .Next () {
223+ if err := rows .Scan (& version ); err != nil {
224+ return nil , err
225+ }
226+ }
227+ // Get replication info
228+ var replicationInfoQuery string
229+ var recvLocation uint64
230+ var replayLocation uint64
231+
232+ if version < 100000 {
233+ replicationInfoQuery = fmt .Sprintf (
234+ replInfoQueryFormat ,
235+ locationDiffV9 , recvV9 ,
236+ locationDiffV9 , replayV9 ,
237+ )
238+ } else {
239+ replicationInfoQuery = fmt .Sprintf (
240+ replInfoQueryFormat ,
241+ locationDiffV10 , recvV10 ,
242+ locationDiffV10 , replayV10 ,
243+ )
244+ }
245+
246+ rows , err = conn .Query (replicationInfoQuery )
247+
248+ if err != nil {
249+ log .Errorf ("Could not perform replication info query: %s" , target )
250+ return nil , err
251+ }
252+
253+ defer rows .Close ()
254+
255+ for rows .Next () {
256+ if err := rows .Scan (& recvLocation , & replayLocation ); err != nil {
257+ return nil , err
258+ }
259+ }
260+
261+ return & ReplicationInfo {recvLocation , replayLocation }, nil
262+ }
0 commit comments