使用Kettle动态生成页码并实现分页数据同步

hresh 787 0

使用Kettle动态生成页码并实现分页数据同步

需求

将 DB2 数据库中的表数据导入另一个 DB2 数据库的表里面。

源表(DB2):table1

目标表(DB2):table2

数据量:千万级别

思路

当时直接使用 Kettle 将数据从源表导入到目标表中,但是考虑到数据量过于庞大,实际执行过程花费了很长时间,因此考虑采用分页导入的方式来进行数据传输,即:

根据实际情况设置一个每次处理的数据量,比如:5,000条,然后根据总的数据条数和每次处理的数据量计算出一共分几页。

假设总数据量有:10,000,000,所以页数为:10,000,000/5,000=2000页

注: 若存在小数,小数部分算一页,比如:20.3算21页

解决方案

根据上述思路,我们首先需要考虑如何计算得到总页数,以及页码。可以考虑增加一个辅助配置表来存放页码,这也是我在网上看到的处理方法,但是不符合工作需求,所以我考虑必须自动计算一个表的总页数,并生成页码。最后根据页码来循环导入数据。

主流程如下:onetable_A.kjb

使用Kettle动态生成页码并实现分页数据同步

流程说明:

  1. 首先我们建立一个作业流程,然后配置一个 START 节点;
  2. 第一个转换流程用于查询表数据的数目,并计算得到总页数,以及得到一个页码集合;
  3. 再建一个作业流程,来循环处理页码,主要是将第几页的数据从源表同步到目标表中。

根据表数据生成页码

首先我们来重点关注第一个转换流程,这也是本次实现过程中最重要的一点。

流程结构如下:

使用Kettle动态生成页码并实现分页数据同步

getTotal 节点用于查询表数据的数目,其实就是在系统表中做查询操作,具体实现如下:

使用Kettle动态生成页码并实现分页数据同步

点击预览按钮,结果如下:

使用Kettle动态生成页码并实现分页数据同步

Java 代码脚本部分主要是根据上一步得到的数据数目来计算页码,并生成一个页码集合。关于 Java 代码脚本的使用,这里就不做介绍了。构建过程如下:

使用Kettle动态生成页码并实现分页数据同步

使用Kettle动态生成页码并实现分页数据同步

Java 代码如下:

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
  if (first) {
    first = false;

    /* TODO: Your code here. (Using info fields)

    FieldHelper infoField = get(Fields.Info, "info_field_name");

    RowSet infoStream = findInfoRowSet("info_stream_tag");

    Object[] infoRow = null;

    int infoRowCount = 0;

    // Read all rows from info step before calling getRow() method, which returns first row from any
    // input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.
    while((infoRow = getRowFrom(infoStream)) != null){

      // do something with info data
      infoRowCount++;
    }
    */
  }

  Object[] r = getRow();

  if (r == null) {
    setOutputDone();
    return false;
  }

  // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
  // enough to handle any new fields you are creating in this step.

  /* TODO: Your code here. (See Sample)

  // Get the value from an input field
  String foobar = get(Fields.In, "a_fieldname").getString(r);

  foobar += "bar";

  // Set a value in a new output field
  get(Fields.Out, "output_fieldname").setValue(r, foobar);

  */
//此处创建 r,是为了获取输入参数TOTAL_SRC的值
r = createOutputRow(r, data.outputRowMeta.size());

double num = get(Fields.In, "TOTAL_SRC").getNumber(r);
int pageNum = 5000;
int pages = (int)num/pageNum +1;    //计算总页数
//生成页码,并输出
for(int i=0;i<pages;i++){
    //个人觉得r类似于输出器,如果想将每个页码都输出去,则必须独立进行声明,此步骤为本人测试所得
    r = createOutputRow(r, data.outputRowMeta.size());
    get(Fields.Out, "PAGE").setValue(r, i);     //将页码赋值给PAGE
    //get(Fields.Out, "TJOBSEQ").setValue(r, get(Fields.In, "JOB_SEQ").getString(r));
    //get(Fields.Out, "id").setValue(r, i);
  putRow(data.outputRowMeta, r);
} 
  // Send the row on to the next step.

  return true;
}

虽然我们想要得到的页码为整数类型,但是在设置 PAGE 类型时需要设置为 String 类型,否则会报错,如下图所示:

使用Kettle动态生成页码并实现分页数据同步

并不影响后续的使用。

对于下述流程进行测试验证。

使用Kettle动态生成页码并实现分页数据同步

使用Kettle动态生成页码并实现分页数据同步

执行结果为:

使用Kettle动态生成页码并实现分页数据同步

从结果可以看出,每个页码都被正确输出。那么接下来我们需要将页码复制到结果中,传递到接下来的作业流程中。

根据页码循环同步数据

页码生成完毕后,接下来就是根据页码从源表查询数据,然后同步到目标表中。流程设计如下:

使用Kettle动态生成页码并实现分页数据同步

传进来的页码必须先从结果中获取到,然后再定义为变量,才能被后续所使用。

第一个转换流程的内部实现如下:

使用Kettle动态生成页码并实现分页数据同步

为了区分,我们将变量名叫做 EPAGE。

接下来就是数据同步,查看 getData_Epage。

使用Kettle动态生成页码并实现分页数据同步

首先需要根据页码从源表中获取到数据,注意这里为了使用页码条件,查询得到的结果不得不多了一列结果。

使用Kettle动态生成页码并实现分页数据同步

表输出时,注意不要勾选裁剪表,需要指定数据库字段,将上面查到的结果中多的一列给剔除掉。如果不想在此处做数据库字段指定操作,可以修改表输入中的查询语句。

select
REC_CREATOR,
REC_CREATE_TIME,
....
from (select ROW_NUMBER() over() as a, g001.* from
ZGROD112.D112_L2_FV_QD_CPCSEG002 g001) as temp 
where a>=(5*{EPAGE}+1) and a<=(5*({EPAGE}+1))

至此,关于大规模数据表之间的同步操作结束。

总结

本例重点讲述了如何根据表数据的数目动态生成页码,从而减少了页码配置表的构建步骤,最终减轻数据同步对于内存资源的占用。

拓展

实际工作中,我们不可能只对一张表做数据同步,往往会针对多张表做同步操作,所以对每张表还会有一个循环处理操作。关于这种情况的处理,需要一个额外的配置表,用来存放源系统和目标系统基本信息,以及源表和目标表信息,如果需要做增量同步操作,还可以加几个字段。后续有时间可以简单写一下。

参考文献

采用Kettle分页处理大数据量抽取任务

发表评论 取消回复
表情 图片 链接 代码

分享