A Neat Way to Do Incremental Data Migration with oracle_fdw

Posted on 2014-12-10 21:54:31 by osdba

This article explains in detail a method for incrementally synchronizing table data from Oracle to PostgreSQL using oracle_fdw.

1. Theory behind incremental data migration

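  • Run one full migration first, and start recording incremental changes before the full migration begins
  • The change log only needs to record primary keys; the sync uses the primary keys in the log table to look up the data in the source table
    • Record primary key changes in an incremental log table
    • An update that changes a primary key is equivalent to deleting the old key and inserting the new one, so the log table gets two rows
    • During an incremental sync, a primary key that exists in the log table but not in the source table identifies a row that should be deleted from the target table
    • During an incremental sync, joining the changed primary keys in the log table with the source table yields the rows that should be merged into the target table
  • Synchronize the changed data to the target table; the incremental sync can be run multiple times
  • Stop writes to the source database and run one final incremental sync
  • Switch the application over to the new database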

2. Recording the incremental log in Oracle

  • Use triggers
  • Use a materialized view log

    • The SQL to create a materialized view log is:
    -- Create a materialized view log on table emp owned by user scott
    CREATE MATERIALIZED VIEW LOG ON SCOTT.EMP WITH PRIMARY KEY;
    
    • Oracle then automatically generates a log table like this:
    SQL> desc mlog$_emp;
     Name                                      Null?    Type
     ----------------------------------------- -------- ----------------------------
     EMPNO                                              NUMBER(4)
     SNAPTIME$$                                         DATE
     DMLTYPE$$                                          VARCHAR2(1)
     OLD_NEW$$                                          VARCHAR2(1)
     CHANGE_VECTOR$$                                    RAW(255)
     XID$$                                              NUMBER
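
    • Triggers are listed as the first option above, but no example is given. A minimal sketch for the emp table might look like the following; the log table emp_pk_log and the trigger trg_emp_pk_log are hypothetical names that do not appear in the original, and the rest of this article uses the materialized view log instead:

    -- Hypothetical log table: holds only the primary key of each changed row
    CREATE TABLE emp_pk_log (empno NUMBER(4) NOT NULL);

    CREATE OR REPLACE TRIGGER trg_emp_pk_log
    AFTER INSERT OR UPDATE OR DELETE ON emp
    FOR EACH ROW
    BEGIN
        -- UPDATE and DELETE: record the old key
        IF UPDATING OR DELETING THEN
            INSERT INTO emp_pk_log (empno) VALUES (:old.empno);
        END IF;
        -- INSERT, or an UPDATE that changes the key: record the new key,
        -- so a primary key update leaves two rows in the log
        IF INSERTING OR (UPDATING AND :new.empno <> :old.empno) THEN
            INSERT INTO emp_pk_log (empno) VALUES (:new.empno);
        END IF;
    END;
    /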
    

3. The incremental synchronization method

  • The rows returned by the following SQL need to be merged into the target table:

    SELECT empno, ename, job, mgr, hiredate, sal, comm,deptno
      FROM emp a
     WHERE EXISTS(SELECT 1 FROM mlog$_emp b WHERE a.empno = b.empno);
    
  • The rows identified by the primary key values that the following SQL returns should be deleted from the target table:

    SELECT DISTINCT empno FROM mlog$_emp a
     WHERE NOT EXISTS (SELECT 1 FROM emp b WHERE a.empno = b.empno);
    
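  • To keep the data consistent, the deletes and the merge on the target table must be performed as one atomic operation

  • Once the target has been synchronized, the corresponding records in the incremental log table should be deleted as well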

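3.1 Keeping the incremental sync atomic

  • Basic idea: combine the two SQL statements above, plus a query that captures the rowids of the records currently in the log table, into a single SQL statement that is executed once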

    CREATE VIEW vw____emp as
    SELECT  1 as x____action, null as x____row_id,  
        empno, ename, job, mgr, hiredate, sal, comm, deptno
      FROM emp a
     WHERE EXISTS(SELECT 1 FROM mlog$_emp b WHERE a.empno = b.empno)
    UNION ALL 
    SELECT DISTINCT 2 AS x____action, null AS x____row_id, 
        empno, NULL, NULL, NULL, NULL, NULL, NULL, NULL 
      FROM mlog$_emp a
     WHERE NOT EXISTS (SELECT 1 FROM emp b WHERE a.empno = b.empno)
    UNION ALL
    SELECT 3 AS x____action, rowidtochar(rowid) AS x____row_id, 
        NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL     
      FROM mlog$_emp;
    
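  • x____action identifies which of the three queries above a row came from

  • The third query records the rowids currently in the incremental log table; they are used to delete the log entries once the sync has finished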

3.2 Creating the oracle_fdw objects

  • The emp table (the target table in PostgreSQL) is defined as follows:

    CREATE TABLE emp (
        empno smallint NOT NULL primary key,
        ename varchar(10),
        job varchar(9),
        mgr smallint,
        hiredate timestamp,
        sal double precision,
        comm double precision,
        deptno smallint
    ) ;
    
  • Create the foreign server:

    CREATE EXTENSION oracle_fdw;
    CREATE SERVER oradb FOREIGN DATA WRAPPER oracle_fdw
              OPTIONS (dbserver 'oratest');
    GRANT USAGE ON FOREIGN SERVER oradb TO scott;
    
  • Create the user mapping (as user scott):

    CREATE USER MAPPING FOR scott SERVER oradb 
        OPTIONS (user 'SCOTT', password 'tiger');
    
  • Create the foreign tables:

    CREATE FOREIGN TABLE fdw____emp (
        empno smallint NOT NULL,
        ename varchar(10),
        job varchar(9),
        mgr smallint,
        hiredate timestamp,
        sal double precision,
        comm double precision,
        deptno smallint
    ) SERVER oradb OPTIONS(schema 'SCOTT', table 'EMP');
    
    CREATE FOREIGN TABLE vw____emp (
        x____action smallint NOT NULL,
        x____row_id text,
        empno smallint NOT NULL,
        ename varchar(10),
        job varchar(9),
        mgr smallint,
        hiredate timestamp,
        sal double precision,
        comm double precision,
        deptno smallint
    ) SERVER oradb OPTIONS(schema 'SCOTT', table 'VW____EMP');
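
  • The foreign table fdw____emp is defined here but is not used in the incremental steps; a plausible use, assumed in this sketch, is the initial full load described in section 1. The sketch also assumes that the materialized view log was created before the load starts and that the local emp table is still empty:

    -- Initial full load through the foreign table (run once)
    INSERT INTO emp SELECT * FROM fdw____emp;

    -- Sanity check: how many rows of each kind are waiting in the Oracle view
    -- (1 = merge, 2 = delete, 3 = log rowids)
    SELECT x____action, count(*)
      FROM vw____emp
     GROUP BY x____action
     ORDER BY x____action;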
    

3.3 Steps of an incremental sync

  • Pull the incremental data into a temporary table in PG:

    CREATE TEMP TABLE IF NOT EXISTS tmp____emp(
        x____action smallint NOT NULL,
        x____row_id text,
        empno smallint,
        ename varchar(10),
        job varchar(9),
        mgr smallint,
        hiredate timestamp,
        sal double precision,
        comm double precision,
        deptno smallint
    );
    INSERT INTO tmp____emp SELECT * FROM vw____emp;
    
  • Merge the data:

    WITH upsert as
    (UPDATE emp AS m SET ename = t.ename, job=t.job, mgr=t.mgr, 
            hiredate = t.hiredate, sal=t.sal, comm=t.comm, deptno=t.deptno 
       FROM tmp____emp t
      WHERE t.x____action = 1 AND m.empno = t.empno
    RETURNING m.*)
    INSERT INTO emp  
    SELECT empno, ename, job, mgr, hiredate, sal, comm, deptno FROM tmp____emp a
     WHERE a.x____action = 1
       AND NOT EXISTS(SELECT 1 FROM upsert b WHERE a.empno=b.empno);
    
    DELETE FROM emp a WHERE EXISTS(
        SELECT 1 FROM tmp____emp b WHERE x____action=2 AND a.empno=b.empno);
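
  • Section 3 requires the merge and the delete to be atomic, which in PG means running them in one transaction. The sketch below does that and, as an alternative to the writable-CTE upsert above, uses INSERT ... ON CONFLICT. This assumes PostgreSQL 9.5 or later, so it is not part of the original method, and it also assumes tmp____emp holds a single snapshot with at most one x____action = 1 row per empno:

    BEGIN;

    -- Upsert the changed rows (x____action = 1)
    INSERT INTO emp (empno, ename, job, mgr, hiredate, sal, comm, deptno)
    SELECT empno, ename, job, mgr, hiredate, sal, comm, deptno
      FROM tmp____emp
     WHERE x____action = 1
    ON CONFLICT (empno) DO UPDATE
       SET ename = EXCLUDED.ename, job = EXCLUDED.job, mgr = EXCLUDED.mgr,
           hiredate = EXCLUDED.hiredate, sal = EXCLUDED.sal,
           comm = EXCLUDED.comm, deptno = EXCLUDED.deptno;

    -- Remove the rows whose keys no longer exist in the source (x____action = 2)
    DELETE FROM emp a
     WHERE EXISTS (SELECT 1 FROM tmp____emp b
                    WHERE b.x____action = 2 AND a.empno = b.empno);

    COMMIT;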
    
    

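3.4 Cleaning up log entries that have been synchronized

  • The synchronized records in Oracle's incremental log table should be deleted by the rowids stored in the temporary table tmp____emp. However, a foreign table created with oracle_fdw cannot use the rowid column, so the following SQL cannot be executed: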

    DELETE FROM fdw_mlog$_emp WHERE rowid in (SELECT x____row_id FROM tmp____emp);
    
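  • The workaround is to create a temporary table in Oracle with a trigger on it; when rows are inserted into the temporary table, the trigger deletes the matching rows from the log table: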

    CREATE GLOBAL TEMPORARY TABLE clean_emp_mvlog(row_id varchar2(18)) 
        ON COMMIT DELETE ROWS;
    CREATE OR REPLACE TRIGGER trg_af_row_clean_emp_mvlog     
    AFTER INSERT ON clean_emp_mvlog 
    FOR EACH ROW 
    declare    
    begin
        DELETE FROM mlog$_emp WHERE rowid = chartorowid(:new.row_id);       
    end;
    /
    
  • In PG, create a foreign table for the Oracle table clean_emp_mvlog:

    CREATE FOREIGN TABLE clean_emp_mvlog(row_id varchar(18))
     SERVER oradb OPTIONS ( schema 'SCOTT', table 'CLEAN_EMP_MVLOG');
    
  • To delete the synchronized records in Oracle, run this in PG:

    INSERT INTO clean_emp_mvlog SELECT x____row_id FROM tmp____emp WHERE x____action=3;
    

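4. Improving the incremental sync method

  • The whole procedure above is fairly complex; with many tables to synchronize, doing it by hand is tedious
  • Idea: wrap the manual steps in functions, so that even the DDL that has to run in the Oracle database can be issued from PG.
    • The trick is to create a command table in Oracle with a trigger on it, and to map that command table to a foreign table in PG; inserting a SQL statement into the foreign table makes the Oracle trigger execute it
  • The SQL to create the command table is: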

    CREATE GLOBAL TEMPORARY TABLE replica_cmd( cmd varchar2 (4000)) ON COMMIT DELETE ROWS;
    
    CREATE OR REPLACE TRIGGER trg_af_insrow_replica_cmd
    AFTER INSERT ON replica_cmd
    FOR EACH ROW
    DECLARE
    PRAGMA AUTONOMOUS_TRANSACTION;
    BEGIN
    EXECUTE IMMEDIATE :new.cmd;
    COMMIT;
    END;
    /
    
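  • Note 1: the command table is a temporary table because only the trigger needs to fire; the SQL itself does not need to be stored

  • Note 2: to run DDL inside a trigger, the trigger must use an autonomous transaction: PRAGMA AUTONOMOUS_TRANSACTION

  • Note 3: because the DDL runs inside stored PL/SQL code (here, the trigger), privileges obtained through roles do not apply, so the DDL privileges must be granted to the user explicitly, for example: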

    GRANT create table TO scott;
    
  • Map the Oracle command table to a foreign table in PG:

    CREATE FOREIGN TABLE fdw_replica_cmd (
        cmd varchar(4000)
    ) SERVER oradb OPTIONS ( schema 'SCOTT', table 'REPLICA_CMD');

  • From then on, DDL commands can be sent to the Oracle database like this:

    INSERT INTO fdw_replica_cmd values('CREATE TABLE test01(id number)');

  • The functions that wrap the procedure are:

    • add_table_to_replica: adds a table to the set of synchronized tables, for example:

      SELECT add_table_to_replica('scott','emp');
      
    • remove_table_from_replica: removes a table from the set of synchronized tables, for example:

      SELECT remove_table_from_replica('scott','emp');
      
    • refresh_increment: performs an incremental sync, for example:

      SELECT refresh_increment('scott','emp');
      
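    • clean_current_increment_log: removes the incremental log entries covered by the current sync (make sure they really have been synchronized first, otherwise those changes are lost), for example: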

      SELECT clean_current_increment_log('scott','emp');
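
    • Put together, a typical life cycle for one table might look like the sketch below. The ordering is inferred from section 1 and the function descriptions above, not prescribed by them. Because refresh_increment stores the captured rowids in a temporary table, which is visible only to the session that created it, clean_current_increment_log has to be called from the same session:

      -- One-time setup: creates the materialized view log and the view in Oracle,
      -- and the foreign tables vw____emp and fdw____emp in PG
      SELECT add_table_to_replica('scott','emp');

      -- ... perform the initial full load of scott.emp here ...

      -- Repeat as often as needed; both calls must run in the same session
      SELECT refresh_increment('scott','emp');
      SELECT clean_current_increment_log('scott','emp');

      -- After the application has been switched over to PG
      SELECT remove_table_from_replica('scott','emp');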
      
  • The function add_table_to_replica is implemented as follows:

CREATE OR REPLACE FUNCTION add_table_to_replica(arg_schema_name text, arg_table_name text)
  RETURNS text AS
$BODY$
DECLARE
    full_table_name text := arg_schema_name||'.'||arg_table_name;

    r1 RECORD;
    r2 RECORD;
    cols_name_list text[];
    cols_type_list text[];
    pk_name_list text[];

    cols_name_str text;
    cols_name_type_str text;
    create_tmp_table_sql  text;    
    a_cols_name_str text;
    ab_join_cond_str text;
    mt_join_cond_str text;
    pk_null_str text;
    cols_null_str text;

    item text;
    va text;
    i int;

    out_info text;
    local_sql text;
    ora_sql text;

BEGIN
    FOR r1 IN
        SELECT a.attname, pg_catalog.format_type(a.atttypid, a.atttypmod) as coltype,a.attnum
          FROM pg_catalog.pg_attribute a
         WHERE a.attrelid = full_table_name::regclass
           AND a.attnum > 0
           AND NOT a.attisdropped
         ORDER BY a.attnum
    LOOP
            cols_name_list := cols_name_list || r1.attname::text;
            cols_type_list := cols_type_list || r1.coltype::text;
    END LOOP;

    FOR r2 IN
    SELECT        
      pg_attribute.attname, 
      format_type(pg_attribute.atttypid, pg_attribute.atttypmod) 
      FROM pg_index, pg_class, pg_attribute 
     WHERE 
      pg_class.oid =  full_table_name::regclass AND
      indrelid = pg_class.oid AND
      pg_attribute.attrelid = pg_class.oid AND 
      pg_attribute.attnum = any(pg_index.indkey)
      AND indisprimary
    LOOP
            pk_name_list := pk_name_list || r2.attname::text;
    END LOOP;

    i :=1;
    FOREACH item IN ARRAY cols_name_list
    LOOP
        IF i = 1 THEN
            cols_name_type_str :=  cols_name_list[i] ||' '||cols_type_list[i]; 
        ELSE
            cols_name_type_str :=  cols_name_type_str || ', ' || cols_name_list[i] ||' '||cols_type_list[i];
        END IF;
        i := i+1;
    END LOOP;

    i :=1;
    FOREACH item IN ARRAY pk_name_list
    LOOP
        IF i = 1 THEN
            ab_join_cond_str :=  'a.'||item ||' = b.'||item;
            mt_join_cond_str :=  'm.'||item ||' = t.'||item;
        ELSE
            ab_join_cond_str :=  ab_join_cond_str || ' AND a.' || item ||' = b.'||item;
            mt_join_cond_str :=  mt_join_cond_str || ' AND m.' || item ||' = t.'||item;
        END IF;
        i := i+1;
    END LOOP;

        i := 1;
    FOREACH item IN ARRAY cols_name_list
    LOOP
        IF i =1 THEN
            a_cols_name_str :=  'a.'||item; 
        ELSE
            a_cols_name_str :=  a_cols_name_str||', a.'||item; 
        END IF;
        i := i + 1;
    END LOOP;

    -- Build pk_null_str and cols_null_str:
    -- pk_null_str keeps the primary key column names and uses null for every other column;
    -- cols_null_str uses null for every column
    pk_null_str = '';
    i := 1;
    FOREACH item IN ARRAY cols_name_list
    LOOP
        IF item = ANY(pk_name_list) THEN
            va := item;
        ELSE
            va := 'null';
        END IF;

        IF i =1 THEN
            pk_null_str :=  va;
            cols_null_str :='null';
        ELSE
            pk_null_str :=  pk_null_str ||', '||va;
            cols_null_str := cols_null_str || ', null'; 
        END IF;
        i := i + 1;
    END LOOP;


    -- Create the materialized view log for the table in Oracle
    ora_sql := format('CREATE MATERIALIZED VIEW LOG ON %s.%s WITH PRIMARY KEY', arg_schema_name, arg_table_name);    
    INSERT INTO fdw_replica_cmd values( ora_sql);

    out_info := format(E'Run in oracle: %s\n', ora_sql);

    -- Create the view used for synchronization in Oracle
    ora_sql := format( E'CREATE VIEW vw____%s AS \n'||
                   E'SELECT 1 as x____action, null as x____row_id, %s FROM %s a \n'||
                   E' WHERE EXISTS (SELECT 1 FROM mlog$_%s b WHERE %s)\n' ||
                   E'UNION ALL \n'||
                   E'SELECT DISTINCT 2 AS x____action, null AS x____row_id, %s FROM mlog$_%s a \n'||
                   E' WHERE NOT EXISTS (SELECT 1 FROM %s b WHERE %s) \n'||
                   E'UNION ALL \n'||
                   E'SELECT 3 AS x____action, rowidtochar(rowid) AS x____row_id, %s FROM mlog$_%s',
                   arg_table_name, a_cols_name_str, arg_table_name, arg_table_name, ab_join_cond_str,
                   pk_null_str, arg_table_name, arg_table_name, ab_join_cond_str,
                   cols_null_str, arg_table_name                      
                   );
    INSERT INTO fdw_replica_cmd values( ora_sql);

    out_info := out_info || format(E'Run in oracle: %s\n', ora_sql);

    -- Map the Oracle incremental-data view to a local foreign table
    local_sql := format ( E'CREATE FOREIGN TABLE vw____%s ( \n'||
                          E'x____action smallint NOT NULL, \n' ||
                          E'x____row_id text, \n' ||
                          E'%s )' ||
                          E'SERVER oradb OPTIONS ( schema \'%s\', table \'VW____%s\')', 
                          arg_table_name, 
                          cols_name_type_str, 
                          upper(arg_schema_name),
                          upper(arg_table_name)
                        );
    EXECUTE local_sql;
    out_info := out_info || format(E'Run: %s\n', local_sql);

    -- Map the remote Oracle table to a local foreign table
    local_sql := format ( E'CREATE FOREIGN TABLE fdw____%s ( \n'||
                          E'%s )' ||
                          E'SERVER oradb OPTIONS ( schema \'%s\', table \'%s\')',
                          arg_table_name,
                          cols_name_type_str, 
                          upper(arg_schema_name),
                          upper(arg_table_name)
                        );
    EXECUTE local_sql;
    out_info := out_info || format(E'Run: %s\n', local_sql);    
    return out_info;
END;
$BODY$
  LANGUAGE plpgsql VOLATILE;
  • The function remove_table_from_replica is implemented as follows:
CREATE OR REPLACE FUNCTION remove_table_from_replica(arg_schema_name text, arg_table_name text)
  RETURNS text AS
$BODY$
DECLARE    
    out_info text;
    local_sql text;
    ora_sql text;
    v_detail  text;
BEGIN
    -- Drop the table's materialized view log in Oracle
    ora_sql := format('DROP MATERIALIZED VIEW LOG ON %s.%s', arg_schema_name, arg_table_name);
    BEGIN   
        INSERT INTO fdw_replica_cmd values( ora_sql);
        out_info := format(E'Run in oracle: %s\n', ora_sql);
    EXCEPTION
        WHEN OTHERS THEN
            GET STACKED DIAGNOSTICS v_detail  = PG_EXCEPTION_DETAIL;
            out_info := format(E'Run in oracle failed: %s;\n    SQLSTATE=%s, %s\n    %s\n', ora_sql, SQLSTATE, SQLERRM, v_detail);            
    END;

    -- Drop the synchronization view in Oracle
    ora_sql := format( E'DROP VIEW %s.vw____%s', arg_schema_name, arg_table_name);
    BEGIN 
        INSERT INTO fdw_replica_cmd values( ora_sql);
        out_info := out_info || format(E'Run in oracle: %s\n', ora_sql);
    EXCEPTION
        WHEN OTHERS THEN
            GET STACKED DIAGNOSTICS v_detail  = PG_EXCEPTION_DETAIL;
            out_info := out_info || format(E'Run in oracle failed: %s;\n    SQLSTATE=%s, %s\n    %s\n', ora_sql, SQLSTATE, SQLERRM, v_detail);            
    END;

    -- Drop the local foreign table that maps the Oracle incremental-data view
    local_sql := format ('DROP FOREIGN TABLE %s.vw____%s', arg_schema_name, arg_table_name);
    BEGIN
        EXECUTE local_sql;
        out_info := out_info || format(E'Run: %s\n', local_sql);
    EXCEPTION
        WHEN OTHERS THEN
            -- This step runs locally, so report local_sql rather than ora_sql
            out_info := out_info || format(E'Run failed: %s;\n    SQLSTATE=%s, %s\n', local_sql, SQLSTATE, SQLERRM);
    END;


    -- Drop the local foreign table that maps the remote Oracle table
    local_sql := format ( 'DROP FOREIGN TABLE %s.fdw____%s', arg_schema_name, arg_table_name);
    BEGIN    
        EXECUTE local_sql;
        out_info := out_info || format(E'Run: %s\n', local_sql); 
    EXCEPTION
        WHEN OTHERS THEN
            -- This step runs locally, so report local_sql rather than ora_sql
            out_info := out_info || format(E'Run failed: %s;\n    SQLSTATE=%s, %s\n', local_sql, SQLSTATE, SQLERRM);
    END;    

    return out_info;
END;
$BODY$
  LANGUAGE plpgsql VOLATILE;

  • The function refresh_increment is implemented as follows:
CREATE OR REPLACE FUNCTION refresh_increment(arg_schema_name text, arg_table_name text)
  RETURNS text AS
$BODY$
DECLARE
    full_table_name text := arg_schema_name||'.'||arg_table_name;
    tmp_table_name text := 'x____tmp_'||arg_schema_name||'_'||arg_table_name;

    r1 RECORD;
    r2 RECORD;
    cols_name_list text[];
    cols_type_list text[];
    pk_name_list text[];

    cols_name_str text;
    cols_name_type_str text;
    create_tmp_table_sql  text;    
    a_cols_name_str text;
    ab_join_cond_str text;
    mt_join_cond_str text;
    pk_null_str text;
    cols_null_str text;
    up_set_str text;

    item text;
    va text;
    i int;

    insert_sql text;
    merge_sql text;
    delete_sql text;
    tj_sql text;

    merge_nums int;
    delete_nums int;

BEGIN
    FOR r1 IN
        SELECT a.attname, pg_catalog.format_type(a.atttypid, a.atttypmod) as coltype,a.attnum
          FROM pg_catalog.pg_attribute a
         WHERE a.attrelid = full_table_name::regclass
           AND a.attnum > 0
           AND NOT a.attisdropped
         ORDER BY a.attnum
    LOOP
            cols_name_list := cols_name_list || r1.attname::text;
            cols_type_list := cols_type_list || r1.coltype::text;
    END LOOP;

    FOR r2 IN
    SELECT        
      pg_attribute.attname, 
      format_type(pg_attribute.atttypid, pg_attribute.atttypmod) 
      FROM pg_index, pg_class, pg_attribute 
     WHERE 
      pg_class.oid =  full_table_name::regclass AND
      indrelid = pg_class.oid AND
      pg_attribute.attrelid = pg_class.oid AND 
      pg_attribute.attnum = any(pg_index.indkey)
      AND indisprimary
    LOOP
            pk_name_list := pk_name_list || r2.attname::text;
    END LOOP;

    i :=1;
    FOREACH item IN ARRAY cols_name_list
    LOOP
        IF i = 1 THEN
            cols_name_type_str :=  cols_name_list[i] ||' '||cols_type_list[i]; 
        ELSE
            cols_name_type_str :=  cols_name_type_str || ', ' || cols_name_list[i] ||' '||cols_type_list[i];
        END IF;
        i := i+1;
    END LOOP;

    i :=1;
    FOREACH item IN ARRAY pk_name_list
    LOOP
        IF i = 1 THEN
            ab_join_cond_str :=  'a.'||item ||' = b.'||item;
            mt_join_cond_str :=  'm.'||item ||' = t.'||item;
        ELSE
            ab_join_cond_str :=  ab_join_cond_str || ' AND a.' || item ||' = b.'||item;
            mt_join_cond_str :=  mt_join_cond_str || ' AND m.' || item ||' = t.'||item;
        END IF;
        i := i+1;
    END LOOP;

    cols_name_str := array_to_string(cols_name_list, ',');

    -- Build a_cols_name_str as: a.col1, a.col2, a.col3 ...
    i := 1;
    FOREACH item IN ARRAY cols_name_list
    LOOP
        IF i =1 THEN
            a_cols_name_str :=  'a.'||item; 
        ELSE
            a_cols_name_str :=  a_cols_name_str||', a.'||item; 
        END IF;
        i := i + 1;
    END LOOP;

    pk_null_str = '';
    i := 1;
    FOREACH item IN ARRAY cols_name_list
    LOOP
        IF item = ANY(pk_name_list) THEN
            va := item;
        ELSE
            va := 'null';
        END IF;

        IF i =1 THEN
            pk_null_str :=  va;
            cols_null_str :='null';
        ELSE
            pk_null_str :=  pk_null_str ||', '||va;
            cols_null_str := cols_null_str || ', null'; 
        END IF;
        i := i + 1;
    END LOOP;

    -- Build the "SET col1 = v1, col2 = v2" string for the UPDATE statement
    i := 1;
    FOREACH item IN ARRAY cols_name_list
    LOOP
        IF item = ANY(pk_name_list) THEN
           CONTINUE;
        END IF;

        IF i =1 THEN
            up_set_str := item ||' = t.'||item;
        ELSE
            up_set_str := up_set_str || ',' || item || ' = t.' || item;
        END IF;
        i := i + 1;
    END LOOP;

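    -- The incremental data is read several times. Reading the remote view each time could
    -- return different data and break consistency, so the remote incremental data is first
    -- saved into a local temporary table.
    -- Create that temporary table to hold the incremental data of this run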
    create_tmp_table_sql := format(
        E'CREATE TEMP TABLE IF NOT EXISTS \n'||
        E'%s(x____action int, x____row_id text, %s)', 
        tmp_table_name, cols_name_type_str);

    -- Copy the remote Oracle incremental data (exposed locally as a foreign table) into the local temporary table
    insert_sql := format( 
        E'INSERT INTO %s\n'||
        E'SELECT x____action, x____row_id, %s FROM vw____%s\n',
        tmp_table_name, cols_name_str, arg_table_name);

    -- SQL that merges the incremental data
    merge_sql := format( 
        E'WITH upsert as \n'||
        E'(UPDATE %s AS m SET %s\n'||
        E'   FROM %s t \n'||
        E'  WHERE t.x____action = 1 AND %s \n'||
        E'RETURNING m.*)\n'||
        E'INSERT INTO %s \n'|| 
        E'SELECT %s FROM %s a \n'||
        E' WHERE a.x____action = 1 \n'||
        E'   AND NOT EXISTS(SELECT 1 FROM upsert b WHERE %s)',
        full_table_name, up_set_str,
        tmp_table_name, mt_join_cond_str,
        full_table_name, cols_name_str,
        tmp_table_name, ab_join_cond_str);

    -- SQL that applies the deletes in the incremental data
    delete_sql := format(
        E'DELETE FROM %s a WHERE EXISTS(SELECT 1 FROM %s b WHERE x____action=2 AND %s)',
        full_table_name, tmp_table_name, ab_join_cond_str);

    EXECUTE create_tmp_table_sql;
    EXECUTE insert_sql;
    EXECUTE merge_sql;
    EXECUTE delete_sql;

    tj_sql := 'SELECT sum(case x____action when 1 then 1 else 0 end), '||
              'sum(case x____action when 2 then 1 else 0 end) FROM '||tmp_table_name;

    EXECUTE tj_sql into merge_nums, delete_nums; 
    RETURN format('merge %s rows, delete %s rows.', merge_nums, delete_nums);
END;
$BODY$
  LANGUAGE plpgsql VOLATILE;

  • The function clean_current_increment_log is implemented as follows:
CREATE OR REPLACE FUNCTION clean_current_increment_log(arg_schema_name text, arg_table_name text)
  RETURNS int AS
$BODY$
DECLARE
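    -- Number of rowids cleaned per loop iteration. Each quoted 18-character rowid
    -- plus its separator takes 22 characters, so 150 keeps the generated DELETE
    -- within the 4000-character limit of replica_cmd.cmd.
    rowid_count_in_loop int := 150;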

    full_table_name text := arg_schema_name||'.'||arg_table_name;
    tmp_table_name text := 'x____tmp_'||arg_schema_name||'_'||arg_table_name;

    ref refcursor;
    rowid text;
    all_rowid_str text;

    ora_sql text;
    clean_sql text;
    clean_tmp_table_sql text;
    i int;
    total int;
BEGIN

    i := 0;
    total :=0;

    -- Loop over the temporary table that holds the incremental data and collect the rowids captured at sync time
    OPEN ref FOR EXECUTE 'SELECT x____row_id FROM '|| quote_ident(tmp_table_name)||' WHERE x____action=3';
    LOOP
        FETCH ref INTO rowid;
        EXIT WHEN not found;
        IF i = 0 THEN
            all_rowid_str := ''''||rowid||'''';
        ELSE
            all_rowid_str := all_rowid_str || ', '''||rowid||'''';
        END IF;
        i := i + 1;
        IF i >= rowid_count_in_loop THEN
            ora_sql = format('DELETE FROM mlog$_%s WHERE rowid in (%s)', arg_table_name, all_rowid_str);
            INSERT INTO fdw_replica_cmd values( ora_sql);
            total := total + i;
            i := 0;
            all_rowid_str :='';
        END IF;
    END LOOP;
    CLOSE ref;

    IF i >0 THEN
        ora_sql = format('DELETE FROM mlog$_%s WHERE rowid in (%s)', arg_table_name, all_rowid_str);
        INSERT INTO fdw_replica_cmd values( ora_sql);
        total := total + i;
    END IF;
    IF total > 0 THEN
        clean_tmp_table_sql = 'TRUNCATE TABLE '||quote_ident(tmp_table_name);
        EXECUTE clean_tmp_table_sql;
    END IF;
    return total;
END;
$BODY$
LANGUAGE plpgsql VOLATILE;