1- package  main 
1+ package  sync 
22
33import  (
44"context" 
5- "database/sql" 
65"fmt" 
76"testing" 
87"time" 
@@ -13,8 +12,6 @@ import (
1312"github.com/testcontainers/testcontainers-go" 
1413"github.com/testcontainers/testcontainers-go/modules/postgres" 
1514"github.com/testcontainers/testcontainers-go/wait" 
16- 
17- "github.com/cybertec-postgresql/etcd_fdw/internal/sync" 
1815)
1916
2017func  setupPostgreSQLContainer (ctx  context.Context , t  * testing.T ) (* pgxpool.Pool , testcontainers.Container ) {
@@ -40,7 +37,7 @@ func setupPostgreSQLContainer(ctx context.Context, t *testing.T) (*pgxpool.Pool,
4037CREATE TABLE etcd ( 
4138ts timestamp with time zone NOT NULL DEFAULT now(), 
4239key text NOT NULL, 
43- value text, 
40+ value text NOT NULL , 
4441revision bigint NOT NULL, 
4542tombstone boolean NOT NULL DEFAULT false, 
4643PRIMARY KEY(key, revision) 
@@ -53,7 +50,7 @@ func setupPostgreSQLContainer(ctx context.Context, t *testing.T) (*pgxpool.Pool,
5350return  pool , pgContainer 
5451}
5552
56- func  setupEtcdContainer (ctx  context.Context , t  * testing.T ) (* sync. EtcdClient , testcontainers.Container ) {
53+ func  setupEtcdContainer (ctx  context.Context , t  * testing.T ) (* EtcdClient , testcontainers.Container ) {
5754etcdContainer , err  :=  testcontainers .GenericContainer (ctx , testcontainers.GenericContainerRequest {
5855ContainerRequest : testcontainers.ContainerRequest {
5956Image : "quay.io/coreos/etcd:v3.5.9" ,
@@ -76,13 +73,13 @@ func setupEtcdContainer(ctx context.Context, t *testing.T) (*sync.EtcdClient, te
7673require .NoError (t , err )
7774
7875dsn  :=  "etcd://"  +  endpoint  +  "/test" 
79- etcdClient , err  :=  sync . NewEtcdClient (dsn )
76+ etcdClient , err  :=  NewEtcdClient (dsn )
8077require .NoError (t , err )
8178
8279return  etcdClient , etcdContainer 
8380}
8481
85- func  setupTestContainers (t  * testing.T ) (* pgxpool.Pool , * sync. EtcdClient , func ()) {
82+ func  setupTestContainers (t  * testing.T ) (* pgxpool.Pool , * EtcdClient , func ()) {
8683ctx  :=  context .Background ()
8784
8885pool , pgContainer  :=  setupPostgreSQLContainer (ctx , t )
@@ -117,18 +114,18 @@ func TestPollingMechanism(t *testing.T) {
117114require .NoError (t , err )
118115
119116// Test GetPendingRecords function 
120- pendingRecords , err  :=  sync . GetPendingRecords (ctx , pool )
117+ pendingRecords , err  :=  GetPendingRecords (ctx , pool )
121118require .NoError (t , err )
122119assert .Len (t , pendingRecords , 1 )
123120assert .Equal (t , "test/polling/key1" , pendingRecords [0 ].Key )
124121assert .Equal (t , "value1" , pendingRecords [0 ].Value )
125122
126123// Test UpdateRevision function 
127- err  =  sync . UpdateRevision (ctx , pool , "test/polling/key1" , 123 )
124+ err  =  UpdateRevision (ctx , pool , "test/polling/key1" , 123 )
128125require .NoError (t , err )
129126
130127// Verify record was updated 
131- pendingAfterUpdate , err  :=  sync . GetPendingRecords (ctx , pool )
128+ pendingAfterUpdate , err  :=  GetPendingRecords (ctx , pool )
132129require .NoError (t , err )
133130assert .Len (t , pendingAfterUpdate , 0 , "No pending records should remain after update" )
134131
@@ -154,7 +151,7 @@ func TestBulkInsert(t *testing.T) {
154151defer  cancel ()
155152
156153// Prepare test records 
157- records  :=  []sync. KeyValueRecord {
154+ records  :=  []KeyValueRecord {
158155{
159156Key : "test/bulk/key1" ,
160157Value : ("value1" ),
@@ -179,7 +176,7 @@ func TestBulkInsert(t *testing.T) {
179176}
180177
181178// Test BulkInsert function 
182- err  :=  sync . BulkInsert (ctx , pool , records )
179+ err  :=  BulkInsert (ctx , pool , records )
183180require .NoError (t , err )
184181
185182// Verify records were inserted correctly 
@@ -189,17 +186,17 @@ func TestBulkInsert(t *testing.T) {
189186assert .Equal (t , 3 , count )
190187
191188// Verify specific record details 
192- var  key , value  sql. NullString 
189+ var  key , value  string 
193190var  revision  int64 
194191var  tombstone  bool 
195192err  =  pool .QueryRow (ctx , ` 
196193SELECT key, value, revision, tombstone  
197194FROM etcd WHERE key = 'test/bulk/key1' 
198195` ).Scan (& key , & value , & revision , & tombstone )
199196require .NoError (t , err )
200- assert .Equal (t , "test/bulk/key1" , key . String )
201- assert .True (t , value . Valid )
202- assert .Equal (t , "value1" , value . String )
197+ assert .Equal (t , "test/bulk/key1" , key )
198+ assert .NotEmpty (t , value )
199+ assert .Equal (t , "value1" , value )
203200assert .Equal (t , int64 (100 ), revision )
204201assert .False (t , tombstone )
205202
@@ -209,8 +206,8 @@ func TestBulkInsert(t *testing.T) {
209206FROM etcd WHERE key = 'test/bulk/key3' 
210207` ).Scan (& key , & value , & revision , & tombstone )
211208require .NoError (t , err )
212- assert .Equal (t , "test/bulk/key3" , key . String )
213- assert .False (t , value . Valid ) // NULL value 
209+ assert .Equal (t , "test/bulk/key3" , key )
210+ assert .Empty (t , value ) // NULL value 
214211assert .Equal (t , int64 (102 ), revision )
215212assert .True (t , tombstone )
216213}
@@ -227,23 +224,23 @@ func TestInsertPendingRecord(t *testing.T) {
227224defer  cancel ()
228225
229226// Test inserting a new pending record 
230- err  :=  sync . InsertPendingRecord (ctx , pool , "test/pending/key1" , ("value1" ), false )
227+ err  :=  InsertPendingRecord (ctx , pool , "test/pending/key1" , ("value1" ), false )
231228require .NoError (t , err )
232229
233230// Verify record was inserted with revision = -1 
234231var  revision  int64 
235- var  value  sql. NullString 
232+ var  value  string 
236233err  =  pool .QueryRow (ctx , ` 
237234SELECT revision, value FROM etcd  
238235WHERE key = 'test/pending/key1' 
239236` ).Scan (& revision , & value )
240237require .NoError (t , err )
241238assert .Equal (t , int64 (- 1 ), revision )
242- assert .True (t , value . Valid )
243- assert .Equal (t , "value1" , value . String )
239+ assert .NotEmpty (t , value )
240+ assert .Equal (t , "value1" , value )
244241
245242// Test inserting second record with same key (should create new record with different timestamp) 
246- err  =  sync . InsertPendingRecord (ctx , pool , "test/pending/key1" , ("updated_value" ), false )
243+ err  =  InsertPendingRecord (ctx , pool , "test/pending/key1" , ("updated_value" ), false )
247244require .NoError (t , err )
248245
249246// Verify both records exist (different timestamps, both with revision = -1) 
@@ -256,7 +253,7 @@ func TestInsertPendingRecord(t *testing.T) {
256253assert .Equal (t , 1 , count , "Should have 1 pending records for the same key with latest value" )
257254
258255// Test inserting tombstone record 
259- err  =  sync . InsertPendingRecord (ctx , pool , "test/pending/key2" , "" , true )
256+ err  =  InsertPendingRecord (ctx , pool , "test/pending/key2" , "" , true )
260257require .NoError (t , err )
261258
262259// Verify tombstone record 
@@ -267,7 +264,7 @@ func TestInsertPendingRecord(t *testing.T) {
267264` ).Scan (& revision , & value , & tombstone )
268265require .NoError (t , err )
269266assert .Equal (t , int64 (- 1 ), revision )
270- assert .False (t , value . Valid )  // NULL value 
267+ assert .Empty (t , value ) 
271268assert .True (t , tombstone )
272269}
273270
@@ -283,7 +280,7 @@ func TestGetLatestRevision(t *testing.T) {
283280defer  cancel ()
284281
285282// Test with empty table 
286- latestRevision , err  :=  sync . GetLatestRevision (ctx , pool )
283+ latestRevision , err  :=  GetLatestRevision (ctx , pool )
287284require .NoError (t , err )
288285assert .Equal (t , int64 (0 ), latestRevision )
289286
@@ -298,7 +295,7 @@ func TestGetLatestRevision(t *testing.T) {
298295require .NoError (t , err )
299296
300297// Test latest revision (should ignore -1 pending records) 
301- latestRevision , err  =  sync . GetLatestRevision (ctx , pool )
298+ latestRevision , err  =  GetLatestRevision (ctx , pool )
302299require .NoError (t , err )
303300assert .Equal (t , int64 (150 ), latestRevision )
304301}
@@ -320,13 +317,13 @@ func TestPendingRecordFiltering(t *testing.T) {
320317('test/filter/synced1', 'value1', 100, false), 
321318('test/filter/pending1', 'value2', -1, false), 
322319('test/filter/synced2', 'value3', 200, false), 
323- ('test/filter/pending2', NULL , -1, true), 
320+ ('test/filter/pending2', '' , -1, true), 
324321('test/filter/pending3', 'value4', -1, false) 
325322` )
326323require .NoError (t , err )
327324
328325// Test GetPendingRecords only returns revision = -1 
329- pendingRecords , err  :=  sync . GetPendingRecords (ctx , pool )
326+ pendingRecords , err  :=  GetPendingRecords (ctx , pool )
330327require .NoError (t , err )
331328assert .Len (t , pendingRecords , 3 )
332329
@@ -361,22 +358,22 @@ func TestConflictResolution(t *testing.T) {
361358defer  cancel ()
362359
363360// Insert a pending record 
364- err  :=  sync . InsertPendingRecord (ctx , pool , "test/conflict/key1" , "pending_value" , false )
361+ err  :=  InsertPendingRecord (ctx , pool , "test/conflict/key1" , "pending_value" , false )
365362require .NoError (t , err )
366363
367364// Verify it's pending 
368- pendingRecords , err  :=  sync . GetPendingRecords (ctx , pool )
365+ pendingRecords , err  :=  GetPendingRecords (ctx , pool )
369366require .NoError (t , err )
370367assert .Len (t , pendingRecords , 1 )
371368assert .Equal (t , "test/conflict/key1" , pendingRecords [0 ].Key )
372369assert .Equal (t , int64 (- 1 ), pendingRecords [0 ].Revision )
373370
374371// Simulate etcd sync by updating revision 
375- err  =  sync . UpdateRevision (ctx , pool , "test/conflict/key1" , 300 )
372+ err  =  UpdateRevision (ctx , pool , "test/conflict/key1" , 300 )
376373require .NoError (t , err )
377374
378375// Verify record is no longer pending 
379- pendingAfterUpdate , err  :=  sync . GetPendingRecords (ctx , pool )
376+ pendingAfterUpdate , err  :=  GetPendingRecords (ctx , pool )
380377require .NoError (t , err )
381378assert .Len (t , pendingAfterUpdate , 0 )
382379
@@ -390,7 +387,7 @@ func TestConflictResolution(t *testing.T) {
390387assert .Equal (t , int64 (300 ), revision )
391388
392389// Test updating non-existent pending record (should fail gracefully) 
393- err  =  sync . UpdateRevision (ctx , pool , "test/conflict/nonexistent" , 400 )
390+ err  =  UpdateRevision (ctx , pool , "test/conflict/nonexistent" , 400 )
394391assert .Error (t , err )
395392assert .Contains (t , err .Error (), "no pending record found" )
396393}
@@ -410,10 +407,10 @@ func TestPerformanceOpsPerSecond(t *testing.T) {
410407recordCount  :=  1000 
411408start  :=  time .Now ()
412409
413- records  :=  make ([]sync. KeyValueRecord , recordCount )
410+ records  :=  make ([]KeyValueRecord , recordCount )
414411for  i  :=  0 ; i  <  recordCount ; i ++  {
415412value  :=  fmt .Sprintf ("test_value_%d" , i )
416- records [i ] =  sync. KeyValueRecord {
413+ records [i ] =  KeyValueRecord {
417414Key : fmt .Sprintf ("test/perf/key%d" , i ),
418415Value : value ,
419416Revision : int64 (i  +  1 ),
@@ -422,7 +419,7 @@ func TestPerformanceOpsPerSecond(t *testing.T) {
422419}
423420}
424421
425- err  :=  sync . BulkInsert (ctx , pool , records )
422+ err  :=  BulkInsert (ctx , pool , records )
426423require .NoError (t , err )
427424
428425elapsed  :=  time .Since (start )
@@ -453,11 +450,11 @@ func TestPerformanceSyncLatency(t *testing.T) {
453450// Insert pending record 
454451key  :=  fmt .Sprintf ("test/latency/key%d" , i )
455452value  :=  fmt .Sprintf ("test_value_%d" , i )
456- err  :=  sync . InsertPendingRecord (ctx , pool , key , value , false )
453+ err  :=  InsertPendingRecord (ctx , pool , key , value , false )
457454require .NoError (t , err )
458455
459456// Update revision (simulating sync completion) 
460- err  =  sync . UpdateRevision (ctx , pool , key , int64 (i + 1 ))
457+ err  =  UpdateRevision (ctx , pool , key , int64 (i + 1 ))
461458require .NoError (t , err )
462459
463460latency  :=  time .Since (start )
0 commit comments