PostgreSQL分库分表解决方案之citus

Posted on 2017-03-29 16:55:26 by osdba

1. citus介绍

citus是PostgreSQL数据库中的一种轻量级的分库分表解决方案。citus不是一个单独的程序,它是PostgreSQL数据库中的一个插件,可以使用create extension安装此插件。
每个citus集群有多个PostgreSQL数据库实例组成,数据库实例分为两类:

  • master节点,通常有一台。master节点只存储分库分表的元数据,不存储实际的数据。
  • worker节点,通常有多台。worker节点存储实际的分片数据(shard)。

用户只连接master节点,即用户把SQL发到master节点,然后master节点解析SQL,把SQL分解成各个子SQL发送到底层的worker节点,完成SQL的处理。
例如用户发一条如下的SQL到master节点:

SELECT reponse_type, avg(reponse_time) as responsetime_avg
  FROM events
 WHERE reponse_timestamp > '2013-11-01 23:20:00' 
   AND reponse_timestamp < '2013-11-01 23:30:00'
 GROUP BY response_type
 ORDER BY reponsetime_avg DESC;

master节点会把这条SQL分解成如下的子SQL发到底层的各个worker上:

SELECT reponse_type, sum(reponse_time), count(response_time)
  FROM events_XXX
 WHERE reponse_timestamp > '2013-11-01 23:20:00' 
   AND reponse_timestamp < '2013-11-01 23:30:00'
 GROUP BY response_type
 ORDER BY reponsetime_avg DESC;

上面的SQL中的“events_XXX”中的“XXX”代表分片号。当各个workder把数据返给master之后,master再做一次聚合运算,然后把结果返回用户。
一些更多的信息可以见citus的官方网站为:https://www.citusdata.com/

2. 安装citus

2.1 从二进制包安装

citus是一个插件,在ubuntu下只需要使用apt-get安装即可:

aptitude install postgresql-9.6 postgresql-9.6-citus

装完之后,可以检查一下安装上的包:

root@vds01:~# dpkg -l |grep postgresql-9.6
ii  postgresql-9.6                         9.6.2-1.pgdg14.04+1              amd64        object-relational SQL database, version 9.6 server
ii  postgresql-9.6-citus                   6.0.1.PGDG-1.pgdg14.04+1         amd64        sharding and distributed joins for PostgreSQL

2.2 编译安装

如果只是编译安装citus,而PostgreSQL数据库软件是通过操作系统包安装的,这时需要安装PostgreSQL开发包:

aptitude install postgresql-server-dev-9.6

然后从github下载源代码:

git clone https://github.com/citusdata/citus.git

编译源代码:

cd citus
./configure 
make

在make时会报错:

/usr/include/postgresql/9.6/server/libpq/libpq-be.h:36:27: fatal error: gssapi/gssapi.h: No such file or directory
 #include <gssapi/gssapi.h>
                           ^
