When it comes to managing databases, data integrity is crucial. However, in certain situations, such as production database corruption, it's imperative to have a robust backup strategy. This article describes a PostgreSQL script that was developed out of necessity to generate SQL statements for an incremental backup from a specific point in time. This allows applying changes to a previous backup to restore a database to its most recent, accurate state.
Overview of the Script
The script is implemented as a PostgreSQL function named generate_sql_statements. Its primary purpose is to scan specified database tables for changes (new inserts or updates) that occurred after a given timestamp (reference_timestamp). It generates corresponding SQL statements to reflect these changes, allowing the user to recreate the database's state incrementally.
CREATE OR REPLACE FUNCTION generate_sql_statements(table_names TEXT[], reference_timestamp TIMESTAMPTZ) RETURNS TABLE (sql_statement TEXT) AS $$ DECLARE input_table_name TEXT; column_record RECORD; action_type TEXT; row_json JSONB; formatted_value TEXT; formatted_values TEXT; columns TEXT; update_set_clause TEXT; BEGIN -- Loop through each table name FOREACH input_table_name IN ARRAY table_names LOOP -- Step 1: Get column information for the current table FOR column_record IN SELECT column_name, data_type, ordinal_position FROM information_schema.columns WHERE table_schema = 'public' AND table_name = input_table_name ORDER BY ordinal_position LOOP -- Step 2: Identify rows for INSERT or UPDATE and convert rows to JSON FOR row_json, action_type IN EXECUTE format(' SELECT to_jsonb(cf), CASE WHEN cf."CreatedOn" > $1 THEN ''INSERT'' WHEN cf."UpdatedOn" > $1 AND cf."CreatedOn" <= $1 THEN ''UPDATE'' ELSE NULL END FROM %I cf WHERE cf."CreatedOn" > $1 OR cf."UpdatedOn" > $1', input_table_name) USING reference_timestamp LOOP -- Step 3: Generate formatted column values for SQL statements formatted_values := ''; columns := ''; update_set_clause := ''; FOR column_record IN SELECT column_name, data_type, ordinal_position FROM information_schema.columns WHERE table_schema = 'public' AND table_name = input_table_name ORDER BY ordinal_position LOOP formatted_value := CASE WHEN (row_json -> column_record.column_name) = 'null' THEN 'NULL' WHEN column_record.data_type IN ('character varying', 'text', 'date', 'timestamp without time zone', 'timestamp with time zone') THEN '''''' || REPLACE(row_json ->> column_record.column_name, '''', '''''') || '''''' WHEN column_record.data_type = 'jsonb' THEN '''''' || REPLACE((row_json -> column_record.column_name)::TEXT, '''', '''''') || '''''::jsonb' ELSE row_json ->> column_record.column_name END; -- Aggregate column names and values for the INSERT statements IF columns = '' THEN columns := quote_ident(column_record.column_name); formatted_values := formatted_value; ELSE columns := columns || ', ' || quote_ident(column_record.column_name); formatted_values := formatted_values || ', ' || formatted_value; END IF; -- Build the update_set_clause, excluding the "Id" column IF column_record.column_name <> 'Id' THEN IF update_set_clause = '' THEN update_set_clause := quote_ident(column_record.column_name) || ' = ' || formatted_value; ELSE update_set_clause := update_set_clause || ', ' || quote_ident(column_record.column_name) || ' = ' || formatted_value; END IF; END IF; END LOOP; -- Step 4: Construct and return the SQL statement based on the action type IF action_type = 'INSERT' THEN RETURN QUERY EXECUTE format(' SELECT ''INSERT INTO %I (%s) VALUES (%s);''', input_table_name, columns, formatted_values ); ELSIF action_type = 'UPDATE' THEN RETURN QUERY EXECUTE format(' SELECT ''UPDATE %I SET %s WHERE "Id" = %s;''', input_table_name, update_set_clause, row_json ->> 'Id' ); END IF; END LOOP; END LOOP; END LOOP; END; $$ LANGUAGE plpgsql;
How It Works
The script performs the following steps:
Iterate Over the Tables: The script accepts an array of table names (
table_names
) as input. It loops through each table to process rows that have changed (either inserted or updated) since the specified timestamp (reference_timestamp
).Identify Changes (Inserts and Updates): For each table, it queries the rows where either the "CreatedOn" or "UpdatedOn" timestamps are more recent than the
reference_timestamp
. Each identified row is converted to JSON format to facilitate the creation of SQL statements.-
Generate SQL Statements:
- Inserts: For rows where the "CreatedOn" timestamp is newer than the reference, an
INSERT
statement is generated. - Updates: For rows where the "UpdatedOn" timestamp is newer but the "CreatedOn" timestamp is older than the reference, an
UPDATE
statement is generated.
- Inserts: For rows where the "CreatedOn" timestamp is newer than the reference, an
-
Build and Format SQL Statements:
- For each column in the table, the script formats the column values to be compatible with SQL syntax. It handles various data types, including strings, dates, and JSONB.
- For
INSERT
statements, the script aggregates column names and their corresponding values. - For
UPDATE
statements, it builds aSET
clause to update all columns (except the "Id" column) of the specified row.
Return Generated SQL Statements: Depending on whether an action is identified as an
INSERT
orUPDATE
, the corresponding SQL statement is returned to be executed on the backup database.
Detailed Script Breakdown
Here's a step-by-step breakdown of the script:
- Step 1 - Column Information: For each table, it retrieves column metadata (name, data type, and position) from the
information_schema.columns
. This information is necessary to format the column values correctly in the SQL statements.
FOR column_record IN SELECT column_name, data_type, ordinal_position FROM information_schema.columns WHERE table_schema = 'public' AND table_name = input_table_name ORDER BY ordinal_position LOOP
- Step 2 - Identify Rows for Changes: Using a dynamic SQL statement, it identifies rows for
INSERT
orUPDATE
. Rows are selected if:- The "CreatedOn" timestamp is newer than the reference timestamp (
INSERT
). - The "UpdatedOn" timestamp is newer, but the "CreatedOn" timestamp is older than the reference (
UPDATE
).
- The "CreatedOn" timestamp is newer than the reference timestamp (
FOR row_json, action_type IN EXECUTE format(' SELECT to_jsonb(cf), CASE WHEN cf."CreatedOn" > $1 THEN ''INSERT'' WHEN cf."UpdatedOn" > $1 AND cf."CreatedOn" <= $1 THEN ''UPDATE'' ELSE NULL END FROM %I cf WHERE cf."CreatedOn" > $1 OR cf."UpdatedOn" > $1', input_table_name) USING reference_timestamp LOOP
- Step 3 - Generate Column Values: For each identified row, the script formats the column values to suit their data type. For example:
- Strings are enclosed in single quotes, and any existing single quotes are escaped.
- JSONB data types are cast to JSONB in the output.
It then aggregates these values into a comma-separated list to be used in the INSERT
statements. Similarly, it prepares the SET
clause for UPDATE
statements, excluding the "Id" column.
formatted_values := ''; columns := ''; update_set_clause := ''; FOR column_record IN SELECT column_name, data_type, ordinal_position FROM information_schema.columns WHERE table_schema = 'public' AND table_name = input_table_name ORDER BY ordinal_position LOOP formatted_value := CASE WHEN (row_json -> column_record.column_name) = 'null' THEN 'NULL' WHEN column_record.data_type IN ('character varying', 'text', 'date', 'timestamp without time zone', 'timestamp with time zone') THEN '''''' || REPLACE(row_json ->> column_record.column_name, '''', '''''') || '''''' WHEN column_record.data_type = 'jsonb' THEN '''''' || REPLACE((row_json -> column_record.column_name)::TEXT, '''', '''''') || '''''::jsonb' ELSE row_json ->> column_record.column_name END; -- Aggregate column names and values for the INSERT statements IF columns = '' THEN columns := quote_ident(column_record.column_name); formatted_values := formatted_value; ELSE columns := columns || ', ' || quote_ident(column_record.column_name); formatted_values := formatted_values || ', ' || formatted_value; END IF; -- Build the update_set_clause, excluding the "Id" column IF column_record.column_name <> 'Id' THEN IF update_set_clause = '' THEN update_set_clause := quote_ident(column_record.column_name) || ' = ' || formatted_value; ELSE update_set_clause := update_set_clause || ', ' || quote_ident(column_record.column_name) || ' = ' || formatted_value; END IF; END IF; END LOOP;
- Step 4 - Construct SQL Statements:
- For an
INSERT
, it generates an SQL statement to insert the row into the table with the formatted column values. - For an
UPDATE
, it generates an SQL statement to update the row, identified by its "Id" column.
- For an
IF action_type = 'INSERT' THEN RETURN QUERY EXECUTE format(' SELECT ''INSERT INTO %I (%s) VALUES (%s);''', input_table_name, columns, formatted_values ); ELSIF action_type = 'UPDATE' THEN RETURN QUERY EXECUTE format(' SELECT ''UPDATE %I SET %s WHERE "Id" = %s;''', input_table_name, update_set_clause, row_json ->> 'Id' ); END IF;
Top comments (0)