当前位置:  开发笔记 > 编程语言 > 正文

MongoDB的MapReduce用法及php示例代码

MongoDB虽然不像我们常用的mysql,sqlserver,oracle等关系型数据库有groupby函数那样方便分组,但是MongoDB要实现分组也有3个办法:*Mongodb三种分组方式:*1、group(先筛选再分组,不支持分片,对数据量 SyntaxHighlig

MongoDB虽然不像我们常用的mysql,sqlserver,oracle等关系型数据库有group by函数那样方便分组,但是MongoDB要实现分组也有3个办法:


 * Mongodb三种分组方式:

 * 1、group(先筛选再分组,不支持分片,对数据量有所限制,效率不高)

 * 2、mapreduce(基于js引擎,单线程执行,效率较低,适合用做后台统计等)

 * 3、aggregate(推荐) (如果你的PHP的mongodb驱动版本需>=1.3.0,推荐你使用aggregate,性能要高很多,并且使用上要简单些,不过1.3的目前还不支持账户认证模式,可以通过 


下面就来看下mapreduce方式:

 


Mongodb官网对MapReduce介绍:

Map/reduce in MongoDB is useful for batch processing of data and aggregation operations. It is similar in spirit to using something like Hadoop with all input coming from a collection and output going to a collection. Often, in a situation where you would have used GROUP BY in SQL, map/reduce is the right tool in MongoDB.


大致意思是:Mongodb中的Map/reduce主要是用来对数据进行批量处理和聚合操作,有点类似于使用Hadoop对集合数据进行处理,所有输入数据都是从集合中获取,而MapReduce后输出的数据也都会写入到集合中。通常类似于我们在SQL中使用Group By语句一样。
使用MapReduce要实现两个函数:Map和Reduce。Map函数调用emit(key,value)遍历集合中所有的记录,将key与value传给Reduce函数进行处理。Map函数和Reduce函数是使用Javascript编写的,并可以通过db.runCommand或mapreduce命令来执行MapReduce操作。

 

MapReduce命令如下:
[javascript]
db.runCommand( 
{ mapreduce :
   map :
   reduce :  
   [, query :
   [, sort :
   [, limit :
   [, out :
   [, keeptemp:
   [, finalize :
   [, scope :
   [, verbose : true] 
 } 
); 

db.runCommand(
{ mapreduce : ,
   map : ,
   reduce :
   [, query : ]
   [, sort : ]
   [, limit : ]
   [, out : ]
   [, keeptemp: ]
   [, finalize : ]
   [, scope : ]
   [, verbose : true]
 }
);


参数说明:

mapreduce:要操作的目标集合

map:映射函数(生成键值对序列,作为Reduce函数的参数)

reduce:统计函数

query:目标记录过滤

sort:对目标记录排序

limit:限制目标记录数量

out:统计结果存放集合(如果不指定则使用临时集合,在客户端断开后自动删除)

keeptemp:是否保留临时集合

finalize:最终处理函数(对reduce返回结果执行最终整理后存入结果集合)

scope:向map、reduce、finalize导入外部变量

verbose:显示详细的时间统计信息

 

map函数
map函数调用当前对象,并处里对象的属性,传值给reduce,map方法使用this来操作当前对象,最少调用一次emit(key,value)方法来向reduce提供参数,其中emit的key为最终数据的id。

 

reduce函数
接收一个值和数组,根据需要对数组进行合并分组等处理,reduce的key就是emit(key,value)的key,value_array是同个key对应的多个value数组。

 

Finalize函数
此函数为可选函数,可在执行完map和reduce后执行,对最后的数据进行统一处理。

 

看完基本介绍,我们再来看一个实例:


已知集合feed,测试数据如下:
[javascript]

   "_id": ObjectId("50ccb3f91e937e2927000004"), 
   "feed_type": 1, 
   "to_user": 234, 
   "time_line": "2012-12-16 01:26:00" 

 

   "_id": ObjectId("50ccb3ef1e937e0727000004"), 
   "feed_type": 8, 
   "to_user": 123, 
   "time_line": "2012-12-16 01:26:00" 

 

   "_id": ObjectId("50ccb3e31e937e0a27000003"), 
   "feed_type": 1, 
   "to_user": 123, 
   "time_line": "2012-12-16 01:26:00" 

 

   "_id": ObjectId("50ccb3d31e937e0927000001"), 
   "feed_type": 1, 
   "to_user": 123, 
   "time_line": "2012-12-16 01:26:00" 

{
   "_id": ObjectId("50ccb3f91e937e2927000004"),
   "feed_type": 1,
   "to_user": 234,
   "time_line": "2012-12-16 01:26:00"
}

{
   "_id": ObjectId("50ccb3ef1e937e0727000004"),
   "feed_type": 8,
   "to_user": 123,
   "time_line": "2012-12-16 01:26:00"
}

{
   "_id": ObjectId("50ccb3e31e937e0a27000003"),
   "feed_type": 1,
   "to_user": 123,
   "time_line": "2012-12-16 01:26:00"
}

{
   "_id": ObjectId("50ccb3d31e937e0927000001"),
   "feed_type": 1,
   "to_user": 123,
   "time_line": "2012-12-16 01:26:00"
}

 

我们按动态类型feed_type和用户to_user进行分组统计,实现结果:
feed_type to_user cout
1 234 1
8 123 1
1 123 2

 

 

 

 

 

 

 


实现代码:


[php]
/编写map函数  
$map = ' 
     function() { 
      var key = {to_user:this.to_user,feed_type:this.feed_type}; 
      var value = {count:1}; 
      emit(key,value); 
    } ';  
 
//reduce 函数  
$reduce = ' 
     function(key, values) { 
         var ret = {count:0}; 
     for(var i in values) { 
          ret.count += 1; 
      } 
      return ret; 
      }'; 
 
//查询条件  
$query = null;  //本实例中没有查询条件,设置为null 

//编写map函数
$map = '
     function() {
   var key = {to_user:this.to_user,feed_type:this.feed_type};
   var value = {count:1};
   emit(key,value);
    } ';

//reduce 函数
$reduce = '
     function(key, values) {
         var ret = {count:0};
  for(var i in values) {
       ret.count += 1;
   }
   return ret;
      }';

//查询条件
$query = null;  //本实例中没有查询条件,设置为null[php] view plaincopyprint?$mongo = new Mongo('mongodb://root:root@127.0.0.1: 28017/'); //链接mongodb,账号和密码为root,root 
$instance = $mongo->selectDB("testdb"); 
 
//执行此命令后,会创建feed_temp_res的临时集合,并将统计后的数据放在该集合中  
$cmd = $instance->command(array( 
        'mapreduce' => 'feed', 
        'map'       => $map, 
        'reduce'    => $reduce, 
        'query' => $query, 
        'out' => 'feed_temp_res' 
)); 
 
//查询临时集合中的统计数据,验证统计结果是否和预期结果一致  
$cursor = $instance->selectCollection('feed_temp_res')->find(); 
$result = array(); 
try { 
    while ($cursor->hasNext()) 
    { 
        $result[] = $cursor->getNext(); 
    } 

catch (MongoConnectionException $e) 

    echo $e->getMessage(); 

catch (MongoCursorTimeoutException $e) 

    echo $e->getMessage(); 

catch(Exception $e){ 
    echo $e->getMessage(); 

 
//test  
var_dump($result); 

$mongo = new Mongo('mongodb://root:root@127.0.0.1: 28017/'); //链接mongodb,账号和密码为root,root
$instance = $mongo->selectDB("testdb");

//执行此命令后,会创建feed_temp_res的临时集合,并将统计后的数据放在该集合中
$cmd = $instance->command(array(
  'mapreduce' => 'feed',
  'map'       => $map,
  'reduce'    => $reduce,
  'query' => $query,
  'out' => 'feed_temp_res'
));

//查询临时集合中的统计数据,验证统计结果是否和预期结果一致
$cursor = $instance->selectCollection('feed_temp_res')->find();
$result = array();
try {
 while ($cursor->hasNext())
 {
  $result[] = $cursor->getNext();
 }
}
catch (MongoConnectionException $e)
{
 echo $e->getMessage();
}
catch (MongoCursorTimeoutException $e)
{
 echo $e->getMessage();
}
catch(Exception $e){
 echo $e->getMessage();
}

//test
var_dump($result);
下面是输出的结果,和预期结果一致


[javascript]

   "_id": { 
     "to_user": 234, 
     "feed_type": 1  
  }, 
   "value": { 
     "count": 1  
  }  

 

   "_id": { 
     "to_user": 123, 
     "feed_type": 8  
  }, 
   "value": { 
     "count": 1  
  }  

 

   "_id": { 
     "to_user": 123, 
     "feed_type": 1  
  }, 
   "value": { 
     "count": 2  
  }  

{
   "_id": {
     "to_user": 234,
     "feed_type": 1
  },
   "value": {
     "count": 1
  }
}

{
   "_id": {
     "to_user": 123,
     "feed_type": 8
  },
   "value": {
     "count": 1
  }
}

{
   "_id": {
     "to_user": 123,
     "feed_type": 1
  },
   "value": {
     "count": 2
  }
}

 

以上只是简单的统计实现,你可以实现复杂的条件统计编写复杂的reduce函数,可以增加查询条件,排序等等。


附上mapReduce数据库处理函数(简单封装)
[php]
/**
 * mapReduce分组
 * 
 * @param string $table_name 表名(要操作的目标集合名)
 * @param string $map 映射函数(生成键值对序列,作为 reduce 函数参数) 
 * @param string $reduce 统计处理函数
 * @param array  $query 过滤条件 如:array('uid'=>123)
 * @param array  $sort 排序
 * @param number $limit 限制的目标记录数
 * @param string $out 统计结果存放集合 (不指定则使用tmp_mr_res_$table_name, 1.8以上版本需指定)
 * @param bool   $keeptemp 是否保留临时集合
 * @param string $finalize 最终处理函数 (对reduce返回结果进行最终整理后存入结果集合)
 * @param string $scope 向 map、reduce、finalize 导入外部js变量
 * @param bool   $jsMode 是否减少执行过程中BSON和JS的转换,默认true(注:false时 BSON-->JS-->map-->BSON-->JS-->reduce-->BSON,可处理非常大的mapreduce,//true时BSON-->js-->map-->reduce-->BSON)
 * @param bool   $verbose 是否产生更加详细的服务器日志
 * @param bool   $returnresult 是否返回新的结果集
 * @param array  &$cmdresult 返回mp命令执行结果 array("errmsg"=>"","code"=>13606,"ok"=>0) ok=1表示执行命令成功
 * @return 
 */ 
function mapReduce($table_name,$map,$reduce,$query=null,$sort=null,$limit=0,$out='',$keeptemp=true,$finalize=null,$scope=null,$jsMode=true,$verbose=true,$returnresult=true,&$cmdresult){ 
    if(empty($table_name) || empty($map) || empty($reduce)){ 
        return null; 
    } 
    $map = new MongoCode($map); 
    $reduce = new MongoCode($reduce); 
    if(empty($out)){ 
        $out = 'tmp_mr_res_'.$table_name; 
    } 
    $cmd = array( 
            'mapreduce' => $table_name, 
            'map'       => $map, 
            'reduce'    => $reduce, 
            'out'       =>$out 
    ); 
    if(!empty($query) && is_array($query)){ 
        array_push($cmd, array('query'=>$query)); 
    } 
    if(!empty($sort) && is_array($sort)){ 
        array_push($cmd, array('sort'=>$query)); 
    } 
    if(!empty($limit) && is_int($limit) && $limit>0){ 
        array_push($cmd, array('limit'=>$limit)); 
    } 
    if(!empty($keeptemp) && is_bool($keeptemp)){ 
        array_push($cmd, array('keeptemp'=>$keeptemp)); 
    } 
    if(!empty($finalize)){ 
        $finalize = new Mongocode($finalize); 
        array_push($cmd, array('finalize'=>$finalize)); 
    } 
    if(!empty($scope)){ 
        array_push($cmd, array('scope'=>$scope)); 
    } 
    if(!empty($jsMode) && is_bool($jsMode)){ 
        array_push($cmd, array('jsMode'=>$jsMode)); 
    } 
    if(!empty($verbose) && is_bool($verbose)){ 
        array_push($cmd, array('verbose'=>$verbose)); 
    } 
    $dbname = $this->curr_db_name; 
    $cmdresult = $this->mongo->$dbname->command($cmd); 
    if($returnresult){ 
        if($cmdresult && $cmdresult['ok']==1){ 
            $result = $this->find($out, array()); 
        } 
    } 
    if($keeptemp==false){ 
        //删除集合  
        $this->mongo->$dbname->dropCollection($out); 
    } 
    return $result; 

    /**
     * mapReduce分组
     *
     * @param string $table_name 表名(要操作的目标集合名)
     * @param string $map 映射函数(生成键值对序列,作为 reduce 函数参数)
     * @param string $reduce 统计处理函数
     * @param array  $query 过滤条件 如:array('uid'=>123)
     * @param array  $sort 排序
     * @param number $limit 限制的目标记录数
     * @param string $out 统计结果存放集合 (不指定则使用tmp_mr_res_$table_name, 1.8以上版本需指定)
     * @param bool   $keeptemp 是否保留临时集合
     * @param string $finalize 最终处理函数 (对reduce返回结果进行最终整理后存入结果集合)
     * @param string $scope 向 map、reduce、finalize 导入外部js变量
     * @param bool   $jsMode 是否减少执行过程中BSON和JS的转换,默认true(注:false时 BSON-->JS-->map-->BSON-->JS-->reduce-->BSON,可处理非常大的mapreduce,//true时BSON-->js-->map-->reduce-->BSON)
     * @param bool   $verbose 是否产生更加详细的服务器日志
     * @param bool   $returnresult 是否返回新的结果集
     * @param array  &$cmdresult 返回mp命令执行结果 array("errmsg"=>"","code"=>13606,"ok"=>0) ok=1表示执行命令成功
     * @return
     */
    function mapReduce($table_name,$map,$reduce,$query=null,$sort=null,$limit=0,$out='',$keeptemp=true,$finalize=null,$scope=null,$jsMode=true,$verbose=true,$returnresult=true,&$cmdresult){
     if(empty($table_name) || empty($map) || empty($reduce)){
      return null;
     }
     $map = new MongoCode($map);
     $reduce = new MongoCode($reduce);
     if(empty($out)){
      $out = 'tmp_mr_res_'.$table_name;
     }
     $cmd = array(
       'mapreduce' => $table_name,
       'map'       => $map,
       'reduce'    => $reduce,
       'out'  =>$out
     );
     if(!empty($query) && is_array($query)){
      array_push($cmd, array('query'=>$query));
     }
     if(!empty($sort) && is_array($sort)){
      array_push($cmd, array('sort'=>$query));
     }
     if(!empty($limit) && is_int($limit) && $limit>0){
      array_push($cmd, array('limit'=>$limit));
     }
     if(!empty($keeptemp) && is_bool($keeptemp)){
      array_push($cmd, array('keeptemp'=>$keeptemp));
     }
     if(!empty($finalize)){
      $finalize = new Mongocode($finalize);
      array_push($cmd, array('finalize'=>$finalize));
     }
     if(!empty($scope)){
      array_push($cmd, array('scope'=>$scope));
     }
     if(!empty($jsMode) && is_bool($jsMode)){
      array_push($cmd, array('jsMode'=>$jsMode));
     }
     if(!empty($verbose) && is_bool($verbose)){
      array_push($cmd, array('verbose'=>$verbose));
     }
     $dbname = $this->curr_db_name;
     $cmdresult = $this->mongo->$dbname->command($cmd);
     if($returnresult){
      if($cmdresult && $cmdresult['ok']==1){
       $result = $this->find($out, array());
      }
     }
     if($keeptemp==false){
      //删除集合
      $this->mongo->$dbname->dropCollection($out);
     }
     return $result;
    }

 

推荐阅读
有风吹过best
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有