当前位置:  开发笔记 > 编程语言 > 正文

MongoDB MapReduce功能简单使用

最近在做搜索的查询日志的统计分析,对每一条查询统计日志,我将其解析出来后以特定字段格式存在mongodb中,定时调度做些统计分析。其中有个需求是,统计某个时间段(每天、每周、每月)各个query的查询次数,展示上就是热门查询query了。考虑到处理的数据量

最近在做搜索的查询日志的统计分析,对每一条查询统计日志,我将其解析出来后以特定字段格式存在mongodb中,定时调度做些统计分析。其中有个需求是,统计某个时间段(每天、每周、每月)各个query的查询次数,展示上就是热门查询query了。考虑到处理的数据量不会很大,解决方法也可以简单来之。我现在使用的方法就是mongodb的MapReduce功能,其实这个需求也可以认为是个group操作,而mongodb的group功能就是基于MapReduce的,但group对结果集的大小是有限制的。本文就针对一个示例介绍一下mongodb MapReduce功能。

语法介绍

MapReduce是mongodb中的一个Command,它的语法格式如下:

db.runCommand(
 false>]
   [, finalize : ]
   [, scope : ]
   [, verbose : true]
);

对于该Command,必有的3个参数我就不解释了。对于可选参数,这里简要说明如下:
(1) query是很常用的,它用来在map阶段过滤查询条件的以限定MapReduce操作的记录范围。
(2) 和query相关的还有sort和limit,我起初以为它俩是用在reduce阶段,实际上和query一起用在map阶段。
(3) mongodb默认是创建一个临时的collection存储MapReduce结果,当客户端连接关闭或者显示使用collection.drop(),这个临时的collection会被删除掉。这也就说,默认的keeptemp是false,如果keeptemp为true,那么结果collection就是永久的。当然,生成的collection名称并不友好,所以可以指定out表明永久存储的collection的名称(这时不需要再指定keeptemp)。当指定out时,并不是将执行结果直接存储到out,而是同样到临时collection,之后如果out存在则drop掉,最后rename临时collection为out。
(4) finalize:当MapReduce完成时应用到所有结果上,通常不怎么使用。
(5) verbose:提供执行时间的统计信息。
执行结果的格式如下:

{ result : ,
  counts : input :  ,
       emit  : ,
       output :  ,
  timeMillis : ,
  ok : <1_if_ok>,
  [, err : ]
}

更常用的MapReduce命令的helper是:

 db.collection.mapReduce(mapfunction,reducefunction[,options]);

map函数的定义如下,map函数内使用this来操作当前行表示的对象,并且需要使用emit(key,value)方法来向reduce提供参数:

 function map(void) -> void

reduce函数定义如下,reduce的key就是emit(key,value)的key,value_array是同个key对应的多个value数组:

 function reduce(key, value_array) -> value

MapReduce得到的collection的格式是“_id”:key,”value”:。

应用示例

这里给出一个假想的无意义的示例,主要是为了说明mongodb MapReduce的使用。每条记录的schema是“query”:,”cnt”:,”year”:,”month”=>。这个schema的cnt是不需要的,因为每条查询query的cnt都是1,但这里想要稍微复杂一些条件。下面是可在mongodb shell中执行的MapReduce脚本。

map = function() emit(this.query, this.cnt);;
reduce = function(key , vals) {
        var sum = 0;
        for(var i in vals) sum += vals[i];
        return sum;
};
res = db.log_info.mapReduce(map,reduce,{"query":"year":2010});

执行结果如下:

{
        "result" : "tmp.mr.mapreduce_1284794393_2",
        "timeMillis" : 72,
        "counts" : "input" : 1000,
                "emit" : 1000,
                "output" : 113,
        "ok" : 1,
}

对于”result”,它是生成的临时collection名称,这个名称的命名规则是:”tmp.mr.mapreduce_”+time(0)+”_”+(jobNumber++)
执行db[res.result].find()得到:

"_id" : "a", "value" : 521
"_id" : "aa", "value" : 128
"_id" : "aaa", "value" : 40
"_id" : "aaaa", "value" : 4
"_id" : "aaab", "value" : 9
"_id" : "aaac", "value" : 13
"_id" : "aab", "value" : 45
"_id" : "aaba", "value" : 5
"_id" : "aabb", "value" : 14
"_id" : "aabc", "value" : 20
"_id" : "aac", "value" : 39
"_id" : "aaca", "value" : 6
"_id" : "aacb", "value" : 2
"_id" : "aacc", "value" : 5
"_id" : "ab", "value" : 65
"_id" : "aba", "value" : 37
"_id" : "abaa", "value" : 12
"_id" : "abab", "value" : 13
"_id" : "abac", "value" : 10
"_id" : "abb", "value" : 42
Java客户端API使用

和JS脚本一样,mongodb Java客户端提供了两个MapReduce接口,分别是:

public MapReduceOutput mapReduce( String map , String reduce , String outputCollection , DBObject query );
public MapReduceOutput mapReduce( DBObject command );

MapReduceOutput实现如下:

public class MapReduceOutput {
?
    MapReduceOutput( DBCollection from , BasicDBObject raw )_collname = raw.getString( "result" );
        _coll = from._db.getCollection( _collname );
        _counts = (BasicDBObject)raw.get( "counts" );
?
    public DBCursor results()return _coll.find();
?
    public void drop()_coll.drop();
?
    public DBCollection getOutputCollection()return _coll;
?
    final String _collname;
    final DBCollection _coll;
    final BasicDBObject _counts;
}

所以,可以调用MapReduceOutput.results()得到DBCursor做后续处理,比如在我的应用场景里,根据value值做降序排序并取limit 1000以得到最热门的一些query。
由于JavaScript引擎设计上的限制,当前的mongodb MapReduce还只是单线程执行,mongodb也在计划解决这个问题。如果需要多线程处理,可以考虑shard或者在客户端代码控制处理。

推荐阅读
农大军乐团_697
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有