当前位置:  开发笔记 > 后端 > 正文

便捷的统计订单收益(一)

java基础教程栏目带大家了解如何便捷的统计订单收益。
java基础教程栏目带大家了解如何便捷的统计订单收益。

  1. 在写入或修改收益表的同时通过canal监控mysql收益表的binlog日志.
  2. canal检测到变更,组装变更的JSON报文,发送RocketMQ中事先定义好的TOPIC.
  3. 程序消费该TOPIC,异构收益日报表.

canal配置部分

canal的安装请参考官方文档 解压后可得到一个canal文件夹,包含三个目录

  • bin:存放启动重启脚本
  • conf:存放核心配置文件
  • lib:存放核心jar包

我们需要重点关注conf文件夹里的conf/canal.properties核心配置文件以及conf/example/instance.properties单个监控节点配置文件

conf/canal.properties

# tcp, kafka, RocketMQ,这里默认是tcp读取模式,采用RocketMQ需要将其改变为RocketMQ模式
canal.serverMode = RocketMQ
# 如果是aliyun的RocketMQ需要配置以下两个KEY,ak/sk
canal.aliyun.accessKey =xxxxxxx
canal.aliyun.secretKey =xxxxxxx
# 监控的节点名称.这个默认就是example如果有多节点可以逗号隔开,如下方的例子
canal.destinations = example,sign
# 如果是aliyun的RocketMQ需要修改canal.mq.accessChannel为cloud默认为local
canal.mq.accessChannel = cloud
#MQ的地址,需要注意这里是不带http://,但是需要带端口号
canal.mq.servers = 
#rocketmq实例id
canal.mq.namespace =

conf/example/instance.properties

#mysql地址
canal.instance.master.address=
#以下两个参数需要在开启数据库binlog日志后得到,在数据库查询界面输入查询语句`show master status`,canal.instance.master.journal.name对应File参数,canal.instance.master.position对应Position参数
canal.instance.master.journal.name=
canal.instance.master.position=
#数据库的账号密码
canal.instance.dbUsername=
canal.instance.dbPassword=
#需要监控变动的表
canal.instance.filter.regex=xxx.t_user_order,xxx.t_user_cash_out
#定义发送的mq生产组
canal.mq.producerGroup = 
#定义发送到mq的指定主题
canal.mq.topic=

注:监控表的书写规则格式参照监控表书写规则

启动

cd /canal/bin
./start.sh

这时候会发现canal目录中多了一个log文件,进入可以看到canal主日志文件和example节点启动日志.

canal日志中出现
 the canal server is running now ......
example日志中出现
 init table filter : ^tablename
 xxxxxxxxx , the next step is binlog dump

表示你已经成功了一大步,canal监控已正常运行.

RocketMQ部分

如果用的aliyun的RocketMQ,配置代码部分直接可参考文档 自建的RocketMQ也可参照简单的消费例子监控对应的TOPIC即可 消费Canal发来的数据,格式如下:

{
    "data":[
        {
            //单个修改后表数据,如果同一时间有多个表变动会有多个该JSON对象        }
    ],
    "database":"监控的表所在数据库",
    "es":表变动时间,
    "id":canal生成的id,
    "isDdl":Boolean类型,表示是否DDL语句,
    "mysqlType":{
        表结构
    },
    "old":如果是修改类型会填充修改前的值,
    "pkNames":[
        该表的主键,如"id"
    ],
    "sql":"执行的SQL",
    "sqlType":{
        字段对应的sqlType,一般使用mysqlType即可
    },
    "table":"监控的表名",
    "ts":canal记录发送时间,
    "type":"表的修改类型,入INSERT,UPDATE,DELETE"
}

MQ消费代码主要用了反射,映射到对应的表

//这里的body就是Canal发来的数据
public Action process(String body) {
        boolean result = Boolean.FALSE;
        JSONObject data = JSONObject.parseObject(body);
        log.info("数据库操作日志记录:data:{}",data.toString());
        Class c = null;
        try {
            //这里监控了订单和收益表分别做订单统计和收益日报统计
            c = Class.forName(getClassName(data.getString("table")));
        } catch (ClassNotFoundException e) {
            log.error("error {}",e);
        }
        if (null != c) {
            JSONArray dataArray = data.getJSONArray("data");
            if (dataArray != null) {
                //把获取到的data部分转换为反射后的实体集合
                List list = dataArray.toJavaList(c);
                if (CollUtil.isNotEmpty(list)) {
                    //对修改和写入操作分别进行逻辑操作
                    String type = data.getString("type");
                    if ("UPDATE".equals(type)) {
                        result = uppHistory(list);
                    } else if ("INSERT".equals(type)) {
                        result = saveHistory(list);
                    }
                }
            }
        }
        return result ? Action.CommitMessage : Action.ReconsumeLater;
    }
    
    /**
     * @description: 获取反射ClassName
     * @author: chenyunxuan
     */
    private String getClassName(String tableName) {
        StringBuilder sb = new StringBuilder();
        //判断是哪张表的数据
        if (tableName.equals("t_user_income_detail")) {
            sb.append("cn.mc.core.model.order");
        } else if (tableName.equals("t_user_cash_out")) {
            sb.append("cn.mc.sync.model");
        }
        String className = StrUtil.toCamelCase(tableName).substring(1);
        return sb.append(".").append(className).toString();
    }
    
    /**
     * @description: 写入对应类型的统计表
     * @author: chenyunxuan
     */
    private  Boolean saveHistory(List orderList) {
        boolean result = Boolean.FALSE;
        Object dataType = orderList.get(0);
        //用instanceof判断类型进入不同的逻辑处理代码
        if (dataType instanceof TUserIncomeDetail) {
            result = userOrderHistoryService.saveIncomeDaily(orderList);
        } else if (dataType instanceof UserCashOut) {
            result = userCashOutHistoryService.delSaveHistoryList(orderList);
        }
        return result;
    }

saveIncomeDaily伪代码

  public synchronized Boolean saveIncomeDaily(List orderList) {
    //循环收益明细记录
    .......
    //通过创建时间和用户id查询收益日报表中是否有当日数据
    if(不存在当日数据){
        //创建当日的收益日报表记录
        .....
    }
    //因为不存在当日记录也会立即写入当日的空数据,所以下面的流程都是走更新流程
    //更新当日数据
    .......
    return Boolean.TRUE;
    }

注:代码中应该多打一些日志,方便产生异常收益数据后的校对

后记

至此一个基于canal+RocketMQ的收益日报统计异构方案就完成了,下一篇会围绕本文提到的第二个问题减少聚合SQL的产生展开.敬请关注.

相关免费学习推荐:java基础教程

以上就是便捷的统计订单收益(一)的详细内容,更多请关注其它相关文章!

推荐阅读
重庆制造漫画社
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有