当前位置:  开发笔记 > 编程语言 > 正文

如何在Java中的javaPairRDD上使用gregationByKey?

如何解决《如何在Java中的javaPairRDD上使用gregationByKey?》经验,为你挑选了1个好方法。

我进行了很多搜索,但没有找到在Java代码中执行gregationByKey的示例。

我想查找按键减少的JavaPairRDD中的行数。

我读到aggregateByKey是最好的方法,但是我使用的是Java而不是scala,因此我无法在Java中使用它。

请帮忙!!!

例如:

input: [(key1,[name:abc,email:def,address:ghi]),(key1,[name:abc,email:def,address:ghi]),(key2,[name:abc,email:def,address:ghi])]

output: [(key1,[name:abc,email:def,address:ghi, count:2]),(key2,[name:abc,email:def,address:ghi, count:1])]

我想与示例完全相同,我想在输出行中增加一列,减少行数。

谢谢!!!



1> Harish Patha..:

这是我如何通过java中的键进行聚合的示例。

JavaPairRDD result = inputDataFrame.javaRDD().mapToPair(new  PairFunction() {
    private static final long serialVersionUID = 1L;
    public Tuple2 call(Row tblRow) throws Exception {
        String strID= CommonConstant.BLANKSTRING;
        Object[] newRow = new Object[schemaSize];
        for(String s: matchKey)
        {
            if(tblRow.apply(finalSchema.get(s))!=null){
                strID+= tblRow.apply(finalSchema.get(s)).toString().trim().toLowerCase();
            }                           
        }   
        int rowSize=    tblRow.length();
        for (int itr = 0; itr < rowSize; itr++)
        {
            if(tblRow.apply(itr)!=null)
            {
                newRow[itr] = tblRow.apply(itr);
            }
        }
        newRow[idIndex]= Utils.generateKey(strID);
        return new Tuple2(strID,RowFactory.create(newRow));
    }
}).aggregateByKey(RowFactory.create(arr), new Function2(){

    private static final long serialVersionUID = 1L;

    public Row call(Row argRow1, Row argRow2) throws Exception {
        // TODO Auto-generated method stub

        Integer rowThreshold=   dataSchemaHashMap.get(CommonConstant.STR_TEMPThreshold);
        Object[] newRow = new Object[schemaSize];
        int rowSize=    argRow1.length();

        for (int itr = 0; itr < rowSize; itr++)
        {
            if(argRow1!=null && argRow2!=null)
            {
                if(argRow1.apply(itr)!=null && argRow2.apply(itr)!=null)
                {
                    if(itr==rowSize-1){
                        newRow[itr] = Integer.parseInt(argRow1.apply(itr).toString())+Integer.parseInt(argRow2.apply(itr).toString());
                    }else{
                        newRow[itr] = argRow2.apply(itr);
                    }
                }
            }
        }

        return RowFactory.create(newRow);

    }

}, new Function2(){
    private static final long serialVersionUID = 1L;

    public Row call(Row v1, Row v2) throws Exception {
        // TODO Auto-generated method stub
        return v1;
    }
});

JavaRDD result1 = result.map(new Function, Row>() {
    private static final long serialVersionUID = -5480405270683046298L;
    public Row call(Tuple2 rddRow) throws Exception {
        return rddRow._2();
    }
});

推荐阅读
家具销售_903
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有