Using Postgres for append-only immutable data

Norman Yamada, Parataxis, LLC

Who am I?

  • Independent Postgres hacker, first used Postgres in 2004
  • User -- not contributor

Basic use-case

  • Large set of data (prices, user_prefs, financial transactions) where the volume of inserts is very high

    • Perhaps the table is partitioned or quasi-federated out of heterogeneous data stores.
  • But corrections (updates, deletes) to the data must be retained
  • Want the ability to "time-travel" to know how the data looked at any point in time

Let's say we want to track changes for rapidly changing data --

Let's say we care about the price of a widget at 9 am. We get back price information. If we calculate a signal on that price between 9 am and 9:15, we use the price as a fact. Then at 9:20, we receive a price correction: the 9 am widget price has changed. We now want to know the corrected price. But if we replay signals as of 9:10 am, we want the old price back.

If we are syncing to a heterogeneous datastore and the sync server crashes at 9:20 am, we want to be able to easily discover whether the updated price has been written.

Some current projects using immutable data stores:

  • Apache Kafka (http://kafka.apache.org/)

    Distributed messaging server

    Is it possible to delete a topic? In the current version, 0.8.0, no. (You could clear the entire Kafka and zookeeper states to delete all topics and data.) But upcoming releases are expected to include a delete topic tool.

  • Apache Zookeeper (http://zookeeper.apache.org/)

    ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications.

  • Datomic (Rich Hickey) -- (http://www.datomic.com)

    • Founded by creator of Clojure.
    • Distributed database with immutable data store.
    • Database with no side effects(!)
In [1]:
from IPython.display import HTML
HTML('<iframe src="http://www.datomic.com" width=1024 height=768></iframe>')
Out[1]:

Prior Art

In [2]:
HTML('<iframe src="http://www.postgresql.org/docs/6.3/static/c0503.htm" width=1024 height=768></iframe>')
Out[2]:

But

  • This feature involved keeping min and max timestamps for every row in the database -- it was dropped as of Postgres 6.2

SPI timetravel extension

http://www.postgresql.org/docs/current/static/contrib-spi.html

  • Table-based
  • Uses a trigger function before insert, update, or delete (a usage sketch follows below).
  • Assumes two columns of abstime(!) to track freshness of data
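For reference, the typical wiring looks roughly like this -- a sketch adapted from the contrib-spi documentation shown below; the table, column, and trigger names here are illustrative:

-- illustrative only: two abstime columns plus the contrib timetravel trigger
CREATE EXTENSION timetravel;

CREATE TABLE widget_price (
    widget_id integer        NOT NULL,
    price     numeric(33,16) NOT NULL,
    date_on   abstime,   -- maintained by the trigger: when this row became current
    date_off  abstime    -- 'infinity' while the row is still current
);

-- the trigger arguments are the names of the two abstime columns
CREATE TRIGGER widget_price_timetravel
    BEFORE INSERT OR DELETE OR UPDATE ON widget_price
    FOR EACH ROW
    EXECUTE PROCEDURE timetravel(date_on, date_off);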
In [3]:
HTML('<iframe src="http://www.postgresql.org/docs/current/static/contrib-spi.html" width=1024 height=768></iframe>')
Out[3]:

But

  • Use of the deprecated type abstime means precision only to the second
  • Trigger-based function adds complexity
  • Doesn't use range types
  • No real attention paid to the extension since 2007(!)
In [4]:
HTML('<iframe src="http://postgresql.1045698.n5.nabble.com/template/NamlServlet.jtp?macro=print_post&node=1961716" width=800 height=600></iframe>')
Out[4]:

Current Approaches

  • Oracle has flashback queries:
SELECT * 
  FROM FOO 
  AS OF TIMESTAMP TO_TIMESTAMP('2014-03-29 13:34:12', 'YYYY-MM-DD HH24:MI:SS');

  • Depends on setting of UNDO_RETENTION to determine time period available for flashback -- typically <= 1 day

Can we do this without extensions?

  • Block updates and deletes on the table

  • Timestamp every record insert with clock_timestamp()

  • Updates will have a later date_entered for the same fact; deletes just not allowed

  • Use DISTINCT ON or WINDOW functions to get the record as of a timestamp

  • Have to use custom SQL to access the current state (a DDL sketch follows below)
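
For orientation, a minimal DDL sketch of this setup -- reconstructed to be consistent with the \d price and \sf+ output shown below, since the original CREATE statements are not part of the notebook:

-- sketch: an append-only price table with triggers that block changes
CREATE TABLE price (
    pid          bigserial PRIMARY KEY,
    cid          integer NOT NULL,
    dte          timestamptz NOT NULL,
    settle       numeric(33,16) NOT NULL,
    date_entered timestamptz NOT NULL DEFAULT clock_timestamp(),
    UNIQUE (cid, dte, date_entered)
);

CREATE INDEX price_date_entered ON price (date_entered);
CREATE INDEX price_date_entered_dte ON price (date_entered, dte);

-- any attempted update, delete, or truncate raises an error
CREATE OR REPLACE FUNCTION price_block_update_or_delete_or_truncate()
RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
    RAISE EXCEPTION 'Changes to data are not allowed!';
END;
$$;

CREATE TRIGGER price_before_update_or_delete
    BEFORE DELETE OR UPDATE ON price
    FOR EACH ROW EXECUTE PROCEDURE price_block_update_or_delete_or_truncate();

CREATE TRIGGER price_before_truncate
    BEFORE TRUNCATE ON price
    FOR EACH STATEMENT EXECUTE PROCEDURE price_block_update_or_delete_or_truncate();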

In [10]:
%%script psql -d tmc -U norman
\d price
                                     Table "public.price"
    Column    |           Type           |                      Modifiers                      
--------------+--------------------------+-----------------------------------------------------
 pid          | bigint                   | not null default nextval('price_pid_seq'::regclass)
 cid          | integer                  | not null
 dte          | timestamp with time zone | not null
 settle       | numeric(33,16)           | not null
 date_entered | timestamp with time zone | not null default clock_timestamp()
Indexes:
    "price_pkey" PRIMARY KEY, btree (pid)
    "price_cid_dte_date_entered_key" UNIQUE CONSTRAINT, btree (cid, dte, date_entered)
    "price_date_entered" btree (date_entered)
    "price_date_entered_dte" btree (date_entered, dte)
Triggers:
    price_before_truncate BEFORE TRUNCATE ON price FOR EACH STATEMENT EXECUTE PROCEDURE price_block_update_or_delete_or_truncate()
    price_before_update_or_delete BEFORE DELETE OR UPDATE ON price FOR EACH ROW EXECUTE PROCEDURE price_block_update_or_delete_or_truncate()

In [5]:
%load_ext sql
%sql postgresql://[email protected]/tmc
cid=432
In [12]:
%%script psql -d tmc -U norman
\sf+ price_block_update_or_delete_or_truncate
        CREATE OR REPLACE FUNCTION public.price_block_update_or_delete_or_truncate()
         RETURNS trigger
         LANGUAGE plpgsql
         IMMUTABLE
1       AS $function$ 
2       BEGIN 
3          RAISE EXCEPTION 'Changes to data are not allowed!';
4       END; 
5       $function$
In [17]:
%%script psql -d tmc -U norman
update price 
set settle = 2 
where cid = 432
and dte = '2013-12-02';

delete from price
where cid = 432
and dte = '2013-12-02';

truncate table price;
ERROR:  Changes to data are not allowed!
ERROR:  Changes to data are not allowed!
ERROR:  Changes to data are not allowed!

Importance of insert-only

  • Each change to the price table is guaranteed to be idempotent
  • clock_timestamp() guarantees ordering (even with overlapping connections and long transactions)
  • How do we get meaningful data out of an append-only table if there are multiple rows for a given fact?
In [6]:
%%sql
SELECT min(date_entered),max(date_entered),count(*)
FROM price
WHERE cid = :cid
AND dte = '2013-12-13'
1 rows affected.
Out[6]:
            min            |            max            | count
---------------------------+---------------------------+-------
 2013-12-16 00:47:55+01:00 | 2013-12-31 23:21:17+01:00 |   312
In [6]:
%%sql 
select * 
from price 
where cid = :cid 
and dte = '2013-12-13' 
order by date_entered desc 
limit 10
10 rows affected.
Out[6]:
   pid   | cid |            dte            |       settle        |       date_entered
---------+-----+---------------------------+---------------------+---------------------------
 6324367 | 432 | 2013-12-13 00:00:00+01:00 |  0.8495000000000000 | 2013-12-31 23:21:17+01:00
 2734295 | 432 | 2013-12-13 00:00:00+01:00 |  8.7554000000000000 | 2013-12-31 22:33:42+01:00
 2085648 | 432 | 2013-12-13 00:00:00+01:00 |  3.3659000000000000 | 2013-12-31 20:05:04+01:00
 5170273 | 432 | 2013-12-13 00:00:00+01:00 |  1.2348000000000000 | 2013-12-31 19:51:03+01:00
 5660964 | 432 | 2013-12-13 00:00:00+01:00 | 22.6408000000000000 | 2013-12-31 15:23:51+01:00
 5855569 | 432 | 2013-12-13 00:00:00+01:00 | 19.7054000000000000 | 2013-12-31 15:23:30+01:00
 2508825 | 432 | 2013-12-13 00:00:00+01:00 |  7.3199000000000000 | 2013-12-31 14:17:44+01:00
 6314247 | 432 | 2013-12-13 00:00:00+01:00 |  2.9704000000000000 | 2013-12-31 12:50:36+01:00
 2231736 | 432 | 2013-12-13 00:00:00+01:00 | 17.1793000000000000 | 2013-12-31 12:40:31+01:00
 6931619 | 432 | 2013-12-13 00:00:00+01:00 |  3.0220000000000000 | 2013-12-31 12:29:24+01:00
In [6]:
%%sql 
-- Get price for Dec 13 2013 as of 12:30 pm Dec 31st
select distinct on (cid,dte) cid,dte,date_entered, settle 
from price
where cid = :cid
and dte = '2013-12-13'
and date_entered <= '2013-12-31 12:30:00'
order by 1,2,3 desc
1 rows affected.
Out[6]:
 cid |            dte            |       date_entered        |       settle
-----+---------------------------+---------------------------+---------------------
 432 | 2013-12-13 00:00:00+01:00 | 2013-12-31 12:29:24+01:00 |  3.0220000000000000
In [7]:
%%sql 
-- Get price for Dec 13 2013 as of 12:45 pm Dec 31st
select distinct on (cid,dte) cid,dte,date_entered, settle 
from price
where cid = :cid
and dte = '2013-12-13'
and date_entered <= '2013-12-31 12:45:00'
order by 1,2,3 desc
1 rows affected.
Out[7]:
 cid |            dte            |       date_entered        |       settle
-----+---------------------------+---------------------------+---------------------
 432 | 2013-12-13 00:00:00+01:00 | 2013-12-31 12:40:31+01:00 | 17.1793000000000000
In [35]:
%%sql
-- or we can use windowing example
select distinct cid,dte, 
first_value(date_entered) over date_window as date_entered,
first_value(settle) over date_window as settle
FROM price
where cid = :cid
and dte = '2013-12-13'
and date_entered <= '2013-12-31 12:45:00'
window date_window as
   (partition by cid, dte
   order by cid, dte, date_entered desc
   ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
order by 1,2,3 desc;
1 rows affected.
Out[35]:
 cid |            dte            |       date_entered        |       settle
-----+---------------------------+---------------------------+---------------------
 432 | 2013-12-13 00:00:00+01:00 | 2013-12-31 12:40:31+01:00 | 17.1793000000000000

Materialized views

  • In certain cases, rewriting data using range types and btree-gist indexes may improve discovery queries (a DDL sketch follows below)
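
A DDL sketch of such a table, reconstructed to match the \d price_range output below; the btree_gist extension is what allows the exclusion constraint to mix plain equality with range overlap:

-- sketch: range-typed copy of the price history
CREATE EXTENSION btree_gist;

CREATE TABLE price_range (
    cid        integer NOT NULL,
    dte        timestamptz NOT NULL,
    settle     numeric(33,16) NOT NULL,
    date_range tstzrange,
    -- at most one (non-overlapping) validity range per (cid, dte)
    EXCLUDE USING gist (cid WITH =, dte WITH =, date_range WITH &&)
);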
In [37]:
%%script psql -d tmc -U norman
-- create extension btree_gist ;
-- need this to mix btree with gist 
\d price_range
            Table "public.price_range"
   Column   |           Type           | Modifiers 
------------+--------------------------+-----------
 cid        | integer                  | not null
 dte        | timestamp with time zone | not null
 settle     | numeric(33,16)           | not null
 date_range | tstzrange                | 
Indexes:
    "price_range_cid_dte_date_range_excl" EXCLUDE USING gist (cid WITH =, dte WITH =, date_range WITH &&)

In [8]:
%%script psql -d tmc -U norman
-- insert into price_range
-- (cid,dte,settle,date_range)
-- select cid,dte,settle,
--    tstzrange(date_entered,lead(date_entered) 
--     over (partition by cid,dte order by cid,dte,date_entered)
--     ) as date_range
-- from price
-- order by cid,dte,date_entered;
select * 
from price_range 
where cid = 432
and dte = '2013-12-13' 
order by date_range desc limit 10;
 cid |          dte           |       settle        |                     date_range                      
-----+------------------------+---------------------+-----------------------------------------------------
 432 | 2013-12-13 00:00:00+01 |  0.8495000000000000 | ["2013-12-31 23:21:17+01",)
 432 | 2013-12-13 00:00:00+01 |  8.7554000000000000 | ["2013-12-31 22:33:42+01","2013-12-31 23:21:17+01")
 432 | 2013-12-13 00:00:00+01 |  3.3659000000000000 | ["2013-12-31 20:05:04+01","2013-12-31 22:33:42+01")
 432 | 2013-12-13 00:00:00+01 |  1.2348000000000000 | ["2013-12-31 19:51:03+01","2013-12-31 20:05:04+01")
 432 | 2013-12-13 00:00:00+01 | 22.6408000000000000 | ["2013-12-31 15:23:51+01","2013-12-31 19:51:03+01")
 432 | 2013-12-13 00:00:00+01 | 19.7054000000000000 | ["2013-12-31 15:23:30+01","2013-12-31 15:23:51+01")
 432 | 2013-12-13 00:00:00+01 |  7.3199000000000000 | ["2013-12-31 14:17:44+01","2013-12-31 15:23:30+01")
 432 | 2013-12-13 00:00:00+01 |  2.9704000000000000 | ["2013-12-31 12:50:36+01","2013-12-31 14:17:44+01")
 432 | 2013-12-13 00:00:00+01 | 17.1793000000000000 | ["2013-12-31 12:40:31+01","2013-12-31 12:50:36+01")
 432 | 2013-12-13 00:00:00+01 |  3.0220000000000000 | ["2013-12-31 12:29:24+01","2013-12-31 12:40:31+01")
(10 rows)

In [9]:
%%sql
select cid,dte, round(settle,2) as settle
from price_range 
where cid = :cid 
and dte between '2010-12-01' and '2010-12-13' 
-- now is within date_range
and current_timestamp <@ date_range
order by 2
9 rows affected.
Out[9]:
 cid |            dte            | settle
-----+---------------------------+--------
 432 | 2010-12-01 00:00:00+01:00 |   0.16
 432 | 2010-12-02 00:00:00+01:00 |  10.68
 432 | 2010-12-03 00:00:00+01:00 |   5.80
 432 | 2010-12-06 00:00:00+01:00 |  17.57
 432 | 2010-12-07 00:00:00+01:00 |  13.77
 432 | 2010-12-08 00:00:00+01:00 |  21.97
 432 | 2010-12-09 00:00:00+01:00 |  13.96
 432 | 2010-12-10 00:00:00+01:00 |   3.86
 432 | 2010-12-13 00:00:00+01:00 |   3.82

Storage needs

From a recent blog post by Robert Hodges on immutable data (http://scale-out-blog.blogspot.com/2014/02/why-arent-all-data-immutable.html) -- storage needs are not terrible (with certain assumptions, of course):

 Metric                  |      Value
-------------------------+------------
 Xacts/Sec               |      1,000
 Bytes/Xact              |      1,000
 Bytes/Sec               |  1,000,000
 GB Generated in 1 Hour  |       3.35
 GB Generated in 1 Day   |      80.47
 GB Generated in 1 Month |   2,447.52
 GB Generated in 1 Year  |  29,370.19
 GB Generated in 7 Years | 205,591.32
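
Those figures can be reproduced with a quick back-of-the-envelope query (not from the blog post itself; it assumes a 365-day year, month = year/12, and 1024^3 bytes per GB):

-- 1,000 xacts/sec * 1,000 bytes/xact = 1,000,000 bytes/sec
SELECT round((1e6 * 3600             / 2^30)::numeric, 2) AS gb_per_hour,    --      3.35
       round((1e6 * 86400            / 2^30)::numeric, 2) AS gb_per_day,     --     80.47
       round((1e6 * 86400 * 365 / 12 / 2^30)::numeric, 2) AS gb_per_month,   --  2,447.52
       round((1e6 * 86400 * 365      / 2^30)::numeric, 2) AS gb_per_year,    -- 29,370.19
       round((1e6 * 86400 * 365 * 7  / 2^30)::numeric, 2) AS gb_per_7_years; -- 205,591.32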

Prioritize Fresher Data

In [3]:
HTML('<iframe src="http://arstechnica.com/information-technology/2014/01/why-facebook-thinks-blu-ray-discs-are-perfect-for-the-data-center/" width=1024 height=768></iframe>')
Out[3]:

Prioritize fresher data

  • Most immutable-only data needs to privilege the most recent data
  • Partition data using time-based partitioning
  • Needs management
    • e.g., using the pg_partman extension
In [11]:
%%script psql -d tmc -U norman
-- create schema partman;
-- create extension pg_partman schema partman;
-- set search_path to public,partman;
-- create table price_parent(like price including constraints including defaults including indexes)

-- create partition based index please...
-- create index price_parent_dte on price_parent(dte);


-- select partman.create_parent(p_parent_table:='public.price_parent', p_control:='dte', 
--       p_type:='time-dynamic',p_interval:='yearly',p_start_partition:='2009-01-01 00:00:00');
\sf+ price_parent_part_trig_func
        CREATE OR REPLACE FUNCTION public.price_parent_part_trig_func()
         RETURNS trigger
         LANGUAGE plpgsql
1       AS $function$ 
2               DECLARE
3                   v_count                 int;
4                   v_partition_name        text;
5                   v_partition_timestamp   timestamptz;
6               BEGIN 
7               IF TG_OP = 'INSERT' THEN 
8                   v_partition_timestamp := date_trunc('year', NEW.dte);
9                   v_partition_name := partman.check_name_length('price_parent', 'public', to_char(v_partition_timestamp, 'YYYY'), TRUE);
10                  SELECT count(*) INTO v_count FROM pg_tables WHERE schemaname ||'.'|| tablename = v_partition_name;
11                  IF v_count > 0 THEN 
12                      EXECUTE 'INSERT INTO '||v_partition_name||' VALUES($1.*)' USING NEW;
13                  ELSE
14                      RETURN NEW;
15                  END IF;
16              END IF;
17              
18              RETURN NULL; 
19              END $function$
In [10]:
%%script psql -d tmc -U norman
\d price_parent_p2009
                               Table "public.price_parent_p2009"
    Column    |           Type           |                      Modifiers                      
--------------+--------------------------+-----------------------------------------------------
 pid          | bigint                   | not null default nextval('price_pid_seq'::regclass)
 cid          | integer                  | not null
 dte          | timestamp with time zone | not null
 settle       | numeric(33,16)           | not null
 date_entered | timestamp with time zone | not null default clock_timestamp()
Indexes:
    "price_parent_p2009_pkey" PRIMARY KEY, btree (pid)
    "price_parent_p2009_cid_dte_date_entered_key" UNIQUE CONSTRAINT, btree (cid, dte, date_entered)
    "price_parent_p2009_date_entered_dte_idx" btree (date_entered, dte)
    "price_parent_p2009_date_entered_idx" btree (date_entered)
    "price_parent_p2009_dte_idx" btree (dte)
Check constraints:
    "price_parent_p2009_partition_check" CHECK (dte >= '2009-01-01 00:00:00+01'::timestamp with time zone AND dte < '2010-01-01 00:00:00+01'::timestamp with time zone)
Inherits: price_parent

In [11]:
%%sql
explain analyze
select distinct on (cid, dte) cid, dte, date_entered,settle
FROM price_parent
where cid = :cid
and dte between '2014-02-01' and '2014-03-30'
and date_entered < current_timestamp
ORDER BY 1,2,3 desc
12 rows affected.
Out[11]:
QUERY PLAN
 Unique  (cost=70403.62..72585.61 rows=13680 width=26) (actual time=491.344..611.294 rows=40 loops=1)
   ->  Sort  (cost=70403.62..71130.95 rows=290932 width=26) (actual time=491.342..575.253 rows=283067 loops=1)
         Sort Key: price_parent.dte, price_parent.date_entered
         Sort Method: external merge  Disk: 11328kB
         ->  Append  (cost=0.00..30078.08 rows=290932 width=26) (actual time=68.993..162.068 rows=283067 loops=1)
               ->  Seq Scan on price_parent  (cost=0.00..0.00 rows=1 width=46) (actual time=0.001..0.001 rows=0 loops=1)
                     Filter: ((dte >= '2014-02-01 00:00:00+01'::timestamp with time zone) AND (dte <= '2014-03-30 00:00:00+01'::timestamp with time zone) AND (cid = 432) AND (date_entered < now()))
               ->  Bitmap Heap Scan on price_parent_p2014  (cost=11961.13..30078.08 rows=290931 width=26) (actual time=68.992..130.419 rows=283067 loops=1)
                     Recheck Cond: ((cid = 432) AND (dte >= '2014-02-01 00:00:00+01'::timestamp with time zone) AND (dte <= '2014-03-30 00:00:00+01'::timestamp with time zone) AND (date_entered < now()))
                     ->  Bitmap Index Scan on price_parent_p2014_cid_dte_date_entered_key  (cost=0.00..11888.39 rows=290931 width=0) (actual time=68.242..68.242 rows=283067 loops=1)
                           Index Cond: ((cid = 432) AND (dte >= '2014-02-01 00:00:00+01'::timestamp with time zone) AND (dte <= '2014-03-30 00:00:00+01'::timestamp with time zone) AND (date_entered < now()))
 Total runtime: 657.469 ms

Additional improvements

  • Using tablespaces (SSD for newer data, hard drives for older data) -- a sketch follows below
  • Heterogeneous database engines (NoSQL for very fresh data)
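
A minimal sketch of the tablespace idea, assuming the yearly partitions shown above; the tablespace name and path here are illustrative:

-- illustrative only: move an old yearly partition (and its indexes) to slower storage
-- note: CREATE TABLESPACE needs superuser and an existing, empty directory;
--       SET TABLESPACE rewrites the relation under an ACCESS EXCLUSIVE lock
CREATE TABLESPACE archive_hdd LOCATION '/mnt/hdd/pgdata';

ALTER TABLE price_parent_p2009 SET TABLESPACE archive_hdd;
ALTER INDEX price_parent_p2009_pkey SET TABLESPACE archive_hdd;
ALTER INDEX price_parent_p2009_dte_idx SET TABLESPACE archive_hdd;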

Mixing Postgresql with NoSQL/Hadoop

  • Foreign Data Wrappers available for Redis, Memcached, MongoDB, Hadoop

    • Mismatch between NoSQL key/value storage and relational database schemas means data must be transformed before the two datasets can be merged
    • All of these FDWs (AFAIK) are currently read-only except the Hadoop FDW, so applications will have to manage moving data from one store to another
  • Any time there is more than one master database, conflicts can occur

    • Usually, conflicts here can be resolved by sorting all entries in time order, assuming that the roles of the data sources are well understood (e.g., the NoSQL data store is responsible for all of today's data; Postgres for all data < today)
  • Many points of possible failure in any distributed system

Conflict resolution with multi-master writes

  • CAP Theorem (Consistency, Availability, Partition Tolerance) = today's Fast, Good and Cheap

Very difficult problem

In each case, the system did something… odd. Maybe we hadn't fully thought through the consequences of the system, even if they were documented. Maybe the marketing or documentation were misleading, or flat-out lies. We saw design flaws, like the Redis Sentinel protocol. Some involved bugs, like MongoDB's WriteConcern.MAJORITY treating network errors as successful acknowledgements. Other times we uncovered operational caveats, like Riak's high latencies before setting up fallback vnodes. In each case, the unexpected behavior led to surprising new information about the challenge of building correct distributed systems.

  • Clock timestamps not reliable across networks.
    • Use vector clock (https://en.wikipedia.org/wiki/Vector_clocks)

      Initially all clocks are zero. Each time a process experiences an internal event, it increments its own logical clock in the vector by one. Each time a process prepares to send a message, it increments its own logical clock in the vector by one and then sends its entire vector along with the message being sent. Each time a process receives a message, it increments its own logical clock in the vector by one and updates each element in its vector by taking the maximum of the value in its own vector clock and the value in the vector in the received message (for every element).

  • CRDT (Convergent and Commutative Replicated Data Types)

(http://hal.inria.fr/docs/00/55/55/88/PDF/techreport.pdf)

Postgres-only cluster

  • Possibly simpler solution for distributed immutable table

  • Possibility of having foreign tables be child tables in 9.4 (9.5?)

In [13]:
HTML('<iframe src="http://postgresql.1045698.n5.nabble.com/template/NamlServlet.jtp?macro=print_post&node=5778285" width=800 height=600></iframe>')
Out[13]:

Sample foreign inheritance support syntax -- still being refined:

Adapted from a recent post by Kyotaro Horiguchi on the pgsql-hackers mailing list

http://postgresql.1045698.n5.nabble.com/inherit-support-for-foreign-tables-td5778285i60.html

  • on child database
    create table pu1 (a int not null, b int not null, c int, d text); 
    create unique index i_pu1_ab on pu1 (a, b); 
    create unique index i_pu1_c  on pu1 (c); 
    create table cu11 (like pu1 including all) inherits (pu1); 
    create table cu12 (like pu1 including all) inherits (pu1); 
    insert into cu11 (select a / 5, 4 - (a % 5), a, 'cu11' from generate_series(000000, 099999) a); 
    insert into cu12 (select a / 5, 4 - (a % 5), a, 'cu12' from generate_series(100000, 199999) a);
  • on parent database
    create extension postgres_fdw; 
    create server pg2 foreign data wrapper postgres_fdw options (host '/tmp', port '5433', dbname 'postgres'); 
    create user mapping for current_user server pg2 options (user 'horiguti');   
    create foreign table _cu11 (a int, b int, c int, d text) server pg2 
       options (table_name 'cu11', use_remote_estimate 'true'); 
    create foreign table _cu12 (a int, b int, c int, d text) server pg2 
       options (table_name 'cu12', use_remote_estimate 'true'); 
    create table rpu1 (a int, b int, c int, d text); 
    alter foreign table _cu11 inherit rpu1; 
    alter foreign table _cu12 inherit rpu1; 
    analyze rpu1;

Questions?

This talk is available at http://nbviewer.ipython.org/gist/nyamada/9946705