Skip to content

Commit 47ed7c3

Browse files
committed
[+] add PostgreSQL functions to work with etcd replicated table
1 parent 021cd73 commit 47ed7c3

File tree

3 files changed

+97
-28
lines changed

3 files changed

+97
-28
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
-- Main etcd table for key-value storage with revision history
2+
CREATE TABLE etcd (
3+
ts timestamp with time zone NOT NULL DEFAULT now(),
4+
key text NOT NULL,
5+
value text,
6+
revision bigint NOT NULL,
7+
tombstone boolean NOT NULL DEFAULT false,
8+
PRIMARY KEY(key, revision)
9+
);
10+
11+
-- Write-ahead log table for tracking changes to be synchronized
12+
CREATE TABLE etcd_wal (
13+
ts timestamp with time zone NOT NULL DEFAULT now(),
14+
key text NOT NULL,
15+
value text,
16+
revision bigint, -- Current revision before modification (null = new key)
17+
PRIMARY KEY(key, ts)
18+
);
19+
20+
-- Performance indexes
21+
CREATE INDEX idx_etcd_key_revision ON etcd(key, revision DESC);
22+
CREATE INDEX idx_etcd_wal_key ON etcd_wal(key);
23+
CREATE INDEX idx_etcd_wal_ts ON etcd_wal(ts);
24+
25+
-- Function: Get latest value for a key with revision enforcement
26+
CREATE OR REPLACE FUNCTION etcd_get(p_key text)
27+
RETURNS TABLE(key text, value text, revision bigint, tombstone boolean, ts timestamp with time zone)
28+
LANGUAGE sql STABLE AS $$
29+
SELECT e.key, e.value, e.revision, e.tombstone, e.ts
30+
FROM etcd e
31+
WHERE e.key = p_key
32+
ORDER BY e.revision DESC
33+
LIMIT 1;
34+
$$;
35+
36+
-- Function: Get all revisions for a key higher than passed revision
37+
CREATE OR REPLACE FUNCTION etcd_get_all(p_key text, p_min_revision bigint DEFAULT 0)
38+
RETURNS TABLE(key text, value text, revision bigint, tombstone boolean, ts timestamp with time zone)
39+
LANGUAGE sql STABLE AS $$
40+
SELECT e.key, e.value, e.revision, e.tombstone, e.ts
41+
FROM etcd e
42+
WHERE e.key = p_key AND e.revision > p_min_revision
43+
ORDER BY e.revision ASC;
44+
$$;
45+
46+
-- Function: Set key-value (logs to WAL for synchronization to etcd)
47+
CREATE OR REPLACE FUNCTION etcd_set(p_key text, p_value text)
48+
RETURNS timestamp with time zone
49+
LANGUAGE sql AS $$
50+
INSERT INTO etcd_wal (key, value, revision)
51+
SELECT p_key, p_value, (SELECT revision FROM etcd_get(p_key))
52+
RETURNING ts;
53+
$$;
54+
55+
-- Function: Delete key (logs to WAL for synchronization to etcd)
56+
CREATE OR REPLACE FUNCTION etcd_delete(p_key text)
57+
RETURNS timestamp with time zone
58+
LANGUAGE sql AS $$
59+
INSERT INTO etcd_wal (key, value, revision)
60+
SELECT p_key, NULL, (SELECT revision FROM etcd_get(p_key))
61+
RETURNING ts;
62+
$$;
63+
64+
-- Trigger function to notify on WAL changes
65+
CREATE OR REPLACE FUNCTION notify_etcd_change()
66+
RETURNS TRIGGER AS $$
67+
BEGIN
68+
PERFORM pg_notify('etcd_changes',
69+
json_build_object(
70+
'key', NEW.key,
71+
'ts', NEW.ts,
72+
'value', NEW.value,
73+
'revision', NEW.revision,
74+
'operation', CASE
75+
WHEN NEW.value IS NULL THEN 'DELETE'
76+
WHEN NEW.revision IS NULL THEN 'CREATE'
77+
ELSE 'UPDATE'
78+
END
79+
)::text
80+
);
81+
RETURN NEW;
82+
END;
83+
$$ LANGUAGE plpgsql;
84+
85+
-- Trigger on WAL table for real-time notifications
86+
CREATE TRIGGER etcd_wal_notify
87+
AFTER INSERT ON etcd_wal
88+
FOR EACH ROW
89+
EXECUTE FUNCTION notify_etcd_change();

internal/migrations/migrations.go

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,45 +3,25 @@ package migrations
33

44
import (
55
"context"
6+
_ "embed"
67
"fmt"
78
"sync"
89

910
migrator "github.com/cybertec-postgresql/pgx-migrator"
1011
"github.com/jackc/pgx/v5"
1112
)
1213

14+
//go:embed 001_create_tables.sql
15+
var createTablesSQL string
16+
1317
// migrations holds function returning all upgrade migrations needed
1418
var migrations func() migrator.Option = func() migrator.Option {
1519
return migrator.Migrations(
1620
&migrator.Migration{
1721
Name: "001_create_tables",
1822
Func: func(ctx context.Context, tx pgx.Tx) error {
19-
// Create all tables and indexes in a single transaction
20-
_, err := tx.Exec(ctx, `
21-
-- Main etcd table for key-value storage with revision history
22-
CREATE TABLE etcd (
23-
ts timestamp with time zone NOT NULL DEFAULT now(),
24-
key text NOT NULL,
25-
value text,
26-
revision bigint NOT NULL,
27-
tombstone boolean NOT NULL DEFAULT false,
28-
PRIMARY KEY(key, revision)
29-
);
30-
31-
-- Write-ahead log table for tracking changes to be synchronized
32-
CREATE TABLE etcd_wal (
33-
id serial PRIMARY KEY,
34-
ts timestamp with time zone NOT NULL DEFAULT now(),
35-
key text NOT NULL,
36-
value text,
37-
revision bigint -- Current revision before modification (null = new key)
38-
);
39-
40-
-- Performance indexes
41-
CREATE INDEX idx_etcd_key_revision ON etcd(key, revision DESC);
42-
CREATE INDEX idx_etcd_wal_key ON etcd_wal(key);
43-
CREATE INDEX idx_etcd_wal_ts ON etcd_wal(ts);
44-
`)
23+
// Execute the embedded SQL file
24+
_, err := tx.Exec(ctx, createTablesSQL)
4525
return err
4626
},
4727
},

specs/001-describe-building-a/tasks.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030

3131
## Phase 3.2: Database Schema & Infrastructure
3232
- [x] T005 Create PostgreSQL schema file: migrations/001_create_tables.sql with etcd and etcd_wal tables
33-
- [ ] T006 Create PostgreSQL functions: migrations/002_create_functions.sql for get/set/delete with revision enforcement
34-
- [ ] T007 Create trigger and NOTIFY: migrations/003_create_triggers.sql for etcd_wal table notifications
33+
- [x] T006 Create PostgreSQL functions: migrations/002_create_functions.sql for get/set/delete with revision enforcement
34+
- [x] T007 Create trigger and NOTIFY: migrations/003_create_triggers.sql for etcd_wal table notifications
3535
- [ ] T008 [P] Create testcontainers setup: tests/infrastructure_test.go for PostgreSQL and etcd containers
3636

3737
## Phase 3.3: Tests First (TDD) ⚠️ MUST COMPLETE BEFORE 3.4

0 commit comments

Comments
 (0)