Skip to content

Commit da911ef

Browse files
authored
[!] rewrite with consolidated sync package (#7)
* Complete etcd and postgresql consolidation into internal/sync package - Consolidated all PostgreSQL operations from internal/db into internal/sync/postgresql.go - Consolidated all etcd operations from internal/etcd into internal/sync/etcd.go - Replaced COPY with INSERT ON CONFLICT using pgx.Batch for better upsert handling - Added SendBatch method to PgxIface interface for batch operations - Moved all types, connection functions, and retry logic - All build tests passing - Remove old packages - Consolidate some configs
1 parent ed95657 commit da911ef

File tree

19 files changed

+707
-757
lines changed

19 files changed

+707
-757
lines changed

.github/copilot-instructions.md

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
# GitHub Copilot Instructions: etcd_fdw Single Package Architecture
2+
3+
## Project Context
4+
5+
**Tool**: etcd_fdw - Bidirectional synchronization between etcd and PostgreSQL
6+
**Language**: Go 1.25
7+
**Architecture**: Single package consolidation (refactoring from separate internal/etcd and internal/db packages)
8+
**Database**: PostgreSQL with single `etcd` table using revision status encoding
9+
**Key Principle**: KISS (Keep It Simple, Stupid) - maximum simplicity, minimal complexity
10+
11+
## Current Refactor Context
12+
13+
**Objective**: Consolidate `internal/etcd` and `internal/db` packages into single `internal/sync` package
14+
**Dependencies**: Only use existing packages from go.mod (pgx/v5, etcd client/v3, logrus, go-retry, testcontainers)
15+
**Constraints**: No new dependencies, use INSERT ON CONFLICT with pgx.Batch instead of COPY, minimal test cases
16+
17+
## Architecture Overview
18+
19+
### Single Table Design
20+
```sql
21+
CREATE TABLE etcd (
22+
ts timestamp with time zone NOT NULL DEFAULT now(),
23+
key text NOT NULL,
24+
value text,
25+
revision bigint NOT NULL,
26+
tombstone boolean NOT NULL DEFAULT false,
27+
PRIMARY KEY(key, revision)
28+
);
29+
```
30+
31+
**Revision Encoding**:
32+
- `revision = -1`: Pending sync to etcd (PostgreSQL → etcd)
33+
- `revision > 0`: Real etcd revision (etcd → PostgreSQL)
34+
35+
### Package Structure (Target)
36+
```
37+
internal/sync/
38+
├── sync.go # Main service orchestration
39+
├── postgresql.go # PostgreSQL operations (consolidated from internal/db)
40+
├── etcd.go # etcd client operations (consolidated from internal/etcd)
41+
├── config.go # Connection management
42+
└── sync_test.go # Consolidated minimal tests
43+
```
44+
45+
## Key Functions to Consolidate
46+
47+
### From internal/db/postgres.go:
48+
- `BulkInsert()` - Replace COPY with INSERT ON CONFLICT using pgx.Batch
49+
- `GetPendingRecords()` - Get records with revision = -1
50+
- `UpdateRevision()` - Update revision after etcd sync
51+
- `GetLatestRevision()` - For watch resume points
52+
53+
### From internal/etcd/client.go:
54+
- `NewEtcdClient()` - etcd connection management
55+
- `GetAllKeys()` - Initial sync from etcd
56+
- `WatchPrefix()` - Continuous etcd monitoring
57+
- `Put()`, `Delete()` - etcd operations
58+
59+
### From internal/sync/sync.go (existing):
60+
- `Service` - Main orchestration service
61+
- `Start()` - Bidirectional sync coordination
62+
63+
## Code Style Preferences
64+
65+
**Simplicity First**:
66+
- No nested function calls unless necessary
67+
- Direct error handling (no complex wrapping)
68+
- Minimal logging (only essential events)
69+
- Concrete types over interfaces within package
70+
- pgx.Batch for bulk operations instead of COPY
71+
72+
**Testing**:
73+
- Minimal test cases covering maximum functionality
74+
- Use testcontainers for integration tests
75+
- No bloated test code
76+
- Focus on essential behavior validation
77+
78+
**Error Handling**:
79+
- Use existing retry logic from internal/retry package
80+
- Return errors directly with context
81+
- Log errors at appropriate levels only
82+
83+
## Recent Changes
84+
85+
1. **Specification Created**: Single package architecture refactor defined
86+
2. **Research Completed**: Package consolidation strategy determined
87+
3. **Data Model Designed**: Unified structures for sync package
88+
4. **Contracts Defined**: Public API for consolidated package
89+
5. **Quickstart Created**: Step-by-step refactor process
90+
91+
## Current Task Context
92+
93+
**Phase**: Implementation planning complete, ready for /tasks command
94+
**Next**: Generate specific implementation tasks for package consolidation
95+
**Focus**: Maintain identical behavior while simplifying architecture
96+
97+
## Key Principles for Implementation
98+
99+
- Preserve all existing functionality exactly
100+
- Maintain test coverage without bloating
101+
- Use only existing dependencies from go.mod
102+
- Follow KISS principle throughout
103+
- No performance regression
104+
- Simplify imports and reduce cognitive complexity

cmd/etcd_fdw/integration_test.go

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ import (
1414
"github.com/testcontainers/testcontainers-go/modules/postgres"
1515
"github.com/testcontainers/testcontainers-go/wait"
1616

17-
"github.com/cybertec-postgresql/etcd_fdw/internal/db"
18-
"github.com/cybertec-postgresql/etcd_fdw/internal/etcd"
17+
"github.com/cybertec-postgresql/etcd_fdw/internal/sync"
1918
)
2019

2120
func setupPostgreSQLContainer(ctx context.Context, t *testing.T) (*pgxpool.Pool, testcontainers.Container) {
@@ -54,7 +53,7 @@ func setupPostgreSQLContainer(ctx context.Context, t *testing.T) (*pgxpool.Pool,
5453
return pool, pgContainer
5554
}
5655

57-
func setupEtcdContainer(ctx context.Context, t *testing.T) (*etcd.EtcdClient, testcontainers.Container) {
56+
func setupEtcdContainer(ctx context.Context, t *testing.T) (*sync.EtcdClient, testcontainers.Container) {
5857
etcdContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
5958
ContainerRequest: testcontainers.ContainerRequest{
6059
Image: "quay.io/coreos/etcd:v3.5.9",
@@ -77,13 +76,13 @@ func setupEtcdContainer(ctx context.Context, t *testing.T) (*etcd.EtcdClient, te
7776
require.NoError(t, err)
7877

7978
dsn := "etcd://" + endpoint + "/test"
80-
etcdClient, err := etcd.NewEtcdClient(dsn)
79+
etcdClient, err := sync.NewEtcdClient(dsn)
8180
require.NoError(t, err)
8281

8382
return etcdClient, etcdContainer
8483
}
8584

86-
func setupTestContainers(t *testing.T) (*pgxpool.Pool, *etcd.EtcdClient, func()) {
85+
func setupTestContainers(t *testing.T) (*pgxpool.Pool, *sync.EtcdClient, func()) {
8786
ctx := context.Background()
8887

8988
pool, pgContainer := setupPostgreSQLContainer(ctx, t)
@@ -118,18 +117,18 @@ func TestPollingMechanism(t *testing.T) {
118117
require.NoError(t, err)
119118

120119
// Test GetPendingRecords function
121-
pendingRecords, err := db.GetPendingRecords(ctx, pool)
120+
pendingRecords, err := sync.GetPendingRecords(ctx, pool)
122121
require.NoError(t, err)
123122
assert.Len(t, pendingRecords, 1)
124123
assert.Equal(t, "test/polling/key1", pendingRecords[0].Key)
125124
assert.Equal(t, "value1", pendingRecords[0].Value)
126125

127126
// Test UpdateRevision function
128-
err = db.UpdateRevision(ctx, pool, "test/polling/key1", 123)
127+
err = sync.UpdateRevision(ctx, pool, "test/polling/key1", 123)
129128
require.NoError(t, err)
130129

131130
// Verify record was updated
132-
pendingAfterUpdate, err := db.GetPendingRecords(ctx, pool)
131+
pendingAfterUpdate, err := sync.GetPendingRecords(ctx, pool)
133132
require.NoError(t, err)
134133
assert.Len(t, pendingAfterUpdate, 0, "No pending records should remain after update")
135134

@@ -155,7 +154,7 @@ func TestBulkInsert(t *testing.T) {
155154
defer cancel()
156155

157156
// Prepare test records
158-
records := []db.KeyValueRecord{
157+
records := []sync.KeyValueRecord{
159158
{
160159
Key: "test/bulk/key1",
161160
Value: ("value1"),
@@ -180,7 +179,7 @@ func TestBulkInsert(t *testing.T) {
180179
}
181180

182181
// Test BulkInsert function
183-
err := db.BulkInsert(ctx, pool, records)
182+
err := sync.BulkInsert(ctx, pool, records)
184183
require.NoError(t, err)
185184

186185
// Verify records were inserted correctly
@@ -228,7 +227,7 @@ func TestInsertPendingRecord(t *testing.T) {
228227
defer cancel()
229228

230229
// Test inserting a new pending record
231-
err := db.InsertPendingRecord(ctx, pool, "test/pending/key1", ("value1"), false)
230+
err := sync.InsertPendingRecord(ctx, pool, "test/pending/key1", ("value1"), false)
232231
require.NoError(t, err)
233232

234233
// Verify record was inserted with revision = -1
@@ -244,7 +243,7 @@ func TestInsertPendingRecord(t *testing.T) {
244243
assert.Equal(t, "value1", value.String)
245244

246245
// Test inserting second record with same key (should create new record with different timestamp)
247-
err = db.InsertPendingRecord(ctx, pool, "test/pending/key1", ("updated_value"), false)
246+
err = sync.InsertPendingRecord(ctx, pool, "test/pending/key1", ("updated_value"), false)
248247
require.NoError(t, err)
249248

250249
// Verify both records exist (different timestamps, both with revision = -1)
@@ -257,7 +256,7 @@ func TestInsertPendingRecord(t *testing.T) {
257256
assert.Equal(t, 1, count, "Should have 1 pending records for the same key with latest value")
258257

259258
// Test inserting tombstone record
260-
err = db.InsertPendingRecord(ctx, pool, "test/pending/key2", "", true)
259+
err = sync.InsertPendingRecord(ctx, pool, "test/pending/key2", "", true)
261260
require.NoError(t, err)
262261

263262
// Verify tombstone record
@@ -284,7 +283,7 @@ func TestGetLatestRevision(t *testing.T) {
284283
defer cancel()
285284

286285
// Test with empty table
287-
latestRevision, err := db.GetLatestRevision(ctx, pool)
286+
latestRevision, err := sync.GetLatestRevision(ctx, pool)
288287
require.NoError(t, err)
289288
assert.Equal(t, int64(0), latestRevision)
290289

@@ -299,7 +298,7 @@ func TestGetLatestRevision(t *testing.T) {
299298
require.NoError(t, err)
300299

301300
// Test latest revision (should ignore -1 pending records)
302-
latestRevision, err = db.GetLatestRevision(ctx, pool)
301+
latestRevision, err = sync.GetLatestRevision(ctx, pool)
303302
require.NoError(t, err)
304303
assert.Equal(t, int64(150), latestRevision)
305304
}
@@ -327,7 +326,7 @@ func TestPendingRecordFiltering(t *testing.T) {
327326
require.NoError(t, err)
328327

329328
// Test GetPendingRecords only returns revision = -1
330-
pendingRecords, err := db.GetPendingRecords(ctx, pool)
329+
pendingRecords, err := sync.GetPendingRecords(ctx, pool)
331330
require.NoError(t, err)
332331
assert.Len(t, pendingRecords, 3)
333332

@@ -345,7 +344,7 @@ func TestPendingRecordFiltering(t *testing.T) {
345344
for _, record := range pendingRecords {
346345
if record.Key == "test/filter/pending2" {
347346
assert.True(t, record.Tombstone)
348-
assert.Nil(t, record.Value)
347+
assert.Equal(t, "", record.Value) // Empty string for tombstones
349348
}
350349
}
351350
}
@@ -362,22 +361,22 @@ func TestConflictResolution(t *testing.T) {
362361
defer cancel()
363362

364363
// Insert a pending record
365-
err := db.InsertPendingRecord(ctx, pool, "test/conflict/key1", "pending_value", false)
364+
err := sync.InsertPendingRecord(ctx, pool, "test/conflict/key1", "pending_value", false)
366365
require.NoError(t, err)
367366

368367
// Verify it's pending
369-
pendingRecords, err := db.GetPendingRecords(ctx, pool)
368+
pendingRecords, err := sync.GetPendingRecords(ctx, pool)
370369
require.NoError(t, err)
371370
assert.Len(t, pendingRecords, 1)
372371
assert.Equal(t, "test/conflict/key1", pendingRecords[0].Key)
373372
assert.Equal(t, int64(-1), pendingRecords[0].Revision)
374373

375374
// Simulate etcd sync by updating revision
376-
err = db.UpdateRevision(ctx, pool, "test/conflict/key1", 300)
375+
err = sync.UpdateRevision(ctx, pool, "test/conflict/key1", 300)
377376
require.NoError(t, err)
378377

379378
// Verify record is no longer pending
380-
pendingAfterUpdate, err := db.GetPendingRecords(ctx, pool)
379+
pendingAfterUpdate, err := sync.GetPendingRecords(ctx, pool)
381380
require.NoError(t, err)
382381
assert.Len(t, pendingAfterUpdate, 0)
383382

@@ -391,7 +390,7 @@ func TestConflictResolution(t *testing.T) {
391390
assert.Equal(t, int64(300), revision)
392391

393392
// Test updating non-existent pending record (should fail gracefully)
394-
err = db.UpdateRevision(ctx, pool, "test/conflict/nonexistent", 400)
393+
err = sync.UpdateRevision(ctx, pool, "test/conflict/nonexistent", 400)
395394
assert.Error(t, err)
396395
assert.Contains(t, err.Error(), "no pending record found")
397396
}
@@ -411,10 +410,10 @@ func TestPerformanceOpsPerSecond(t *testing.T) {
411410
recordCount := 1000
412411
start := time.Now()
413412

414-
records := make([]db.KeyValueRecord, recordCount)
413+
records := make([]sync.KeyValueRecord, recordCount)
415414
for i := 0; i < recordCount; i++ {
416415
value := fmt.Sprintf("test_value_%d", i)
417-
records[i] = db.KeyValueRecord{
416+
records[i] = sync.KeyValueRecord{
418417
Key: fmt.Sprintf("test/perf/key%d", i),
419418
Value: value,
420419
Revision: int64(i + 1),
@@ -423,7 +422,7 @@ func TestPerformanceOpsPerSecond(t *testing.T) {
423422
}
424423
}
425424

426-
err := db.BulkInsert(ctx, pool, records)
425+
err := sync.BulkInsert(ctx, pool, records)
427426
require.NoError(t, err)
428427

429428
elapsed := time.Since(start)
@@ -454,11 +453,11 @@ func TestPerformanceSyncLatency(t *testing.T) {
454453
// Insert pending record
455454
key := fmt.Sprintf("test/latency/key%d", i)
456455
value := fmt.Sprintf("test_value_%d", i)
457-
err := db.InsertPendingRecord(ctx, pool, key, value, false)
456+
err := sync.InsertPendingRecord(ctx, pool, key, value, false)
458457
require.NoError(t, err)
459458

460459
// Update revision (simulating sync completion)
461-
err = db.UpdateRevision(ctx, pool, key, int64(i+1))
460+
err = sync.UpdateRevision(ctx, pool, key, int64(i+1))
462461
require.NoError(t, err)
463462

464463
latency := time.Since(start)

cmd/etcd_fdw/main.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import (
1313
"github.com/jessevdk/go-flags"
1414
"github.com/sirupsen/logrus"
1515

16-
"github.com/cybertec-postgresql/etcd_fdw/internal/db"
17-
"github.com/cybertec-postgresql/etcd_fdw/internal/etcd"
1816
"github.com/cybertec-postgresql/etcd_fdw/internal/log"
1917
"github.com/cybertec-postgresql/etcd_fdw/internal/sync"
2018
)
@@ -131,25 +129,25 @@ func main() {
131129
}()
132130

133131
// Connect to PostgreSQL with retry logic
134-
var pgPool db.PgxPoolIface
135-
if pgPool, err = db.NewWithRetry(ctx, config.PostgresDSN); err != nil {
132+
pgPool, err := sync.NewWithRetry(ctx, config.PostgresDSN)
133+
if err != nil {
136134
logrus.WithError(err).Fatal("Failed to connect to PostgreSQL after retries")
137135
}
138136
defer pgPool.Close()
139137

140138
// Connect to etcd with retry logic
141-
var etcdClient *etcd.EtcdClient
139+
var etcdClient *sync.EtcdClient
142140
if config.EtcdDSN != "" {
143141
var err error
144-
etcdClient, err = etcd.NewEtcdClientWithRetry(ctx, config.EtcdDSN)
142+
etcdClient, err = sync.NewEtcdClientWithRetry(ctx, config.EtcdDSN)
145143
if err != nil {
146144
logrus.WithError(err).Fatal("Failed to connect to etcd after retries")
147145
}
148146
defer etcdClient.Close()
149147
}
150148

151149
// Get prefix from etcd DSN
152-
prefix := etcd.GetPrefix(config.EtcdDSN)
150+
prefix := sync.GetPrefix(config.EtcdDSN)
153151

154152
// Parse polling interval
155153
pollingInterval, err := time.ParseDuration(config.PollingInterval)

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ require (
77
github.com/jackc/pgx/v5 v5.7.6
88
github.com/jessevdk/go-flags v1.6.1
99
github.com/pashagolub/pgxmock/v4 v4.8.0
10-
github.com/sethvargo/go-retry v0.3.0
1110
github.com/sirupsen/logrus v1.9.3
1211
github.com/stretchr/testify v1.11.1
1312
github.com/testcontainers/testcontainers-go v0.38.0

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,6 @@ github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF
119119
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
120120
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
121121
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
122-
github.com/sethvargo/go-retry v0.3.0 h1:EEt31A35QhrcRZtrYFDTBg91cqZVnFL2navjDrah2SE=
123-
github.com/sethvargo/go-retry v0.3.0/go.mod h1:mNX17F0C/HguQMyMyJxcnU471gOZGxCLyYaFyAZraas=
124122
github.com/shirou/gopsutil/v4 v4.25.5 h1:rtd9piuSMGeU8g1RMXjZs9y9luK5BwtnG7dZaQUJAsc=
125123
github.com/shirou/gopsutil/v4 v4.25.5/go.mod h1:PfybzyydfZcN+JMMjkF6Zb8Mq1A/VcogFFg7hj50W9c=
126124
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=

0 commit comments

Comments
 (0)