@@ -3,16 +3,20 @@ package sync
33
44import (
55"context"
6+ "encoding/json"
67"fmt"
78"time"
89
10+ "github.com/jackc/pgx/v5/pgconn"
911"github.com/sirupsen/logrus"
1012clientv3 "go.etcd.io/etcd/client/v3"
1113
1214"github.com/cybertec-postgresql/etcd_fdw/internal/db"
1315"github.com/cybertec-postgresql/etcd_fdw/internal/etcd"
1416)
1517
18+ const InvalidRevision = - 1
19+
1620// Service orchestrates bidirectional synchronization between etcd and PostgreSQL
1721type Service struct {
1822pgPool db.PgxPoolIface
@@ -224,25 +228,81 @@ func (s *Service) syncPostgreSQLToEtcd(ctx context.Context) error {
224228}
225229
226230if err := s .processPostgreSQLNotification (ctx , notification ); err != nil {
227- logrus .WithError (err ).WithField ("payload" , "unknown" ).Error ("Failed to process PostgreSQL notification" )
231+ logrus .WithError (err ).WithField ("payload" , notification . Payload ).Error ("Failed to process PostgreSQL notification" )
228232// Continue processing other notifications rather than failing entirely
229233}
230234}
231235}
232236}
233237
234238// processPostgreSQLNotification processes a PostgreSQL NOTIFY and syncs to etcd
235- func (s * Service ) processPostgreSQLNotification (ctx context.Context , notification interface {}) error {
236- // In a real implementation, we would parse the JSON payload to get WAL entry details
237- // For now, we'll log that we received the notification
238- logrus .WithField ("notification" , notification ).Info ("Received PostgreSQL notification" )
239-
240- // TODO: Parse the notification and sync the change to etcd
241- // This would involve:
242- // 1. Parse notification JSON to get key, value, revision
243- // 2. Apply conflict resolution logic
244- // 3. Put/Delete to etcd
245- // 4. Mark WAL entry as processed
239+ func (s * Service ) processPostgreSQLNotification (ctx context.Context , notification * pgconn.Notification ) error {
240+ // Parse the JSON payload from the notification
241+ var walNotification struct {
242+ Key string `json:"key"`
243+ Ts string `json:"ts"`
244+ Value * string `json:"value"`
245+ Revision * int64 `json:"revision"`
246+ Operation string `json:"operation"`
247+ }
246248
247- return nil
249+ if err := json .Unmarshal ([]byte (notification .Payload ), & walNotification ); err != nil {
250+ return fmt .Errorf ("failed to parse notification payload: %w" , err )
251+ }
252+
253+ logrus .WithFields (logrus.Fields {
254+ "key" : walNotification .Key ,
255+ "ts" : walNotification .Ts ,
256+ "operation" : walNotification .Operation ,
257+ }).Info ("Processing PostgreSQL notification" )
258+
259+ // Apply conflict resolution: check if etcd has a newer version
260+ etcdKV , err := s .etcdClient .Get (ctx , walNotification .Key )
261+ if err != nil {
262+ return fmt .Errorf ("failed to get key from etcd for conflict resolution: %w" , err )
263+ }
264+
265+ // Conflict resolution: etcd wins (if etcd has newer revision, skip this change)
266+ if etcdKV != nil && walNotification .Revision != nil && etcdKV .Revision > * walNotification .Revision {
267+ logrus .WithFields (logrus.Fields {
268+ "key" : walNotification .Key ,
269+ "etcd_revision" : etcdKV .Revision ,
270+ "local_revision" : * walNotification .Revision ,
271+ }).Warn ("Conflict detected: etcd has newer revision, skipping local change" )
272+
273+ // Mark WAL entry as failed (conflict resolved - etcd wins)
274+ return db .UpdateWALEntry (ctx , s .pgPool , walNotification .Key , walNotification .Ts , InvalidRevision )
275+ }
276+
277+ // Apply the change to etcd
278+ var newRevision int64
279+ switch walNotification .Operation {
280+ case "CREATE" , "UPDATE" :
281+ if walNotification .Value != nil {
282+ resp , err := s .etcdClient .Put (ctx , walNotification .Key , * walNotification .Value )
283+ if err != nil {
284+ // Mark WAL entry as failed
285+ db .UpdateWALEntry (ctx , s .pgPool , walNotification .Key , walNotification .Ts , InvalidRevision )
286+ return fmt .Errorf ("failed to put key to etcd: %w" , err )
287+ }
288+ newRevision = resp .Header .Revision
289+ logrus .WithField ("key" , walNotification .Key ).Info ("Synced PostgreSQL change to etcd (PUT)" )
290+ }
291+ case "DELETE" :
292+ resp , err := s .etcdClient .Delete (ctx , walNotification .Key )
293+ if err != nil {
294+ // Mark WAL entry as failed
295+ db .UpdateWALEntry (ctx , s .pgPool , walNotification .Key , walNotification .Ts , InvalidRevision )
296+ return fmt .Errorf ("failed to delete key from etcd: %w" , err )
297+ }
298+ newRevision = resp .Header .Revision
299+ logrus .WithField ("key" , walNotification .Key ).Info ("Synced PostgreSQL change to etcd (DELETE)" )
300+ default :
301+ // Mark WAL entry as failed due to unknown operation
302+ db .UpdateWALEntry (ctx , s .pgPool , walNotification .Key , walNotification .Ts , InvalidRevision )
303+ return fmt .Errorf ("unknown operation type: %s" , walNotification .Operation )
304+ }
305+
306+ // Mark WAL entry as successfully processed with the new etcd revision
307+ return db .UpdateWALEntry (ctx , s .pgPool , walNotification .Key , walNotification .Ts , newRevision )
248308}
0 commit comments