compilation terminated.
make[1]: *** [commands/transmit.o] Error 1
make[1]: Leaving directory `/data/citus/src/backend/distributed'
make: *** [extension] Error 2

安装libkrb5-dev包解决上面的问题:

aptitude install libkrb5-dev

然后再编译就没有问题了。

3. 创建citus集群

3.1 集群规划

因为本次安装只是做一个演示,所以我们把所有的数据库实例都建在一台机器上。本次创建三个实例:

实例名称操作系统用户名数据目录端口
实例01pg01/data/pg015432
实例02pg02/data/pg029702
实例03pg03/data/pg039703

3.2 建操作系统用户

groupadd -g 501 pg01
useradd -g pg01 -m -s /bin/bash -u 501 pg01

groupadd -g 502 pg02
useradd -g pg02 -m -s /bin/bash -u 502 pg02

groupadd -g 503 pg03
useradd -g pg03 -m -s /bin/bash -u 503 pg03

在pg01用户的.profile文件中添加如下内容:

export PATH=/usr/lib/postgresql/9.6/bin:$PATH
export LD_LIBRARY_PATH=/usr/lib/postgresql/9.6/bin:$LD_LIBRARY_PATH
export PGHOST=/tmp
export PGDATA=/data/pg01
export PGPORT=5432

在pg02用户的.profile文件中添加如下内容:

export PATH=/usr/lib/postgresql/9.6/bin:$PATH
export LD_LIBRARY_PATH=/usr/lib/postgresql/9.6/bin:$LD_LIBRARY_PATH
export PGHOST=/tmp
export PGDATA=/data/pg02
export PGPORT=9702

在pg03用户的.profile文件中添加如下内容:

export PATH=/usr/lib/postgresql/9.6/bin:$PATH
export LD_LIBRARY_PATH=/usr/lib/postgresql/9.6/bin:$LD_LIBRARY_PATH
export PGHOST=/tmp
export PGDATA=/data/pg03
export PGPORT=9703

3.3 建数据库实例

分别在三个操作系统用户pg01、pg02、pg03下建三个数据库实例。
建的方法为:

initdb

然后分别修改各个实例下的postgresql.conf配置文件。
其中pg01需要修改的配置项如下:

listen_addresses = '*'
port = 5432
unix_socket_directories = '/tmp'
shared_preload_libraries = 'citus'
logging_collector = on

其中pg02需要修改的配置项如下:

listen_addresses = '*'
port = 9702
unix_socket_directories = '/tmp'
shared_preload_libraries = 'citus'
logging_collector = on

其中pg03需要修改的配置项如下:

listen_addresses = '*'
port = 9703
unix_socket_directories = '/tmp'
shared_preload_libraries = 'citus'
logging_collector = on

注意上面配置项中的“shared_preload_libraries = 'citus'”,这一行就是为了装载citus插件。
创建完成后,就可以启动这三个数据库实例了: 启动pg01实例,在root用户下:

su - pg01
pg_ctl start

启动pg02实例,在root用户下:

su - pg02
pg_ctl start

启动pg03实例,在root用户下:

su - pg03
pg_ctl start

3.4 配置citus

在pg01、pg02、pg03这三个实例中分别执行下面的命令创建citus extension:

create extension citus;

为了方便后续的操作,创建一个用户citusr,以后就会用citusr用户测试citus的功能:

create user citusr superuser;

上面是为了方便,所以把用户创建成超级用户。实际上也可以创建成普通用户。
然后在pg01即master节点上,添加两个worker节点(即pg02和pg03):

SELECT * from master_add_node('localhost', 9702);
SELECT * from master_add_node('localhost', 9702);

增加完之后可以用下面的命令看是否增加成功:

postgres=# SELECT * FROM master_get_active_worker_nodes();
 node_name | node_port
-----------+-----------
 localhost |      9703
 localhost |      9702
(2 rows)

做完以上操作之后,就可以创建表以及试用citus的功能了。

3.5 创建表以及测试citus的功能

在citus中有两类表:

  • Distributed Tables:即分片表。表的内容通过hash分在各个worker节点中
  • Reference Tables:即广播表,在每个分片中都复制一份。

下面我们创建两个分片表,注意这时使用我们之前建的用户citusr连接pg01:

pg01@vds01:~$ psql -Ucitusr postgres
psql (9.6.2)
Type "help" for help.

postgres=#

先按照与单机PostgreSQL数据库相同的方式建这两张表:

create table t01(id int, id2 int, t text);
create table t02(id int, id2 int, t text);

然后用下面的语句把这两张表定义为分片表:

select create_distributed_table('t01', 'id2');
select create_distributed_table('t02', 'id2', colocate_with=>'t01');

用下面的语句造一些临时数据,发现不支持:

postgres=# insert into t01 select id, id, lpad(id::text, 5, id::text) from generate_series(1,100) as t(id);
ERROR:  INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL:  Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT:  Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.

从上面知道citus不支持insert 语句后面的SELECT另一张表的这种语法。
不过我们可以通过下面的方法来造数据:

copy (select id, id, lpad(id::text, 5, id::text) from generate_series(1,10000) as t(id)) to '/tmp/t01.txt';
copy t01 from '/tmp/t01.txt';
copy t02 from '/tmp/t01.txt';

从上面知道citus是支持copy命令的。
我们使用explain查看执行计划:

postgres=# explain select a.t, b.t from t01 a, t02 b where a.id2=b.id2 and a.id2=5;
                                  QUERY PLAN
-------------------------------------------------------------------------------
 Distributed Query
   Executor: Router
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=9702 dbname=postgres
         ->  Nested Loop  (cost=0.00..11.61 rows=1 width=12)
               ->  Seq Scan on t01_102050 a  (cost=0.00..5.80 rows=1 width=10)
                     Filter: (id2 = 5)
               ->  Seq Scan on t02_102082 b  (cost=0.00..5.80 rows=1 width=10)
                     Filter: (id2 = 5)
(11 rows)

发现上面的执行计划与原生的PostgreSQL数据库有所不同。
如果我们join时不使用分布键,会发生什么? 试一试:

postgres=# explain select a.t, b.t from t01 a, t02 b where a.id=b.id2 and a.id2=5;
ERROR:  cannot use real time executor with repartition jobs
HINT:  Set citus.task_executor_type to "task-tracker".

提示在“real time”执行器下不支持跨节点join,改成“task-tracker”可以支持,试一试:

postgres=# Set citus.task_executor_type to "task-tracker";                                                                                                                     SET
postgres=# select a.t, b.t from t01 a, t02 b where a.id=b.id2 and a.id2=5;
   t   |   t
-------+-------
 55555 | 55555
(1 row)

Time: 9843.828 ms

从上面可以看出,可以执行,就是有些慢。
我们建两张广播表:

create table t03(id int, id2 int, t text);
create table t04(id int, id2 int, t text);

select create_reference_table('t03');
select create_reference_table('t04');
postgres=# copy t03 from '/tmp/t01.txt';
COPY 10000
Time: 34.356 ms
postgres=# copy t04 from '/tmp/t01.txt';
COPY 10000
Time: 30.163 ms

两张广播表join一下:

postgres=# select a.t, b.t from t03 a, t04 b where a.id=b.id2 and a.id2=5;
   t   |   t
-------+-------
 55555 | 55555
(1 row)

Time: 3.432 ms
postgres=# explain select a.t, b.t from t03 a, t04 b where a.id=b.id2 and a.id2=5;
                                      QUERY PLAN
---------------------------------------------------------------------------------------
 Distributed Query
   Executor: Router
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=9703 dbname=postgres
         ->  Hash Join  (cost=180.01..372.52 rows=1 width=12)
               Hash Cond: (b.id2 = a.id)
               ->  Seq Scan on t04_102237 b  (cost=0.00..155.00 rows=10000 width=10)
               ->  Hash  (cost=180.00..180.00 rows=1 width=10)
                     ->  Seq Scan on t03_102236 a  (cost=0.00..180.00 rows=1 width=10)
                           Filter: (id2 = 5)
(12 rows)

Time: 13.071 ms

两张广播表join没有问题。
试一试,分片表与广播表join一下:

postgres=# select a.t, b.t from t01 a, t03 b where a.id=b.id2 and a.id2=5;
   t   |   t
-------+-------
 55555 | 55555
(1 row)

Time: 4.964 ms
postgres=# explain select a.t, b.t from t01 a, t03 b where a.id=b.id2 and a.id2=5;
                                     QUERY PLAN
-------------------------------------------------------------------------------------
 Distributed Query
   Executor: Router
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=9702 dbname=postgres
         ->  Hash Join  (cost=5.81..198.32 rows=1 width=12)
               Hash Cond: (b.id2 = a.id)
               ->  Seq Scan on t03_102236 b  (cost=0.00..155.00 rows=10000 width=10)
               ->  Hash  (cost=5.80..5.80 rows=1 width=10)
                     ->  Seq Scan on t01_102178 a  (cost=0.00..5.80 rows=1 width=10)
                           Filter: (id2 = 5)
(12 rows)

Time: 15.099 ms

分片表与广播表join没有问题。
试一下分片表的聚合函数:

postgres=# select id, avg(id2) from t01 group by id limit 5;
  id  |          avg
------+-----------------------
 2848 | 2848.0000000000000000
  251 |  251.0000000000000000
 3565 | 3565.0000000000000000
 2026 | 2026.0000000000000000
 6158 | 6158.0000000000000000
(5 rows)

Time: 1108.943 ms
postgres=# explain select id, avg(id2) from t01 group by id limit 5;
                                    QUERY PLAN
-----------------------------------------------------------------------------------
 Distributed Query into pg_merge_job_0056
   Executor: Task-Tracker
   Task Count: 32
   Tasks Shown: One of 32
   ->  Task
         Node: host=localhost port=9702 dbname=postgres
         ->  HashAggregate  (cost=7.55..10.72 rows=317 width=20)
               Group Key: id
               ->  Seq Scan on t01_102172 t01  (cost=0.00..5.17 rows=317 width=8)
 Master Query
   ->  Limit  (cost=0.00..0.00 rows=0 width=0)
         ->  HashAggregate  (cost=0.00..0.00 rows=0 width=0)
               Group Key: intermediate_column_56_0
               ->  Seq Scan on pg_merge_job_0056  (cost=0.00..0.00 rows=0 width=0)
(14 rows)

Time: 17.838 ms

也是没有问题。
好的,先试到这儿,祝大家愉快。