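I have searched a lot but could not find an example of using aggregateByKey in Java code.
I want to count, for each key, the number of rows that are reduced together in a JavaPairRDD.
I read that aggregateByKey is the best way to do this, but I am using Java, not Scala, so I can't use it in Java.
Please help!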
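For reference, Spark's JavaPairRDD does expose aggregateByKey(zeroValue, seqFunc, combFunc) with Function2 arguments. Its contract has three parts: a zero value, a seqOp that folds one value into an accumulator inside a partition, and a combOp that merges accumulators coming from different partitions. The following is a minimal plain-Java sketch of that contract with no Spark dependency; partitions are modeled as lists of key/value entries, and the class and method names are hypothetical. Counting rows per key is then just zero = 0, seqOp = acc + 1, combOp = sum.

```java
import java.util.*;
import java.util.function.BiFunction;

public class AggregateByKeySketch {
    // Simulates aggregateByKey (hypothetical helper, not Spark's API):
    // fold each partition with seqOp starting from zero, then merge partials with combOp.
    static <K, V, U> Map<K, U> aggregateByKey(List<List<Map.Entry<K, V>>> partitions, U zero,
            BiFunction<U, V, U> seqOp, BiFunction<U, U, U> combOp) {
        Map<K, U> result = new LinkedHashMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            // Per-partition accumulators, each key starting from the zero value.
            Map<K, U> partial = new LinkedHashMap<>();
            for (Map.Entry<K, V> e : partition) {
                U acc = partial.getOrDefault(e.getKey(), zero);
                partial.put(e.getKey(), seqOp.apply(acc, e.getValue()));
            }
            // Merge this partition's accumulators into the global result with combOp.
            for (Map.Entry<K, U> p : partial.entrySet()) {
                result.merge(p.getKey(), p.getValue(), combOp);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // key1 appears in both partitions, key2 only in the first.
        List<List<Map.Entry<String, String>>> partitions = List.of(
                List.of(Map.entry("key1", "abc"), Map.entry("key2", "abc")),
                List.of(Map.entry("key1", "abc")));
        Map<String, Integer> counts = aggregateByKey(partitions, 0, (acc, v) -> acc + 1, Integer::sum);
        System.out.println(counts); // {key1=2, key2=1}
    }
}
```

The zero value here is an immutable Integer, so sharing it across keys is harmless.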
For example:

    input:  [(key1,[name:abc,email:def,address:ghi]),(key1,[name:abc,email:def,address:ghi]),(key2,[name:abc,email:def,address:ghi])]
    output: [(key1,[name:abc,email:def,address:ghi, count:2]),(key2,[name:abc,email:def,address:ghi, count:1])]
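I want the output to match the example exactly: each output row should get one extra column holding the number of rows that were reduced into it.
Thanks!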
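To get exactly that output shape, the accumulator can carry the first row's fields plus a running count: seqOp keeps the fields on first sight and increments the count, combOp adds the counts of two partial results, and a final step appends a "count" column. The sketch below is plain Java under assumptions of its own: rows are modeled as Map<String, Object> instead of Spark Row, and the names Acc, seqOp, combOp and toOutputRow are hypothetical.

```java
import java.util.*;

public class CountColumnSketch {
    // Accumulator: the fields of the first row seen for a key, plus how many rows were folded in.
    static final class Acc {
        final Map<String, Object> fields;
        final int count;
        Acc(Map<String, Object> fields, int count) { this.fields = fields; this.count = count; }
    }

    // Zero value: no fields yet, count 0. Immutable, so it is safe to reuse for every key.
    static final Acc ZERO = new Acc(null, 0);

    // seqOp: fold one input row into the accumulator (keeps the first row's fields).
    static Acc seqOp(Acc acc, Map<String, Object> row) {
        return new Acc(acc.fields == null ? row : acc.fields, acc.count + 1);
    }

    // combOp: merge two partial accumulators from different partitions by adding counts.
    static Acc combOp(Acc a, Acc b) {
        return new Acc(a.fields != null ? a.fields : b.fields, a.count + b.count);
    }

    // Build the output row: the original fields plus a trailing "count" column.
    static Map<String, Object> toOutputRow(Acc acc) {
        Map<String, Object> out = new LinkedHashMap<>(acc.fields);
        out.put("count", acc.count);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("name", "abc");
        row.put("email", "def");
        row.put("address", "ghi");
        // key1 appears once in each of two partitions.
        Acc p1 = seqOp(ZERO, row);
        Acc p2 = seqOp(ZERO, row);
        System.out.println(toOutputRow(combOp(p1, p2))); // {name=abc, email=def, address=ghi, count=2}
    }
}
```

Keeping the count in a separate accumulator field, rather than in the row itself, avoids having to pre-fill a count column in every input row.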
Here is an example of how I aggregate by key in Java:

    JavaPairRDD<String, Row> result = inputDataFrame.javaRDD().mapToPair(new PairFunction<Row, String, Row>() {
        private static final long serialVersionUID = 1L;

        // Build the key from the matchKey columns and copy the row's non-null fields.
        public Tuple2<String, Row> call(Row tblRow) throws Exception {
            String strID = CommonConstant.BLANKSTRING;
            Object[] newRow = new Object[schemaSize];
            for (String s : matchKey) {
                if (tblRow.apply(finalSchema.get(s)) != null) {
                    strID += tblRow.apply(finalSchema.get(s)).toString().trim().toLowerCase();
                }
            }
            int rowSize = tblRow.length();
            for (int itr = 0; itr < rowSize; itr++) {
                if (tblRow.apply(itr) != null) {
                    newRow[itr] = tblRow.apply(itr);
                }
            }
            newRow[idIndex] = Utils.generateKey(strID);
            return new Tuple2<String, Row>(strID, RowFactory.create(newRow));
        }
    }).aggregateByKey(RowFactory.create(arr), new Function2<Row, Row, Row>() {
        private static final long serialVersionUID = 1L;

        // seqOp: fold one row into the accumulator within a partition.
        // The last column is the count column (assumed to hold e.g. 1 per input row);
        // the other columns take the incoming row's values.
        public Row call(Row argRow1, Row argRow2) throws Exception {
            Object[] newRow = new Object[schemaSize];
            int rowSize = argRow1.length();
            for (int itr = 0; itr < rowSize; itr++) {
                if (argRow1.apply(itr) != null && argRow2.apply(itr) != null) {
                    if (itr == rowSize - 1) {
                        newRow[itr] = Integer.parseInt(argRow1.apply(itr).toString())
                                + Integer.parseInt(argRow2.apply(itr).toString());
                    } else {
                        newRow[itr] = argRow2.apply(itr);
                    }
                }
            }
            return RowFactory.create(newRow);
        }
    }, new Function2<Row, Row, Row>() {
        private static final long serialVersionUID = 1L;

        // combOp: merge partial results from different partitions.
        // Keep v1's fields but add both counts; returning v1 alone would drop v2's count.
        public Row call(Row v1, Row v2) throws Exception {
            Object[] merged = new Object[v1.length()];
            for (int itr = 0; itr < v1.length(); itr++) {
                merged[itr] = v1.apply(itr);
            }
            int last = v1.length() - 1;
            merged[last] = Integer.parseInt(v1.apply(last).toString())
                    + Integer.parseInt(v2.apply(last).toString());
            return RowFactory.create(merged);
        }
    });

    // Drop the keys and keep only the aggregated rows.
    JavaRDD<Row> result1 = result.map(new Function<Tuple2<String, Row>, Row>() {
        private static final long serialVersionUID = -5480405270683046298L;

        public Row call(Tuple2<String, Row> rddRow) throws Exception {
            return rddRow._2();
        }
    });
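The combOp matters whenever a key's rows are spread over more than one partition: a combOp that simply returns its first argument keeps only one partition's partial count. The plain-Java sketch below (no Spark; the per-partition partial counts and the helper name mergePartials are hypothetical) contrasts that with a combOp that sums the partials.

```java
import java.util.*;
import java.util.function.BinaryOperator;

public class CombOpSketch {
    // Merge the per-partition partial counts of one key, left to right, with the given combOp.
    static int mergePartials(List<Integer> partials, BinaryOperator<Integer> combOp) {
        Integer acc = partials.get(0);
        for (int i = 1; i < partials.size(); i++) {
            acc = combOp.apply(acc, partials.get(i));
        }
        return acc;
    }

    public static void main(String[] args) {
        // key1 seen twice in partition 0 and once in partition 1.
        List<Integer> partials = List.of(2, 1);
        int keepFirst = mergePartials(partials, (v1, v2) -> v1); // partition 1's row is lost
        int summed = mergePartials(partials, Integer::sum);      // correct total
        System.out.println(keepFirst + " " + summed); // 2 3
    }
}
```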