I am implementing a Haskell program that compares every line of a file with every other line in the same file. It can be implemented single-threaded as follows:
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
    fileContents <- readFile path
    return $ allDistances $ map read $ lines $ fileContents
    where
        allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
        allDistances _ = 0
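For example, on a (hypothetical) three-line file containing 1, 2 and 3, this returns (1-2)² + (1-3)² + (2-3)² = 1 + 4 + 1 = 6.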
This runs in O(n^2) time and has to keep the complete list of integers in memory at all times. In my actual program a line contains more numbers, out of which I construct a data type more complex than Int. This gives Out of memory errors on the data I have to process.
So there are two improvements to make to the single-threaded solution above. First, speed up the actual running time. Second, find a way not to keep the whole list in memory the whole time. I know this requires parsing the complete file n times, so O(n^2) comparisons will be made and O(n^2) lines parsed. That's fine with me, as I'd rather have a slow but successful program than a failing one, and when the input file is small enough I can always fall back on the simpler version.
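Just to make that second point concrete, a minimal sketch of the n-pass idea (illustrative only, reusing distance from above; this is not the MapReduce code the question is about) could look like this:

-- Illustrative only: re-read the file once per element instead of
-- holding the whole list of values in memory.
sumOfDistancesNPasses :: FilePath -> IO Int
sumOfDistancesNPasses path = do
    firstPass <- readFile path
    let n = length (lines firstPass)            -- pass 0: count the lines
    partialSums <- mapM passFrom [0 .. n - 1]   -- one extra pass per element
    return (sum partialSums)
  where
    -- one pass: skip the first i values, take distances from the next
    -- value to everything after it, and force the partial sum
    passFrom i = do
        contents <- readFile path
        let vals = map read (lines contents) :: [Int]
        case drop i vals of
            (x:rest) -> return $! sum (map (distance x) rest)
            []       -> return 0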
To use multiple CPU cores, I took the MapReduce implementation from Real World Haskell (chapter 24, available here).
I modified the chunking function from the book: instead of dividing the complete file into chunks, it returns as many chunks as there are lines, each chunk representing one element of
tails . lines . readFile
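To make clear what each chunk stands for, this is the pure decomposition the per-line chunks are meant to mirror (illustrative only, reusing distance from above): every tail contributes the distances from its head to all later values.

import Data.List (tails)

-- Illustrative only: each tail (x:xs) contributes the distances from x
-- to all later values; summing over all tails gives the full
-- n*(n-1)/2 pair sum.
sumOfDistancesViaTails :: [Int] -> Int
sumOfDistancesViaTails values =
    sum [ sum (map (distance x) xs) | (x:xs) <- tails values ]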
Because I want the program to scale in file size as well, I initially used lazy IO. This, however, fails with "Too many open files", which I asked about in a previous question (the file handles were disposed of by the GC too late). The full lazy IO version is posted there.
As the accepted answer explains, strict IO solves that problem. It does indeed fix the "Too many open files" problem for 2k-line files, but fails with "Out of memory" on a 50k-line file.
Note that the first, single-threaded implementation (without MapReduce) is capable of handling a 50k-line file.
The alternative solution that appeals to me most is to use iteratee IO. I expected this to solve both the file handle and the memory resource exhaustion. My implementation, however, still fails with a "Too many open files" error on a 2k-line file.
The iteratee IO version has the same mapReduce function as in the book, but a modified chunkedFileEnum to make it work with an Enumerator.
So my question is: what is wrong with the iteratee-IO-based implementation below? Where is the laziness?
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO
import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)
import Data.Text(Text)
import Data.Text.Read
import Data.Maybe
import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)

--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances

--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
    where infiniteList :: Int->Int-> [Int]
          infiniteList i j = (i + j) : infiniteList j (i+j)

--Applying my operation simply on a file
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But i want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
    fileContents <- readFile path
    return $ allDistances $ map read $ lines $ fileContents
    where
        allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
        allDistances _ = 0

--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m => Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
    where transformer input = case reader input of
              Right (val, remainder) -> return [val]
              Left err -> return [0]

readEnumerator :: Monad m => Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)

--The iteratee version of my operation
distancesFirstToTailIt :: Monad m => Iteratee Int m Int
distancesFirstToTailIt = do
    maybeNum <- EL.head
    maybe (return 0) distancesOneToManyIt maybeNum

distancesOneToManyIt :: Monad m => Int -> Iteratee Int m Int
distancesOneToManyIt base = do
    maybeNum <- EL.head
    maybe (return 0) combineNextDistance maybeNum
    where combineNextDistance nextNum = do
              rest <- distancesOneToManyIt base
              return $ combineDistances [(distance base nextNum),rest]

--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
          -> (a -> b)   -- map function
          -> Strategy c -- evaluation strategy for reduction
          -> ([b] -> c) -- reduce function
          -> [a]        -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
    mapResult `pseq` reduceResult
    where mapResult    = parMap mapStrat mapFunc input
          reduceResult = reduceFunc mapResult `using` reduceStrat

--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path

distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
                                      rpar (sumValuesAsReduceFunc)
    where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
          runEnumeratorAsMapFunc = (\source -> run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
          sumValuesAsReduceFunc :: [IO Int] -> IO Int
          sumValuesAsReduceFunc = liftM sum . sequence

--Working with (file)chunk enumerators:
data ChunkSpec = CS{
      chunkOffset :: !Int
    , chunkLength :: !Int
    } deriving (Eq,Show)

chunkedFileEnum :: (NFData (a)) => MonadIO m =>
       (FilePath -> IO [ChunkSpec])
    -> ([Enumerator Text m b] -> IO a)
    -> FilePath
    -> IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
    (chunks, handles) <- chunkedEnum chunkCreator path
    r <- funcOnChunks chunks
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles

chunkedEnum :: MonadIO m =>
       (FilePath -> IO [ChunkSpec])
    -> FilePath
    -> IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
        return (chunk,h)

-- returns set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO [ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h -> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
                let newOffset = offset + chunkSize
                hSeek h AbsoluteSeek (fromIntegral newOffset)
                let findNewline lineSeekOffset = do
                        eof <- hIsEOF h
                        if eof
                            then return [CS offset (totalSize - offset)]
                            else do
                                bytes <- Str.hGet h 256
                                case Str.elemIndex '\n' bytes of
                                    Just n -> do
                                        nextChunks <- findChunks (lineSeekOffset + n + 1)
                                        return (CS offset (totalSize-offset):nextChunks)
                                    Nothing -> findNewline (lineSeekOffset + Str.length bytes)
                findNewline newOffset
        findChunks 0
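For completeness, this is roughly how I drive it (the main below is illustrative only; the file name and test size are arbitrary):

-- Illustrative driver, assuming the code above is in the same module.
-- Compiled with: ghc -O2 -threaded --make
-- Run with:      ./program +RTS -N2
main :: IO ()
main = do
    createTestFile 2000 "testfile.txt"
    result <- sumOfDistancesOnFileWithIt "testfile.txt"
    print result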
By the way, I'm running on Mac OS X 10.6.7 (Snow Leopard),
with Haskell Platform 2011.2.0 and the following packages:
bytestring 0.9.1.10
parallel 3.1.0.1
enumerator 0.4.8, with a manual here