
Data sectioning and “live snapshots” in PostgreSQL

by admin

Although the topic of sectioning has been raised before, I want to return to it and describe my own experience with the problem, which arose from the need for analytical processing of large amounts of data. In addition to sectioning (table partitioning), I will consider an extremely simplified implementation of "snapshots" of aggregate queries that are updated automatically as the raw data changes.
One of the main requirements for the system was the use of free software, so I chose PostgreSQL. When I started the project, I knew PostgreSQL rather superficially, but I was well acquainted with Oracle Database. Since the task was analytical processing, I wanted analogues of Oracle options such as Partitioning and Materialized Views. After familiarizing myself with the capabilities of PostgreSQL, it became clear that this functionality would have to be written by hand one way or another.
Of course, this was not about a full-fledged implementation of Materialized Views with query rewriting. For my needs, the ability to create automatically updated aggregates over a single table was sufficient (support for table joins will likely be added in the near future). For sectioning, I planned to use the often-described approach based on inherited tables, with data insertion controlled by a trigger. I considered using rules to control sectioning, but gave that up because in my case single-row inserts prevailed.
I started, of course, with the tables for storing metadata:
ps_tables.sql

create sequence ps_table_seq;

create table ps_table (
    id bigint default nextval('ps_table_seq') not null,
    name varchar(50) not null unique,
    primary key(id)
);

create sequence ps_column_seq;

create table ps_column (
    id bigint default nextval('ps_column_seq') not null,
    table_id bigint not null references ps_table(id),
    name varchar(50) not null,
    parent_name varchar(50),
    type_name varchar(8) not null check (type_name in ('date', 'key', 'nullable', 'sum', 'min', 'max', 'cnt')),
    unique (table_id, name),
    primary key(id)
);

create table ps_range_partition (
    table_id bigint not null references ps_table(id),
    type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')),
    start_value date not null,
    end_value date not null,
    primary key(table_id, start_value)
);

create table ps_snapshot (
    snapshot_id bigint not null references ps_table(id),
    table_id bigint not null references ps_table(id),
    type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')),
    primary key(snapshot_id)
);

Everything here is fairly obvious. The only thing worth mentioning is the set of column types:

| Type | Description |
|------|-------------|
| date | Column with a calendar date, used for data sectioning and aggregation (PostgreSQL date and timestamp types are supported) |
| key | Key used in the group by clause when aggregating data (all PostgreSQL integer types are supported) |
| nullable | Key used in data aggregation that may contain null (the generated code stores null as 0 via coalesce) |
| sum | Sum of values |
| min | Minimum value |
| max | Maximum value |
| cnt | Count of non-null values |

The basis of the entire solution is a function that rebuilds the trigger functions for the table containing the raw data:
ps_trigger_regenerate(bigint)

create or replace function ps_trigger_regenerate(in p_table bigint) returns void
as $$
declare
    l_sql text;
    l_table_name varchar(50);
    l_date_column varchar(50);
    l_flag boolean;
    tabs record;
    columns record;
begin
    select name into l_table_name
    from ps_table where id = p_table;

    -- 1. ps_TABLE_insert_trigger(): update the snapshots, then route the row to its section
    l_sql :=
        'create or replace function ps_' || l_table_name || '_insert_trigger() returns trigger ' ||
        'as $'|| '$ ' ||
        'begin ';
    for tabs in
        select a.snapshot_id as id, b.name as table_name, a.type_name as snapshot_type
        from ps_snapshot a, ps_table b
        where a.table_id = p_table
        and b.id = a.snapshot_id
    loop
        -- "update <snapshot> set <aggregates>"
        l_flag = FALSE;
        l_sql := l_sql ||
            'update ' || tabs.table_name || ' set ';
        for columns in
            select name, parent_name, type_name
            from ps_column
            where table_id = tabs.id
            and not type_name in ('date', 'key', 'nullable')
        loop
            if l_flag then
                l_sql := l_sql || ', ';
            end if;
            l_flag := TRUE;
            if columns.type_name = 'sum' then
                l_sql := l_sql ||
                    columns.name || ' = ' || columns.name || ' + coalesce(NEW.' || columns.parent_name || ', 0) ';
            end if;
            if columns.type_name = 'min' then
                l_sql := l_sql ||
                    columns.name || ' = least(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
            end if;
            if columns.type_name = 'max' then
                l_sql := l_sql ||
                    columns.name || ' = greatest(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
            end if;
            if columns.type_name = 'cnt' then
                l_sql := l_sql ||
                    columns.name || ' = ' || columns.name || ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
            end if;
        end loop;

        -- "where <grouping columns match the new row>"
        l_flag = FALSE;
        l_sql := l_sql || 'where ';
        for columns in
            select name, parent_name, type_name
            from ps_column
            where table_id = tabs.id
            and type_name in ('date', 'key', 'nullable')
        loop
            if l_flag then
                l_sql := l_sql || 'and ';
            end if;
            l_flag := TRUE;
            if columns.type_name = 'date' then
                l_sql := l_sql ||
                    columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
            end if;
            if columns.type_name = 'key' then
                l_sql := l_sql ||
                    columns.name || ' = NEW.' || columns.parent_name || ' ';
            end if;
            if columns.type_name = 'nullable' then
                l_sql := l_sql ||
                    columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
            end if;
        end loop;

        -- no snapshot row was updated: insert a new one
        l_sql := l_sql || '; ' ||
            'if not FOUND then ' ||
            'insert into ' || tabs.table_name || '(';
        l_flag = FALSE;
        for columns in
            select name, type_name
            from ps_column
            where table_id = tabs.id
        loop
            if l_flag then
                l_sql := l_sql || ', ';
            end if;
            l_flag := TRUE;
            l_sql := l_sql || columns.name;
        end loop;
        l_sql := l_sql || ') values (';
        l_flag = FALSE;
        for columns in
            select name, parent_name, type_name
            from ps_column
            where table_id = tabs.id
        loop
            if l_flag then
                l_sql := l_sql || ', ';
            end if;
            l_flag := TRUE;
            if columns.type_name = 'date' then
                l_sql := l_sql || 'date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ')';
            elsif columns.type_name = 'cnt' then
                l_sql := l_sql || 'case when NEW.' || columns.parent_name || ' is null then 0 else 1 end';
            elsif columns.type_name in ('nullable', 'sum') then
                l_sql := l_sql || 'coalesce(NEW.' || columns.parent_name || ', 0)';
            else
                l_sql := l_sql || 'NEW.' || columns.parent_name;
            end if;
        end loop;
        l_sql := l_sql || '); ' ||
            'end if; ';
    end loop;

    -- route the row to the matching section, if there is one
    select name into l_date_column
    from ps_column
    where table_id = p_table
    and type_name = 'date';
    for tabs in
        select to_char(start_value, 'YYYYMMDD') as start_value,
               to_char(end_value, 'YYYYMMDD') as end_value,
               type_name
        from ps_range_partition
        where table_id = p_table
        order by start_value desc
    loop
        l_sql := l_sql ||
            'if NEW.' || l_date_column || ' >= to_date(''' || tabs.start_value || ''', ''YYYYMMDD'') and NEW.' || l_date_column || ' < to_date(''' || tabs.end_value || ''', ''YYYYMMDD'') then ' ||
            'insert into ' || l_table_name || '_' || tabs.start_value || ' values (NEW.*); ' ||
            'return null; ' ||
            'end if; ';
    end loop;
    l_sql := l_sql ||
        'return NEW; '||
        'end; '||
        '$'||'$ language plpgsql';
    execute l_sql;

    -- 2. ps_TABLE_raise_trigger(): forbid update and delete when min/max aggregates exist
    l_sql :=
        'create or replace function ps_' || l_table_name || '_raise_trigger() returns trigger ' ||
        'as $'|| '$ ' ||
        'begin ' ||
        'raise EXCEPTION ''Can''''t support % on MIN or MAX aggregate'', TG_OP;' ||
        'end; '||
        '$'||'$ language plpgsql';
    execute l_sql;

    -- 3. ps_TABLE_delete_trigger(): subtract the deleted row from sum and cnt
    l_sql :=
        'create or replace function ps_' || l_table_name || '_delete_trigger() returns trigger ' ||
        'as $'|| '$ ' ||
        'begin ';
    for tabs in
        select a.snapshot_id as id, b.name as table_name, a.type_name as snapshot_type
        from ps_snapshot a, ps_table b
        where a.table_id = p_table
        and b.id = a.snapshot_id
    loop
        l_flag = FALSE;
        l_sql := l_sql ||
            'update ' || tabs.table_name || ' set ';
        for columns in
            select name, parent_name, type_name
            from ps_column
            where table_id = tabs.id
            and type_name in ('sum', 'cnt')
        loop
            if l_flag then
                l_sql := l_sql || ', ';
            end if;
            l_flag := TRUE;
            if columns.type_name = 'sum' then
                l_sql := l_sql ||
                    columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' ';
            end if;
            if columns.type_name = 'cnt' then
                l_sql := l_sql ||
                    columns.name || ' = ' || columns.name || ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ';
            end if;
        end loop;
        l_flag = FALSE;
        l_sql := l_sql || 'where ';
        -- a delete trigger has no NEW row, so the snapshot row is located through OLD
        for columns in
            select name, parent_name, type_name
            from ps_column
            where table_id = tabs.id
            and type_name in ('date', 'key', 'nullable')
        loop
            if l_flag then
                l_sql := l_sql || 'and ';
            end if;
            l_flag := TRUE;
            if columns.type_name = 'date' then
                l_sql := l_sql ||
                    columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), OLD.' || columns.parent_name || ') ';
            end if;
            if columns.type_name = 'key' then
                l_sql := l_sql ||
                    columns.name || ' = OLD.' || columns.parent_name || ' ';
            end if;
            if columns.type_name = 'nullable' then
                l_sql := l_sql ||
                    columns.name || ' = coalesce(OLD.' || columns.parent_name || ', 0)';
            end if;
        end loop;
        l_sql := l_sql || '; ';
    end loop;
    l_sql := l_sql ||
        'return null; '||
        'end; '||
        '$'||'$ language plpgsql';
    execute l_sql;

    -- 4. ps_TABLE_update_trigger(): replace the old contribution to sum and cnt with the new one
    l_sql :=
        'create or replace function ps_' || l_table_name || '_update_trigger() returns trigger ' ||
        'as $'|| '$ ' ||
        'begin ';
    for tabs in
        select a.snapshot_id as id, b.name as table_name, a.type_name as snapshot_type
        from ps_snapshot a, ps_table b
        where a.table_id = p_table
        and b.id = a.snapshot_id
    loop
        l_flag = FALSE;
        l_sql := l_sql ||
            'update ' || tabs.table_name || ' set ';
        for columns in
            select name, parent_name, type_name
            from ps_column
            where table_id = tabs.id
            and type_name in ('sum', 'cnt')
        loop
            if l_flag then
                l_sql := l_sql || ', ';
            end if;
            l_flag := TRUE;
            if columns.type_name = 'sum' then
                l_sql := l_sql ||
                    columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' + NEW.' || columns.parent_name || ' ';
            end if;
            if columns.type_name = 'cnt' then
                l_sql := l_sql ||
                    columns.name || ' = ' || columns.name ||
                    ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ' ||
                    ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
            end if;
        end loop;
        l_flag = FALSE;
        l_sql := l_sql || 'where ';
        for columns in
            select name, parent_name, type_name
            from ps_column
            where table_id = tabs.id
            and type_name in ('date', 'key', 'nullable')
        loop
            if l_flag then
                l_sql := l_sql || 'and ';
            end if;
            l_flag := TRUE;
            if columns.type_name = 'date' then
                l_sql := l_sql ||
                    columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
            end if;
            if columns.type_name = 'key' then
                l_sql := l_sql ||
                    columns.name || ' = NEW.' || columns.parent_name || ' ';
            end if;
            if columns.type_name = 'nullable' then
                l_sql := l_sql ||
                    columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
            end if;
        end loop;
        l_sql := l_sql || '; ';
    end loop;
    l_sql := l_sql ||
        'return null; '||
        'end; '||
        '$'||'$ language plpgsql';
    execute l_sql;
end;
$$ language plpgsql;

Despite its intimidating appearance, this function is quite simple. Its task is to generate, from the available metadata, the four functions used to build the triggers:

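  • ps_TABLE_insert_trigger() – handles data insertion (updates the snapshots and routes the row to its section)
  • ps_TABLE_update_trigger() – updates the snapshots when data is updated
  • ps_TABLE_delete_trigger() – updates the snapshots when data is deleted
  • ps_TABLE_raise_trigger() – prohibits updates and deletions by raising an exception (used when a snapshot contains min or max aggregates)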

Here, the name of the table containing the source data is substituted for TABLE. A typical definition of the ps_TABLE_insert_trigger() function looks like this:

create or replace function ps_data_insert_trigger() returns trigger
as $$
begin
    update data_month set
        sum_field = sum_field + NEW.sum_field,
        min_field = least(min_field, NEW.min_field)
    where date_field = date_trunc('month', NEW.date_field)
    and key_field = NEW.key_field;
    if not FOUND then
        insert into data_month(date_field, key_field, sum_field, min_field)
        values (date_trunc('month', NEW.date_field), NEW.key_field, NEW.sum_field, NEW.min_field);
    end if;
    if NEW.date_field >= to_date('20130101', 'YYYYMMDD') and
       NEW.date_field < to_date('20130201', 'YYYYMMDD') then
        insert into data_20130101 values (NEW.*);
        return null;
    end if;
    return NEW;
end;
$$ language plpgsql;

In reality the function looks a bit more complicated, because null values are handled in a special way, but as an illustration the example above is adequate. The logic of this code is straightforward:

  • When a row is inserted into the source table, we first try to update the counters in the aggregate table data_month
  • If that fails (no matching record was found in data_month), we add a new record
  • Next, we check the row's date against the range of each section (one section in the example); on a match, we insert the record into that section (since the section inherits from the main table, the asterisk is safe to use) and return null to prevent the record from being inserted into the main table
  • If none of the sections fit, we return NEW, allowing the insertion into the main table
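The first two steps, including the null handling that the simplified listing leaves out, can be tried in isolation. The following is only a sketch under assumptions: the helper pg_temp.apply_row and its parameters are hypothetical stand-ins for the NEW row of the trigger, the table is temporary, and min_field is left nullable for the sake of the demonstration.

```sql
-- Hypothetical stand-in for the snapshot table from the example above.
create temp table data_month (
    date_field date not null,
    key_field bigint not null,
    sum_field bigint not null,
    min_field bigint,
    primary key (date_field, key_field)
);

-- Hypothetical helper: the parameters play the role of the NEW row.
create function pg_temp.apply_row(p_date timestamp, p_key bigint, p_sum bigint, p_min bigint)
returns void as $$
begin
    -- step 1: try to update the existing aggregate row, treating null as "no contribution"
    update data_month
    set sum_field = sum_field + coalesce(p_sum, 0),
        min_field = least(coalesce(min_field, p_min), coalesce(p_min, min_field))
    where date_field = date_trunc('month', p_date)
      and key_field = p_key;
    -- step 2: nothing was updated, so create the aggregate row
    if not found then
        insert into data_month (date_field, key_field, sum_field, min_field)
        values (date_trunc('month', p_date), p_key, coalesce(p_sum, 0), p_min);
    end if;
end;
$$ language plpgsql;

select pg_temp.apply_row('2013-01-05 10:00', 1, 10, 7);   -- creates the January row
select pg_temp.apply_row('2013-01-20 12:00', 1, null, 3); -- updates it; the null sum adds nothing
select pg_temp.apply_row('2013-02-01 09:00', 1, 5, null); -- creates the February row

select * from data_month order by date_field;
```

After the three calls the January row holds sum_field = 10 and min_field = 3, and February has its own row.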

The last point causes the data to be added to the main table if no matching section is found. In practice, this is quite convenient: even if we have not created the section beforehand, or we receive data with an incorrect date, the insert succeeds. Subsequently, you can analyze the contents of the main table by running the query:

select * from only data

After that, create the missing sections (as will be shown below, the data is transferred automatically from the main table to the created section). In such cases the number of records outside their section is usually small, and the cost of transferring the data is negligible.
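To see at a glance which sections are missing, the stray rows can be grouped by period. The sketch below assumes monthly sections and uses temporary tables with made-up rows so that it can be run on its own; the names data and data_20130101 follow the earlier example.

```sql
-- Main table and one inherited section (temporary, for demonstration only).
create temp table data (
    id bigint not null,
    date_field timestamp not null
);
create temp table data_20130101 (
    check (date_field >= date '2013-01-01' and date_field < date '2013-02-01')
) inherits (data);

insert into data_20130101 values (1, '2013-01-15 08:00');
-- these two rows have no section and therefore sit in the main table
insert into data values (2, '2013-03-02 09:30'), (3, '2013-03-17 14:00');

-- "only" restricts the scan to the main table itself, skipping inherited sections
select date_trunc('month', date_field)::date as month_start,
       count(*) as stray_rows
from only data
group by 1
order by 1;
```

Here the query reports two stray rows for the month starting 2013-03-01, which suggests creating a section for March.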
Now all that’s left is the plumbing around it. Let’s start with the function that creates a new section:
ps_add_range_partition(varchar, varchar, varchar, date)

create or replace function ps_add_range_partition(in p_table varchar, in p_column varchar, in p_type varchar, in p_start date) returns void
as $$
declare
    l_sql text;
    l_end date;
    l_start_str varchar(10);
    l_end_str varchar(10);
    l_table bigint;
    l_flag boolean;
    columns record;
begin
    -- only one date column per table may drive the sectioning
    perform 1
    from ps_table a, ps_column b
    where a.id = b.table_id and lower(a.name) = lower(p_table)
    and b.type_name = 'date' and lower(b.name) <> lower(p_column);
    if FOUND then
        raise EXCEPTION 'Conflict DATE columns';
    end if;

    l_end := p_start + ('1 ' || p_type)::INTERVAL;

    -- the new range must not overlap an existing section
    perform 1
    from ps_table a, ps_range_partition b
    where a.id = b.table_id and lower(a.name) = lower(p_table)
    and (( p_start >= b.start_value and p_start < b.end_value ) or
         ( b.start_value >= p_start and b.start_value < l_end ));
    if FOUND then
        raise EXCEPTION 'Range intervals intersects';
    end if;

    -- register the table, the date column and the section in the metadata
    perform 1
    from ps_table
    where lower(name) = lower(p_table);
    if not FOUND then
        insert into ps_table(name) values (lower(p_table));
    end if;

    select id into l_table
    from ps_table
    where lower(name) = lower(p_table);

    perform 1
    from ps_column
    where table_id = l_table and type_name = 'date'
    and lower(name) = lower(p_column);
    if not FOUND then
        insert into ps_column(table_id, name, type_name)
        values (l_table, lower(p_column), 'date');
    end if;

    insert into ps_range_partition(table_id, type_name, start_value, end_value)
    values (l_table, p_type, p_start, l_end);

    l_start_str = to_char(p_start, 'YYYYMMDD');
    l_end_str = to_char(l_end, 'YYYYMMDD');

    -- create the inherited table with a check constraint and a copy of the primary key
    l_sql :=
        'create table ' || p_table || '_' || l_start_str || '(' ||
        'check (' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
        p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')), ' ||
        'primary key (';
    l_flag := FALSE;
    -- ps_array_to_set is a helper function that is not shown in this article
    for columns in
        select f.name as name
        from ( select ps_array_to_set(a.conkey) as nn
               from pg_constraint a, pg_class b
               where b.oid = a.conrelid
               and a.contype = 'p'
               and b.relname = p_table ) c,
             ( select d.attname as name, d.attnum as nn
               from pg_attribute d, pg_class e
               where e.oid = d.attrelid
               and e.relname = p_table ) f
        where f.nn = c.nn
        order by f.nn
    loop
        if l_flag then
            l_sql := l_sql || ', ';
        end if;
        l_flag := TRUE;
        l_sql := l_sql || columns.name;
    end loop;
    l_sql := l_sql ||
        ')) inherits (' || p_table || ')';
    execute l_sql;

    l_sql :=
        'create index ' || p_table || '_' || l_start_str || '_date on ' || p_table || '_' || l_start_str || '(' || p_column || ')';
    execute l_sql;

    perform ps_trigger_regenerate(l_table);

    -- drop the triggers so that moving the data does not touch the snapshots
    execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
    execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
    execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;

    -- move matching rows from the main table into the new section
    l_sql :=
        'insert into ' || p_table || '_' || l_start_str || ' ' ||
        'select * from ' || p_table || ' where ' ||
        p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
        p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
    execute l_sql;
    l_sql :=
        'delete from only ' || p_table || ' where ' ||
        p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
        p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
    execute l_sql;

    -- recreate the triggers
    l_sql :=
        'create trigger ps_' || p_table || '_before_insert ' ||
        'before insert on ' || p_table || ' for each row ' ||
        'execute procedure ps_' || p_table || '_insert_trigger()';
    execute l_sql;

    perform 1
    from ps_snapshot a, ps_column b
    where b.table_id = a.snapshot_id and a.table_id = l_table
    and b.type_name in ('min', 'max');
    if FOUND then
        -- min/max aggregates exist: updates and deletes are prohibited
        l_sql :=
            'create trigger ps_' || p_table || '_after_update ' ||
            'after update on ' || p_table || ' for each row ' ||
            'execute procedure ps_' || p_table || '_raise_trigger()';
        execute l_sql;
        l_sql :=
            'create trigger ps_' || p_table || '_after_delete ' ||
            'after delete on ' || p_table || ' for each row ' ||
            'execute procedure ps_' || p_table || '_raise_trigger()';
        execute l_sql;
        l_sql :=
            'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
            'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
            'execute procedure ps_' || p_table || '_raise_trigger()';
        execute l_sql;
        l_sql :=
            'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
            'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
            'execute procedure ps_' || p_table || '_raise_trigger()';
        execute l_sql;
    else
        l_sql :=
            'create trigger ps_' || p_table || '_after_update ' ||
            'after update on ' || p_table || ' for each row ' ||
            'execute procedure ps_' || p_table || '_update_trigger()';
        execute l_sql;
        l_sql :=
            'create trigger ps_' || p_table || '_after_delete ' ||
            'after delete on ' || p_table || ' for each row ' ||
            'execute procedure ps_' || p_table || '_delete_trigger()';
        execute l_sql;
        l_sql :=
            'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
            'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
            'execute procedure ps_' || p_table || '_update_trigger()';
        execute l_sql;
        l_sql :=
            'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
            'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
            'execute procedure ps_' || p_table || '_delete_trigger()';
        execute l_sql;
    end if;
end;
$$ language plpgsql;

Here, after checking that the input data is correct, we add the necessary metadata and create an inherited table. We then regenerate the trigger functions by calling ps_trigger_regenerate, move the rows that satisfy the sectioning condition into the new section with a dynamic query, and recreate the triggers themselves.
Two things caused difficulties.

  1. It took some fiddling to add a day, week, month or year to the start date, depending on the input parameter p_type:
    l_end := p_start + ('1 ' || p_type)::INTERVAL;

  2. Since the primary key is not inherited, I had to write a query against the system catalogs to get the list of primary key columns of the original table (I did not consider it reasonable to keep a description of the primary key in my own metadata):
    select f.name as name
    from ( select ps_array_to_set(a.conkey) as nn
           from pg_constraint a, pg_class b
           where b.oid = a.conrelid
           and a.contype = 'p'
           and b.relname = p_table ) c,
         ( select d.attname as name, d.attnum as nn
           from pg_attribute d, pg_class e
           where e.oid = d.attrelid
           and e.relname = p_table ) f
    where f.nn = c.nn
    order by f.nn
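The helper ps_array_to_set is not shown in the article. As an alternative sketch, not the author's implementation, the same list can be obtained with the built-in unnest function, assuming PostgreSQL 9.4 or later for with ordinality. The table pk_demo is a hypothetical example; note that this variant orders the columns by their position in the key rather than by attribute number.

```sql
-- Hypothetical table whose key order differs from its column order.
create temp table pk_demo (
    b integer not null,
    a integer not null,
    payload text,
    primary key (a, b)
);

-- Expand conkey (the array of key attribute numbers) into rows,
-- remember each element's position, and resolve the column names.
select att.attname as name
from pg_constraint con
cross join lateral unnest(con.conkey) with ordinality as k(attnum, ord)
join pg_attribute att
  on att.attrelid = con.conrelid
 and att.attnum = k.attnum
where con.conrelid = 'pk_demo'::regclass
  and con.contype = 'p'
order by k.ord;
```

For pk_demo this returns a and then b, the order in which the columns appear in the primary key.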

It should also be noted that, before creating an index on the sectioning key of the new section, it would be worth checking whether that column is already the leading column of the primary key, so as not to create a duplicate index.
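Such a check is not part of the functions shown here. One possible sketch, with a hypothetical table pk_lead_demo, compares the first element of conkey with the attribute number of the date column:

```sql
-- Hypothetical table in which the date column leads the primary key.
create temp table pk_lead_demo (
    event_time timestamp not null,
    id bigint not null,
    primary key (event_time, id)
);

-- conkey[1] is the attribute number of the leading primary key column
select exists (
    select 1
    from pg_constraint con
    join pg_attribute att
      on att.attrelid = con.conrelid
     and att.attnum = con.conkey[1]
    where con.conrelid = 'pk_lead_demo'::regclass
      and con.contype = 'p'
      and att.attname = 'event_time'
) as date_leads_primary_key;
```

When the result is true, the primary key index already covers lookups by the date column and the separate index could be skipped.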
The function that deletes a section is much simpler and needs little commentary:
ps_del_range_partition(varchar, date)

create or replace function ps_del_range_partition(in p_table varchar, in p_start date)
returns void
as $$
declare
    l_sql text;
    l_start_str varchar(10);
    l_table bigint;
begin
    select id into l_table
    from ps_table
    where lower(name) = lower(p_table);

    l_start_str = to_char(p_start, 'YYYYMMDD');

    -- forget the section and regenerate the trigger functions without it
    delete from ps_range_partition
    where table_id = l_table
    and start_value = p_start;

    perform ps_trigger_regenerate(l_table);

    -- move the data back to the main table
    l_sql :=
        'insert into ' || p_table || ' ' ||
        'select * from ' || p_table || '_' || l_start_str;
    execute l_sql;

    -- no sections and no snapshots left: remove triggers, functions and metadata
    perform 1
    from ( select 1
           from ps_range_partition
           where table_id = l_table
           union all
           select 1
           from ps_snapshot
           where table_id = l_table ) a;
    if not FOUND then
        execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
        execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
        execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;
        execute 'drop function ps_' || p_table || '_insert_trigger() cascade';
        execute 'drop function ps_' || p_table || '_raise_trigger() cascade';
        execute 'drop function ps_' || p_table || '_update_trigger() cascade';
        execute 'drop function ps_' || p_table || '_delete_trigger() cascade';
        delete from ps_column where table_id = l_table;
        delete from ps_table where id = l_table;
    end if;

    perform 1
    from ps_range_partition
    where table_id = l_table;
    if not FOUND then
        delete from ps_column
        where table_id = l_table
        and type_name = 'date';
    end if;

    execute 'drop table ' || p_table || '_' || l_start_str;
end;
$$ language plpgsql;

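When you delete a section, the data is of course not lost but transferred to the main table. Because, as it turned out, the keyword only does not work in an insert statement, the section is first removed from the metadata and the trigger functions are regenerated, so that the insert trigger no longer redirects these rows into the section being dropped.
Still to be added are the functions that manage live data snapshots: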
ps_add_snapshot_column(varchar, varchar, varchar, varchar)

create or replace function ps_add_snapshot_column(in p_snapshot varchar, in p_column varchar, in p_parent varchar, in p_type varchar) returns void
as $$
declare
    l_table bigint;
begin
    -- register the snapshot table on first use
    perform 1
    from ps_table
    where lower(name) = lower(p_snapshot);
    if not FOUND then
        insert into ps_table(name) values (lower(p_snapshot));
    end if;

    select id into l_table
    from ps_table
    where lower(name) = lower(p_snapshot);

    -- p_parent is the source column that feeds the snapshot column p_column
    insert into ps_column(table_id, name, parent_name, type_name)
    values (l_table, lower(p_column), lower(p_parent), p_type);
end;
$$ language plpgsql;

ps_add_snapshot(varchar, varchar, varchar)

create or replace function ps_add_snapshot(in p_table varchar, in p_snapshot varchar, in p_type varchar) returns void
as $$
declare
    l_sql text;
    l_table bigint;
    l_snapshot bigint;
    l_flag boolean;
    columns record;
begin
    select id into l_snapshot
    from ps_table
    where lower(name) = lower(p_snapshot);

    -- the snapshot needs at least one grouping column and one aggregate
    perform 1
    from ps_column
    where table_id = l_snapshot
    and type_name in ('date', 'key');
    if not FOUND then
        raise EXCEPTION 'Key columns not found';
    end if;

    perform 1
    from ps_column
    where table_id = l_snapshot
    and not type_name in ('date', 'key', 'nullable');
    if not FOUND then
        raise EXCEPTION 'Aggregate columns not found';
    end if;

    perform 1
    from ps_table
    where lower(name) = lower(p_table);
    if not FOUND then
        insert into ps_table(name) values (lower(p_table));
    end if;

    select id into l_table
    from ps_table
    where lower(name) = lower(p_table);

    insert into ps_snapshot(table_id, snapshot_id, type_name)
    values (l_table, l_snapshot, p_type);

    perform ps_trigger_regenerate(l_table);

    -- create the snapshot table
    l_sql := 'create table ' || p_snapshot || ' (';
    l_flag := FALSE;
    for columns in
        select name, type_name
        from ps_column
        where table_id = l_snapshot
    loop
        if l_flag then
            l_sql := l_sql || ', ';
        end if;
        l_flag := TRUE;
        if columns.type_name = 'date' then
            l_sql := l_sql || columns.name || ' date not null';
        else
            l_sql := l_sql || columns.name || ' bigint not null';
        end if;
    end loop;
    l_sql := l_sql || ', primary key (';
    l_flag := FALSE;
    for columns in
        select name
        from ps_column
        where table_id = l_snapshot
        and type_name in ('date', 'key', 'nullable')
    loop
        if l_flag then
            l_sql := l_sql || ', ';
        end if;
        l_flag := TRUE;
        l_sql := l_sql || columns.name;
    end loop;
    l_sql := l_sql || '))';
    execute l_sql;

    -- recreate the triggers on the source table
    execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
    execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
    execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;

    l_sql :=
        'create trigger ps_' || p_table || '_before_insert ' ||
        'before insert on ' || p_table || ' for each row ' ||
        'execute procedure ps_' || p_table || '_insert_trigger()';
    execute l_sql;

    perform 1
    from ps_snapshot a, ps_column b
    where b.table_id = a.snapshot_id and a.table_id = l_table
    and b.type_name in ('min', 'max');
    if FOUND then
        l_sql :=
            'create trigger ps_' || p_table || '_after_update ' ||
            'after update on ' || p_table || ' for each row ' ||
            'execute procedure ps_' || p_table || '_raise_trigger()';
        execute l_sql;
        l_sql :=
            'create trigger ps_' || p_table || '_after_delete ' ||
            'after delete on ' || p_table || ' for each row ' ||
            'execute procedure ps_' || p_table || '_raise_trigger()';
        execute l_sql;
    else
        l_sql :=
            'create trigger ps_' || p_table || '_after_update ' ||
            'after update on ' || p_table || ' for each row ' ||
            'execute procedure ps_' || p_table || '_update_trigger()';
        execute l_sql;
        l_sql :=
            'create trigger ps_' || p_table || '_after_delete ' ||
            'after delete on ' || p_table || ' for each row ' ||
            'execute procedure ps_' || p_table || '_delete_trigger()';
        execute l_sql;
    end if;

    -- fill the snapshot from the data that already exists
    l_sql := 'insert into ' || p_snapshot || '(';
    l_flag := FALSE;
    for columns in
        select name
        from ps_column
        where table_id = l_snapshot
    loop
        if l_flag then
            l_sql := l_sql || ', ';
        end if;
        l_flag := TRUE;
        l_sql := l_sql || columns.name;
    end loop;
    l_sql := l_sql || ') select ';
    l_flag := FALSE;
    for columns in
        select parent_name as name, type_name
        from ps_column
        where table_id = l_snapshot
    loop
        if l_flag then
            l_sql := l_sql || ', ';
        end if;
        l_flag := TRUE;
        if columns.type_name = 'date' then
            l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
        end if;
        if columns.type_name = 'key' then
            l_sql := l_sql || columns.name;
        end if;
        if columns.type_name = 'nullable' then
            l_sql := l_sql || 'coalesce(' || columns.name || ', 0)';
        end if;
        if columns.type_name = 'sum' then
            l_sql := l_sql || 'sum(' || columns.name || ')';
        end if;
        if columns.type_name = 'min' then
            l_sql := l_sql || 'min(' || columns.name || ')';
        end if;
        if columns.type_name = 'max' then
            l_sql := l_sql || 'max(' || columns.name || ')';
        end if;
        if columns.type_name = 'cnt' then
            l_sql := l_sql || 'count(' || columns.name || ')';
        end if;
    end loop;
    -- the leading space keeps "from" apart from a select list that ends in a plain column name
    l_sql := l_sql || ' from ' || p_table || ' group by ';
    l_flag := FALSE;
    for columns in
        select parent_name as name, type_name
        from ps_column
        where table_id = l_snapshot
        and type_name in ('date', 'key', 'nullable')
    loop
        if l_flag then
            l_sql := l_sql || ', ';
        end if;
        l_flag := TRUE;
        if columns.type_name = 'date' then
            l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
        else
            l_sql := l_sql || columns.name;
        end if;
    end loop;
    execute l_sql;
end;
$$ language plpgsql;

ps_del_snapshot(varchar)

create or replace function ps_del_snapshot(in p_snapshot varchar) returns void
as $$
declare
    l_sql text;
    p_table varchar(50);
    l_table bigint;
    l_snapshot bigint;
begin
    -- find the source table the snapshot is built on
    select a.table_id, c.name into l_table, p_table
    from ps_snapshot a, ps_table b, ps_table c
    where b.id = a.snapshot_id and c.id = a.table_id
    and lower(b.name) = lower(p_snapshot);

    select id into l_snapshot
    from ps_table
    where lower(name) = lower(p_snapshot);

    delete from ps_snapshot where snapshot_id = l_snapshot;
    delete from ps_column where table_id = l_snapshot;
    delete from ps_table where id = l_snapshot;

    execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
    execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
    execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;

    perform 1
    from ( select 1
           from ps_range_partition
           where table_id = l_table
           union all
           select 1
           from ps_snapshot
           where table_id = l_table ) a;
    if not FOUND then
        -- nothing depends on the trigger functions any more
        execute 'drop function if exists ps_' || p_table || '_insert_trigger() cascade';
        execute 'drop function if exists ps_' || p_table || '_raise_trigger() cascade';
        execute 'drop function if exists ps_' || p_table || '_update_trigger() cascade';
        execute 'drop function if exists ps_' || p_table || '_delete_trigger() cascade';
    else
        -- other sections or snapshots remain: regenerate the functions and recreate the triggers
        perform ps_trigger_regenerate(l_table);
        l_sql :=
            'create trigger ps_' || p_table || '_before_insert ' ||
            'before insert on ' || p_table || ' for each row ' ||
            'execute procedure ps_' || p_table || '_insert_trigger()';
        execute l_sql;

        perform 1
        from ps_snapshot a, ps_column b
        where b.table_id = a.snapshot_id and a.table_id = l_table
        and b.type_name in ('min', 'max');
        if FOUND then
            l_sql :=
                'create trigger ps_' || p_table || '_after_update ' ||
                'after update on ' || p_table || ' for each row ' ||
                'execute procedure ps_' || p_table || '_raise_trigger()';
            execute l_sql;
            l_sql :=
                'create trigger ps_' || p_table || '_after_delete ' ||
                'after delete on ' || p_table || ' for each row ' ||
                'execute procedure ps_' || p_table || '_raise_trigger()';
            execute l_sql;
        else
            l_sql :=
                'create trigger ps_' || p_table || '_after_update ' ||
                'after update on ' || p_table || ' for each row ' ||
                'execute procedure ps_' || p_table || '_update_trigger()';
            execute l_sql;
            l_sql :=
                'create trigger ps_' || p_table || '_after_delete ' ||
                'after delete on ' || p_table || ' for each row ' ||
                'execute procedure ps_' || p_table || '_delete_trigger()';
            execute l_sql;
        end if;
    end if;

    execute 'drop table if exists ' || p_snapshot;
end;
$$ language plpgsql;

There’s nothing fundamentally new here either. The only thing I’d like to mention is that when ‘min’ or ‘max’ aggregates are used, the triggers are created with the function ps_TABLE_raise_trigger(), which prohibits deletions and changes in the table on which the snapshot is built. This is done because I could not come up with an implementation with adequate performance for updating these aggregates when update and delete statements are executed on the original table.
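To illustrate why this is hard, here is a sketch of the straightforward approach, which is not what the functions above do: after a delete, the maximum of the affected group has to be recomputed from all of its remaining rows. The tables are temporary stand-ins with made-up data.

```sql
-- Temporary stand-ins for a source table and a snapshot holding only a maximum.
create temp table test (
    id bigint primary key,
    event_time timestamp not null,
    customer_id bigint not null,
    value bigint not null
);
create temp table test_month (
    event_time date not null,
    customer_id bigint not null,
    value_max bigint not null,
    primary key (event_time, customer_id)
);

insert into test values
    (1, '2013-05-03 10:00', 7, 10),
    (2, '2013-05-04 11:00', 7, 40),
    (3, '2013-05-09 12:00', 7, 25);
insert into test_month values ('2013-05-01', 7, 40);

-- deleting the row that holds the current maximum ...
delete from test where id = 2;

-- ... leaves no way to derive the new maximum from the old one:
-- every remaining row of the group has to be read again
update test_month m
set value_max = (
    select max(t.value)
    from test t
    where t.customer_id = m.customer_id
      and date_trunc('month', t.event_time) = m.event_time
)
where m.customer_id = 7
  and m.event_time = date '2013-05-01';

select value_max from test_month;
```

The snapshot now holds 25, but each delete or update costs a scan of the whole group, whereas sum and cnt can be adjusted from the old and new values alone. The sketch also ignores the case where the last row of a group is deleted.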
Let’s see how it all works. Let’s create a test table:

create sequence test_seq;

create table test (
    id bigint default nextval('test_seq') not null,
    event_time timestamp not null,
    customer_id bigint not null,
    value bigint not null,
    primary key(id)
);

Now, to add a section, simply run the following query:

select ps_add_range_partition('test', 'event_time', 'month', to_date('20130501', 'YYYYMMDD'))

As a result, an inherited table test_20130501 is created, which automatically receives all records for May 2013.
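For orientation, the DDL that ps_add_range_partition assembles for this call is roughly the following. This is a reconstruction from the function body rather than captured output, and temporary tables are used here so that the sketch can be run without leaving anything behind.

```sql
-- Stand-in for the test table (temporary, sequence default omitted).
create temp table test (
    id bigint not null,
    event_time timestamp not null,
    customer_id bigint not null,
    value bigint not null,
    primary key (id)
);

-- The section: a range check on the date column, a copy of the primary key,
-- and inheritance from the main table.
create temp table test_20130501 (
    check (event_time >= to_date('20130501', 'YYYYMMDD')
       and event_time <  to_date('20130601', 'YYYYMMDD')),
    primary key (id)
) inherits (test);

create index test_20130501_date on test_20130501 (event_time);

-- a row stored in the section is visible through the main table
insert into test_20130501 values (1, '2013-05-14 09:00', 7, 10);

select count(*) as via_parent from test;
select count(*) as in_parent_only from only test;
```

The first count sees the row through inheritance, while the second, restricted by only, does not.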
To delete a section, run the following query:

select ps_del_range_partition('test', to_date('20130501', 'YYYYMMDD'))

Creating a snapshot is a bit more complicated, because you first need to define the columns of interest:

select ps_add_snapshot_column('test_month', 'customer_id', 'customer_id', 'key');
select ps_add_snapshot_column('test_month', 'event_time', 'event_time', 'date');
select ps_add_snapshot_column('test_month', 'value_sum', 'value', 'sum');
select ps_add_snapshot_column('test_month', 'value_cnt', 'value', 'cnt');
select ps_add_snapshot_column('test_month', 'value_max', 'value', 'max');
select ps_add_snapshot('test', 'test_month', 'month');

This creates an automatically updated table based on the following query:

select customer_id, date_trunc('month', event_time),
       sum(value) as value_sum,
       count(value) as value_cnt,
       max(value) as value_max
from test
group by customer_id, date_trunc('month', event_time)
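A simple way to convince yourself that a snapshot matches its source is to subtract the recomputed aggregate from it. The sketch below uses temporary stand-ins for test and test_month, filled by hand with consistent made-up data; in a real database the same except query can be run against the actual tables.

```sql
create temp table test (
    id bigint primary key,
    event_time timestamp not null,
    customer_id bigint not null,
    value bigint not null
);
create temp table test_month (
    customer_id bigint not null,
    event_time date not null,
    value_sum bigint not null,
    value_cnt bigint not null,
    value_max bigint not null,
    primary key (customer_id, event_time)
);

insert into test values
    (1, '2013-05-03 10:00', 7, 10),
    (2, '2013-05-04 11:00', 7, 40),
    (3, '2013-05-09 12:00', 8, 5);
insert into test_month values
    (7, '2013-05-01', 50, 2, 40),
    (8, '2013-05-01', 5, 1, 5);

-- snapshot rows that differ from the recomputed aggregate; an empty result means they agree
select customer_id, event_time, value_sum, value_cnt, value_max
from test_month
except
select customer_id, date_trunc('month', event_time)::date,
       sum(value), count(value), max(value)
from test
group by customer_id, date_trunc('month', event_time);
```

With the data above the query returns no rows. Any row it does return is a snapshot entry that no longer agrees with the source; groups missing from the snapshot altogether would need the reverse comparison.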

You can remove the snapshot by running the following query:

select ps_del_snapshot('test_month')

That’s all for today. The scripts can be picked up on GitHub.
