需求
将 DB2 数据库中的表数据导入另一个 DB2 数据库的表里面。
源表(DB2):table1
目标表(DB2):table2
数据量:千万级别
思路
当时直接使用 Kettle 将数据从源表导入到目标表中,但是考虑到数据量过于庞大,实际执行过程花费了很长时间,因此考虑采用分页导入的方式来进行数据传输,即:
根据实际情况设置一个每次处理的数据量,比如:5,000条,然后根据总的数据条数和每次处理的数据量计算出一共分几页。
假设总数据量有:10,000,000,所以页数为:10,000,000/5,000=2000页
注: 若存在小数,小数部分算一页,比如:20.3算21页
解决方案
根据上述思路,我们首先需要考虑如何计算得到总页数,以及页码。可以考虑增加一个辅助配置表来存放页码,这也是我在网上看到的处理方法,但是不符合工作需求,所以我考虑必须自动计算一个表的总页数,并生成页码。最后根据页码来循环导入数据。
主流程如下:onetable_A.kjb
流程说明:
- 首先我们建立一个作业流程,然后配置一个 START 节点;
- 第一个转换流程用于查询表数据的数目,并计算得到总页数,以及得到一个页码集合;
- 再建一个作业流程,来循环处理页码,主要是将第几页的数据从源表同步到目标表中。
根据表数据生成页码
首先我们来重点关注第一个转换流程,这也是本次实现过程中最重要的一点。
流程结构如下:
getTotal 节点用于查询表数据的数目,其实就是在系统表中做查询操作,具体实现如下:
点击预览
按钮,结果如下:
Java 代码脚本部分主要是根据上一步得到的数据数目来计算页码,并生成一个页码集合。关于 Java 代码脚本的使用,这里就不做介绍了。构建过程如下:
Java 代码如下:
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
if (first) {
first = false;
/* TODO: Your code here. (Using info fields)
FieldHelper infoField = get(Fields.Info, "info_field_name");
RowSet infoStream = findInfoRowSet("info_stream_tag");
Object[] infoRow = null;
int infoRowCount = 0;
// Read all rows from info step before calling getRow() method, which returns first row from any
// input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.
while((infoRow = getRowFrom(infoStream)) != null){
// do something with info data
infoRowCount++;
}
*/
}
Object[] r = getRow();
if (r == null) {
setOutputDone();
return false;
}
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
// enough to handle any new fields you are creating in this step.
/* TODO: Your code here. (See Sample)
// Get the value from an input field
String foobar = get(Fields.In, "a_fieldname").getString(r);
foobar += "bar";
// Set a value in a new output field
get(Fields.Out, "output_fieldname").setValue(r, foobar);
*/
//此处创建 r,是为了获取输入参数TOTAL_SRC的值
r = createOutputRow(r, data.outputRowMeta.size());
double num = get(Fields.In, "TOTAL_SRC").getNumber(r);
int pageNum = 5000;
int pages = (int)num/pageNum +1; //计算总页数
//生成页码,并输出
for(int i=0;i<pages;i++){
//个人觉得r类似于输出器,如果想将每个页码都输出去,则必须独立进行声明,此步骤为本人测试所得
r = createOutputRow(r, data.outputRowMeta.size());
get(Fields.Out, "PAGE").setValue(r, i); //将页码赋值给PAGE
//get(Fields.Out, "TJOBSEQ").setValue(r, get(Fields.In, "JOB_SEQ").getString(r));
//get(Fields.Out, "id").setValue(r, i);
putRow(data.outputRowMeta, r);
}
// Send the row on to the next step.
return true;
}
虽然我们想要得到的页码为整数类型,但是在设置 PAGE 类型时需要设置为 String 类型,否则会报错,如下图所示:
并不影响后续的使用。
对于下述流程进行测试验证。
执行结果为:
从结果可以看出,每个页码都被正确输出。那么接下来我们需要将页码复制到结果中,传递到接下来的作业流程中。
根据页码循环同步数据
页码生成完毕后,接下来就是根据页码从源表查询数据,然后同步到目标表中。流程设计如下:
传进来的页码必须先从结果中获取到,然后再定义为变量,才能被后续所使用。
第一个转换流程的内部实现如下:
为了区分,我们将变量名叫做 EPAGE。
接下来就是数据同步,查看 getData_Epage。
首先需要根据页码从源表中获取到数据,注意这里为了使用页码条件,查询得到的结果不得不多了一列结果。
表输出时,注意不要勾选裁剪表,需要指定数据库字段,将上面查到的结果中多的一列给剔除掉。如果不想在此处做数据库字段指定操作,可以修改表输入中的查询语句。
select
REC_CREATOR,
REC_CREATE_TIME,
....
from (select ROW_NUMBER() over() as a, g001.* from
ZGROD112.D112_L2_FV_QD_CPCSEG002 g001) as temp
where a>=(5*{EPAGE}+1) and a<=(5*({EPAGE}+1))
至此,关于大规模数据表之间的同步操作结束。
总结
本例重点讲述了如何根据表数据的数目动态生成页码,从而减少了页码配置表的构建步骤,最终减轻数据同步对于内存资源的占用。
拓展
实际工作中,我们不可能只对一张表做数据同步,往往会针对多张表做同步操作,所以对每张表还会有一个循环处理操作。关于这种情况的处理,需要一个额外的配置表,用来存放源系统和目标系统基本信息,以及源表和目标表信息,如果需要做增量同步操作,还可以加几个字段。后续有时间可以简单写一下。
参考文献
本文作者为hresh,转载请注明。