Large data sets (prices, user preferences, financial transactions) where the volume of inserts is very high
Let's say we want to track changes for rapidly changing data --
Let's say we care about the price of a widget at 9 am. We receive price information. If we calculate a signal on the price between 9:00 and 9:15, we use that price as a fact. Then at 9:20, we receive a price correction: the 9 am widget price has changed. From now on we want to see the corrected price. But if we replay signals as of 9:10 am, we want to see the old price.
If we are syncing to a heterogeneous datastore and the sync server crashes at 9:20 am, we want to be able to easily discover whether the updated price has been written
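The replay requirement above can be sketched in a few lines of Python (the timestamps and prices here are invented for illustration): never overwrite a fact, append a new version with its own entry time, and let "as of" queries filter on that entry time.

```python
from datetime import datetime

# Append-only history of the 9 am widget price: (entry_time, price).
# A correction is just another row with a later entry time.
history = [
    (datetime(2014, 3, 31, 9, 0), 100.0),   # original 9 am price
    (datetime(2014, 3, 31, 9, 20), 101.5),  # correction received at 9:20
]

def price_as_of(history, as_of):
    """Latest price whose entry time is <= as_of, or None if unknown then."""
    known = [(t, p) for t, p in history if t <= as_of]
    return max(known)[1] if known else None

print(price_as_of(history, datetime(2014, 3, 31, 9, 10)))  # 100.0 (replay)
print(price_as_of(history, datetime(2014, 3, 31, 9, 30)))  # 101.5 (corrected)
```

The SQL examples later in the talk implement exactly this lookup with `DISTINCT ON` and window functions.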
Apache Kafka (http://kafka.apache.org/)
Distributed messaging system
Is it possible to delete a topic? In the current version, 0.8.0, no. (You could clear the entire Kafka and ZooKeeper state to delete all topics and data.) But upcoming releases are expected to include a delete-topic tool.
Apache Zookeeper (http://zookeeper.apache.org/)
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications.
Datomic (Rich Hickey) -- (http://www.datomic.com)
from IPython.display import HTML
HTML('<iframe src="http://www.datomic.com" width=1024 height=768></iframe>')
HTML('<iframe src="http://www.postgresql.org/docs/6.3/static/c0503.htm" width=1024 height=768></iframe>')
http://www.postgresql.org/docs/current/static/contrib-spi.html
HTML('<iframe src="http://www.postgresql.org/docs/current/static/contrib-spi.html" width=1024 height=768></iframe>')
HTML('<iframe src="http://postgresql.1045698.n5.nabble.com/template/NamlServlet.jtp?macro=print_post&node=1961716" width=800 height=600></iframe>')
Oracle has flashback queries:
SELECT * FROM FOO AS OF TIMESTAMP TO_TIMESTAMP('2014-03-29 13:34:12', 'YYYY-MM-DD HH24:MI:SS');
Depends on the UNDO_RETENTION setting, which determines the time window available for flashback -- typically <= 1 day
Block updates and deletes from table
Timestamp every record insert with clock_timestamp
Updates are just new inserts of the same fact with a later date_entered; deletes simply are not allowed
Use DISTINCT ON or WINDOW functions to get record as of timestamp
Custom SQL is needed to access the current state
%%script psql -d tmc -U norman
\d price
Table "public.price"
    Column    |           Type           |                      Modifiers
--------------+--------------------------+-----------------------------------------------------
 pid          | bigint                   | not null default nextval('price_pid_seq'::regclass)
 cid          | integer                  | not null
 dte          | timestamp with time zone | not null
 settle       | numeric(33,16)           | not null
 date_entered | timestamp with time zone | not null default clock_timestamp()
Indexes:
    "price_pkey" PRIMARY KEY, btree (pid)
    "price_cid_dte_date_entered_key" UNIQUE CONSTRAINT, btree (cid, dte, date_entered)
    "price_date_entered" btree (date_entered)
    "price_date_entered_dte" btree (date_entered, dte)
Triggers:
    price_before_truncate BEFORE TRUNCATE ON price FOR EACH STATEMENT EXECUTE PROCEDURE price_block_update_or_delete_or_truncate()
    price_before_update_or_delete BEFORE DELETE OR UPDATE ON price FOR EACH ROW EXECUTE PROCEDURE price_block_update_or_delete_or_truncate()
%load_ext sql
%sql postgresql://norman@localhost/tmc
cid=432
%%script psql -d tmc -U norman
\sf+ price_block_update_or_delete_or_truncate
CREATE OR REPLACE FUNCTION public.price_block_update_or_delete_or_truncate()
 RETURNS trigger
 LANGUAGE plpgsql
 IMMUTABLE
AS $function$
BEGIN
    RAISE EXCEPTION 'Changes to data are not allowed!';
END;
$function$
%%script psql -d tmc -U norman
update price
set settle = 2
where cid = 432
and dte = '2013-12-02';
delete from price
where cid = 432
and dte = '2013-12-02';
truncate table price;
ERROR:  Changes to data are not allowed!
ERROR:  Changes to data are not allowed!
ERROR:  Changes to data are not allowed!
%%sql
SELECT min(date_entered),max(date_entered),count(*)
FROM price
WHERE cid = :cid
AND dte = '2013-12-13'
1 rows affected.
min | max | count |
---|---|---|
2013-12-16 00:47:55+01:00 | 2013-12-31 23:21:17+01:00 | 312 |
%%sql
select *
from price
where cid = :cid
and dte = '2013-12-13'
order by date_entered desc
limit 10
10 rows affected.
pid | cid | dte | settle | date_entered |
---|---|---|---|---|
6324367 | 432 | 2013-12-13 00:00:00+01:00 | 0.8495000000000000 | 2013-12-31 23:21:17+01:00 |
2734295 | 432 | 2013-12-13 00:00:00+01:00 | 8.7554000000000000 | 2013-12-31 22:33:42+01:00 |
2085648 | 432 | 2013-12-13 00:00:00+01:00 | 3.3659000000000000 | 2013-12-31 20:05:04+01:00 |
5170273 | 432 | 2013-12-13 00:00:00+01:00 | 1.2348000000000000 | 2013-12-31 19:51:03+01:00 |
5660964 | 432 | 2013-12-13 00:00:00+01:00 | 22.6408000000000000 | 2013-12-31 15:23:51+01:00 |
5855569 | 432 | 2013-12-13 00:00:00+01:00 | 19.7054000000000000 | 2013-12-31 15:23:30+01:00 |
2508825 | 432 | 2013-12-13 00:00:00+01:00 | 7.3199000000000000 | 2013-12-31 14:17:44+01:00 |
6314247 | 432 | 2013-12-13 00:00:00+01:00 | 2.9704000000000000 | 2013-12-31 12:50:36+01:00 |
2231736 | 432 | 2013-12-13 00:00:00+01:00 | 17.1793000000000000 | 2013-12-31 12:40:31+01:00 |
6931619 | 432 | 2013-12-13 00:00:00+01:00 | 3.0220000000000000 | 2013-12-31 12:29:24+01:00 |
%%sql
-- Get price for Dec 13 2013 as of 12:30 pm Dec 31st
select distinct on (cid,dte) cid,dte,date_entered, settle
from price
where cid = :cid
and dte = '2013-12-13'
and date_entered <= '2013-12-31 12:30:00'
order by 1,2,3 desc
1 rows affected.
cid | dte | date_entered | settle |
---|---|---|---|
432 | 2013-12-13 00:00:00+01:00 | 2013-12-31 12:29:24+01:00 | 3.0220000000000000 |
%%sql
-- Get price for Dec 13 2013 as of 12:45 pm Dec 31st
select distinct on (cid,dte) cid,dte,date_entered, settle
from price
where cid = :cid
and dte = '2013-12-13'
and date_entered <= '2013-12-31 12:45:00'
order by 1,2,3 desc
1 rows affected.
cid | dte | date_entered | settle |
---|---|---|---|
432 | 2013-12-13 00:00:00+01:00 | 2013-12-31 12:40:31+01:00 | 17.1793000000000000 |
%%sql
-- or we can use a window function
select distinct cid,dte,
first_value(date_entered) over date_window as date_entered,
first_value(settle) over date_window as settle
FROM price
where cid = :cid
and dte = '2013-12-13'
and date_entered <= '2013-12-31 12:45:00'
window date_window as
(partition by cid, dte
order by cid, dte, date_entered desc
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
order by 1,2,3 desc;
1 rows affected.
cid | dte | date_entered | settle |
---|---|---|---|
432 | 2013-12-13 00:00:00+01:00 | 2013-12-31 12:40:31+01:00 | 17.1793000000000000 |
%%script psql -d tmc -U norman
-- create extension btree_gist ;
-- need this to mix btree with gist
\d price_range
Table "public.price_range"
   Column   |           Type           | Modifiers
------------+--------------------------+-----------
 cid        | integer                  | not null
 dte        | timestamp with time zone | not null
 settle     | numeric(33,16)           | not null
 date_range | tstzrange                |
Indexes:
    "price_range_cid_dte_date_range_excl" EXCLUDE USING gist (cid WITH =, dte WITH =, date_range WITH &&)
%%script psql -d tmc -U norman
-- insert into price_range
-- (cid,dte,settle,date_range)
-- select cid,dte,settle,
-- tstzrange(date_entered,lead(date_entered)
-- over (partition by cid,dte order by cid,dte,date_entered)
-- ) as date_range
-- from price
-- order by cid,dte,date_entered;
select *
from price_range
where cid = 432
and dte = '2013-12-13'
order by date_range desc limit 10;
 cid |          dte           |       settle        |                     date_range
-----+------------------------+---------------------+-----------------------------------------------------
 432 | 2013-12-13 00:00:00+01 |  0.8495000000000000 | ["2013-12-31 23:21:17+01",)
 432 | 2013-12-13 00:00:00+01 |  8.7554000000000000 | ["2013-12-31 22:33:42+01","2013-12-31 23:21:17+01")
 432 | 2013-12-13 00:00:00+01 |  3.3659000000000000 | ["2013-12-31 20:05:04+01","2013-12-31 22:33:42+01")
 432 | 2013-12-13 00:00:00+01 |  1.2348000000000000 | ["2013-12-31 19:51:03+01","2013-12-31 20:05:04+01")
 432 | 2013-12-13 00:00:00+01 | 22.6408000000000000 | ["2013-12-31 15:23:51+01","2013-12-31 19:51:03+01")
 432 | 2013-12-13 00:00:00+01 | 19.7054000000000000 | ["2013-12-31 15:23:30+01","2013-12-31 15:23:51+01")
 432 | 2013-12-13 00:00:00+01 |  7.3199000000000000 | ["2013-12-31 14:17:44+01","2013-12-31 15:23:30+01")
 432 | 2013-12-13 00:00:00+01 |  2.9704000000000000 | ["2013-12-31 12:50:36+01","2013-12-31 14:17:44+01")
 432 | 2013-12-13 00:00:00+01 | 17.1793000000000000 | ["2013-12-31 12:40:31+01","2013-12-31 12:50:36+01")
 432 | 2013-12-13 00:00:00+01 |  3.0220000000000000 | ["2013-12-31 12:29:24+01","2013-12-31 12:40:31+01")
(10 rows)
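The commented-out INSERT above uses `lead()` to pair each date_entered with the next one for the same (cid, dte). The same transformation can be sketched in Python (sample timestamps invented for illustration):

```python
def to_ranges(entry_times):
    """Turn sorted entry times into half-open [start, end) ranges;
    the most recent version gets an open-ended range (end = None),
    mirroring tstzrange(date_entered, lead(date_entered) over ...)."""
    entry_times = sorted(entry_times)
    return [
        (start, entry_times[i + 1] if i + 1 < len(entry_times) else None)
        for i, start in enumerate(entry_times)
    ]

print(to_ranges(["12:29", "12:40", "12:50"]))
# [('12:29', '12:40'), ('12:40', '12:50'), ('12:50', None)]
```

Because each range is half-open and the latest one is unbounded, any point in time falls in exactly one range per fact, which is what the exclusion constraint on price_range enforces.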
%%sql
select cid,dte, round(settle,2) as settle
from price_range
where cid = :cid
and dte between '2010-12-01' and '2010-12-13'
-- now is within date_range
and current_timestamp <@ date_range
order by 2
9 rows affected.
cid | dte | settle |
---|---|---|
432 | 2010-12-01 00:00:00+01:00 | 0.16 |
432 | 2010-12-02 00:00:00+01:00 | 10.68 |
432 | 2010-12-03 00:00:00+01:00 | 5.80 |
432 | 2010-12-06 00:00:00+01:00 | 17.57 |
432 | 2010-12-07 00:00:00+01:00 | 13.77 |
432 | 2010-12-08 00:00:00+01:00 | 21.97 |
432 | 2010-12-09 00:00:00+01:00 | 13.96 |
432 | 2010-12-10 00:00:00+01:00 | 3.86 |
432 | 2010-12-13 00:00:00+01:00 | 3.82 |
From a recent blog post by Robert Hodges on immutable data (http://scale-out-blog.blogspot.com/2014/02/why-arent-all-data-immutable.html) -- storage needs are not terrible (given certain assumptions, of course)
Xacts/Sec | Bytes/Xact | Bytes/Sec | GB Generated in 1 Hour | GB Generated in 1 Day | GB Generated in 1 Month | GB Generated in 1 Year | GB Generated in 7 Years |
---|---|---|---|---|---|---|---|
1,000 | 1,000 | 1,000,000 | 3.35 | 80.47 | 2,447.52 | 29,370.19 | 205,591.32 |
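The numbers in that row can be reproduced directly (using 1 GB = 2^30 bytes and a 365-day year, which appear to be the assumptions behind the table):

```python
GB = 2 ** 30                    # table uses binary gigabytes
bytes_per_sec = 1000 * 1000     # 1,000 xacts/sec * 1,000 bytes/xact

gb_hour = bytes_per_sec * 3600 / float(GB)
gb_day = gb_hour * 24
gb_year = gb_day * 365          # 365-day year
gb_month = gb_year / 12.0
gb_7yr = gb_year * 7

for label, v in [("hour", gb_hour), ("day", gb_day), ("month", gb_month),
                 ("year", gb_year), ("7 years", gb_7yr)]:
    print("%-8s %12.2f GB" % (label, v))
```

This reproduces 3.35, 80.47, 2,447.52, 29,370.19, and 205,591.32 GB, matching the table.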
HTML('<iframe src="http://arstechnica.com/information-technology/2014/01/why-facebook-thinks-blu-ray-discs-are-perfect-for-the-data-center/" width=1024 height=768></iframe>')
%%script psql -d tmc -U norman
-- create schema partman;
-- create extension pg_partman schema partman;
-- set search_path to public,partman;
-- create table price_parent(like price including constraints including defaults including indexes)
-- create partition based index please...
-- create index price_parent_dte on price_parent(dte);
-- select partman.create_parent(p_parent_table:='public.price_parent', p_control:='dte',
-- p_type:='time-dynamic',p_interval:='yearly',p_start_partition:='2009-01-01 00:00:00');
\sf+ price_parent_part_trig_func
CREATE OR REPLACE FUNCTION public.price_parent_part_trig_func()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
DECLARE
    v_count int;
    v_partition_name text;
    v_partition_timestamp timestamptz;
BEGIN
    IF TG_OP = 'INSERT' THEN
        v_partition_timestamp := date_trunc('year', NEW.dte);
        v_partition_name := partman.check_name_length('price_parent', 'public', to_char(v_partition_timestamp, 'YYYY'), TRUE);
        SELECT count(*) INTO v_count FROM pg_tables WHERE schemaname ||'.'|| tablename = v_partition_name;
        IF v_count > 0 THEN
            EXECUTE 'INSERT INTO '||v_partition_name||' VALUES($1.*)' USING NEW;
        ELSE
            RETURN NEW;
        END IF;
    END IF;

    RETURN NULL;
END
$function$
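The trigger's routing logic -- truncate the row's dte to the year, build a partition name, insert there if the partition exists, otherwise fall back to the parent -- can be sketched in Python (the naming convention here just mimics pg_partman's `<parent>_pYYYY` pattern):

```python
def route(row, existing_partitions, parent="price_parent"):
    """Pick the target table for an inserted row, mimicking the trigger:
    yearly partitions named <parent>_p<YYYY>, parent table as fallback."""
    year = row["dte"][:4]                       # like date_trunc('year', NEW.dte)
    partition = "%s_p%s" % (parent, year)
    return partition if partition in existing_partitions else parent

partitions = {"price_parent_p2013", "price_parent_p2014"}
print(route({"dte": "2014-02-15"}, partitions))  # price_parent_p2014
print(route({"dte": "2008-06-01"}, partitions))  # price_parent (fallback)
```

The fallback matters: a row whose partition has not been created yet lands in the parent table rather than being lost.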
%%script psql -d tmc -U norman
\d price_parent_p2009
Table "public.price_parent_p2009"
    Column    |           Type           |                      Modifiers
--------------+--------------------------+-----------------------------------------------------
 pid          | bigint                   | not null default nextval('price_pid_seq'::regclass)
 cid          | integer                  | not null
 dte          | timestamp with time zone | not null
 settle       | numeric(33,16)           | not null
 date_entered | timestamp with time zone | not null default clock_timestamp()
Indexes:
    "price_parent_p2009_pkey" PRIMARY KEY, btree (pid)
    "price_parent_p2009_cid_dte_date_entered_key" UNIQUE CONSTRAINT, btree (cid, dte, date_entered)
    "price_parent_p2009_date_entered_dte_idx" btree (date_entered, dte)
    "price_parent_p2009_date_entered_idx" btree (date_entered)
    "price_parent_p2009_dte_idx" btree (dte)
Check constraints:
    "price_parent_p2009_partition_check" CHECK (dte >= '2009-01-01 00:00:00+01'::timestamp with time zone AND dte < '2010-01-01 00:00:00+01'::timestamp with time zone)
Inherits: price_parent
%%sql
explain analyze
select distinct on (cid, dte) cid, dte, date_entered,settle
FROM price_parent
where cid = :cid
and dte between '2014-02-01' and '2014-03-30'
and date_entered < current_timestamp
ORDER BY 1,2,3 desc
12 rows affected.
QUERY PLAN |
---|
Unique (cost=70403.62..72585.61 rows=13680 width=26) (actual time=491.344..611.294 rows=40 loops=1) |
-> Sort (cost=70403.62..71130.95 rows=290932 width=26) (actual time=491.342..575.253 rows=283067 loops=1) |
Sort Key: price_parent.dte, price_parent.date_entered |
Sort Method: external merge Disk: 11328kB |
-> Append (cost=0.00..30078.08 rows=290932 width=26) (actual time=68.993..162.068 rows=283067 loops=1) |
-> Seq Scan on price_parent (cost=0.00..0.00 rows=1 width=46) (actual time=0.001..0.001 rows=0 loops=1) |
Filter: ((dte >= '2014-02-01 00:00:00+01'::timestamp with time zone) AND (dte <= '2014-03-30 00:00:00+01'::timestamp with time zone) AND (cid = 432) AND (date_entered < now())) |
-> Bitmap Heap Scan on price_parent_p2014 (cost=11961.13..30078.08 rows=290931 width=26) (actual time=68.992..130.419 rows=283067 loops=1) |
Recheck Cond: ((cid = 432) AND (dte >= '2014-02-01 00:00:00+01'::timestamp with time zone) AND (dte <= '2014-03-30 00:00:00+01'::timestamp with time zone) AND (date_entered < now())) |
-> Bitmap Index Scan on price_parent_p2014_cid_dte_date_entered_key (cost=0.00..11888.39 rows=290931 width=0) (actual time=68.242..68.242 rows=283067 loops=1) |
Index Cond: ((cid = 432) AND (dte >= '2014-02-01 00:00:00+01'::timestamp with time zone) AND (dte <= '2014-03-30 00:00:00+01'::timestamp with time zone) AND (date_entered < now())) |
Total runtime: 657.469 ms |
Foreign Data Wrappers are available for Redis, Memcached, MongoDB, Hadoop, and others
Any time there is more than one master database, conflicts can occur
Many points of possible failure in any distributed system
CAP Theorem (Consistency, Availability, Partition tolerance) -- today's version of "Fast, Good, and Cheap: pick two"
Examples of complex quorum-based consensus protocols used to resolve conflicts:
Paxos (Leslie Lamport--also creator of LaTeX) (1989)
Raft (Diego Ongaro and John Ousterhout -- also creator of Tcl) (2013)
Zab (Zookeeper) -- similar to Raft, assumes a designated leader
https://github.com/aphyr/jepsen is a project that simulates network failures in a distributed cluster using LXC and iptables.
In every one of the distributed databases tested, it was possible to lose data unpredictably during a network failure.
From http://aphyr.com/posts/286-call-me-maybe-final-thoughts:
In each case, the system did something… odd. Maybe we hadn't fully thought through the consequences of the system, even if they were documented. Maybe the marketing or documentation were misleading, or flat-out lies. We saw design flaws, like the Redis Sentinel protocol. Some involved bugs, like MongoDB's WriteConcern.MAJORITY treating network errors as successful acknowledgements. Other times we uncovered operational caveats, like Riak's high latencies before setting up fallback vnodes. In each case, the unexpected behavior led to surprising new information about the challenge of building correct distributed systems.
Initially all clocks are zero.
Each time a process experiences an internal event, it increments its own logical clock in the vector by one.
Each time a process prepares to send a message, it increments its own logical clock in the vector by one and then sends its entire vector along with the message.
Each time a process receives a message, it increments its own logical clock in the vector by one and updates each element in its vector to the maximum of the value in its own vector clock and the value in the received vector.
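A minimal vector-clock implementation following those rules (the two-process event sequence is invented for illustration):

```python
class VectorClock:
    def __init__(self, pid, n):
        self.pid = pid          # this process's index in the vector
        self.clock = [0] * n    # initially all clocks are zero

    def tick(self):             # internal event: increment own entry
        self.clock[self.pid] += 1

    def send(self):             # increment, then ship a copy of the vector
        self.tick()
        return list(self.clock)

    def receive(self, other):   # increment, then take element-wise max
        self.tick()
        self.clock = [max(a, b) for a, b in zip(self.clock, other)]

a, b = VectorClock(0, 2), VectorClock(1, 2)
a.tick()                 # a: [1, 0]
msg = a.send()           # a: [2, 0]; message carries [2, 0]
b.receive(msg)           # b: [2, 1]
print(a.clock, b.clock)  # [2, 0] [2, 1]
```

Comparing vectors element-wise then tells you whether one event happened before another or whether the two are concurrent.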
A possibly simpler solution for a distributed immutable table
Possibility of having foreign tables be child tables in 9.4 (9.5?)
HTML('<iframe src="http://postgresql.1045698.n5.nabble.com/template/NamlServlet.jtp?macro=print_post&node=5778285" width=800 height=600></iframe>')
Adapted from a recent post by Kyotaro Horiguchi on the pgsql-hackers mailing list
http://postgresql.1045698.n5.nabble.com/inherit-support-for-foreign-tables-td5778285i60.html
create table pu1 (a int not null, b int not null, c int, d text);
create unique index i_pu1_ab on pu1 (a, b);
create unique index i_pu1_c on pu1 (c);
create table cu11 (like pu1 including all) inherits (pu1);
create table cu12 (like pu1 including all) inherits (pu1);
insert into cu11 (select a / 5, 4 - (a % 5), a, 'cu11' from generate_series(000000, 099999) a);
insert into cu12 (select a / 5, 4 - (a % 5), a, 'cu12' from generate_series(100000, 199999) a);
create extension postgres_fdw;
create server pg2 foreign data wrapper postgres_fdw options (host '/tmp', port '5433', dbname 'postgres');
create user mapping for current_user server pg2 options (user 'horiguti');
create foreign table _cu11 (a int, b int, c int, d text) server pg2
options (table_name 'cu11', use_remote_estimate 'true');
create foreign table _cu12 (a int, b int, c int, d text) server pg2
options (table_name 'cu12', use_remote_estimate 'true');
create table rpu1 (a int, b int, c int, d text);
alter foreign table _cu11 inherit rpu1;
alter foreign table _cu12 inherit rpu1;
analyze rpu1;
This talk is available at http://nbviewer.ipython.org/gist/nyamada/9946705