I'm currently processing 5 huge (4 million lines each) log files in Perl, and I thought I might try to implement the same in Go with its concurrency features. So, having very little experience in Go, I was thinking of doing something like the below. Any comments on the approach would be greatly appreciated. Some rough pseudocode:
    var wg1 sync.WaitGroup
    var wg2 sync.WaitGroup

    func processRow(r Row) {
        wg2.Add(1)
        defer wg2.Done()
        res = process(r)   // do something with the row
        return res
    }

    func processFile(f File) {
        wg1.Add(1)
        open(newFile File)
        defer wg1.Done()
        for each line in f {
            result = go processRow(line)
            newFile.Println(result) // write new processed line to newFile
        }
        wg2.Wait()
        newFile.Close()
    }

    func main() {
        for each f in logfiles {
            go processFile(f)
        }
        wg1.Wait()
    }
So the idea is that I process these 5 files concurrently, and then all the lines of each file would also be processed concurrently.
Would that work?
You should definitely use channels to manage your processed lines. Alternatively, you could also write another goroutine to handle your output.
    var numGoWriters = 10

    func processRow(r string, ch chan<- string) {
        res := process(r)
        ch <- res
    }

    func writeRow(f *os.File, ch <-chan string) {
        w := bufio.NewWriter(f)
        defer w.Flush() // make sure buffered output reaches the file
        for s := range ch {
            if _, err := w.WriteString(s + "\n"); err != nil {
                // handle it
            }
        }
    }

    func processFile(f *os.File) {
        outFile, err := os.Create("/path/to/file.out")
        if err != nil {
            // handle it
        }
        defer outFile.Close()

        ch := make(chan string, 10) // play with this number for performance

        // Start the writers first so they can drain ch while we scan.
        // Note: with more than one writer sharing the same file, buffered
        // writes can interleave; keep numGoWriters at 1 unless you
        // serialize the writes yourself.
        var writerWg sync.WaitGroup
        for i := 0; i < numGoWriters; i++ {
            writerWg.Add(1)
            go func() {
                defer writerWg.Done()
                writeRow(outFile, ch)
            }()
        }

        var wg sync.WaitGroup
        fScanner := bufio.NewScanner(f)
        for fScanner.Scan() {
            line := fScanner.Text() // capture the line: Scan() reuses its buffer
            wg.Add(1)
            go func() {
                defer wg.Done()
                processRow(line, ch)
            }()
        }
        wg.Wait()

        // Once we're done processing rows, close the channel so the
        // writer goroutines exit, and wait for them to flush.
        close(ch)
        writerWg.Wait()
    }
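The sketch above assumes a process helper that turns an input line into its output form; that's whatever your Perl per-line logic does. A trivial stand-in, just so the snippet compiles (the upper-casing is purely illustrative):

    // process is a hypothetical placeholder for the real per-line work;
    // swap in your actual transformation. Requires the strings package.
    func process(r string) string {
        return strings.ToUpper(r)
    }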
So here we have processRow doing all the processing (I assumed string), writeRow doing all the output I/O, and processFile tying it all together for each file. Then all main has to do is hand out the files, spawn the goroutines, et voilà:
    func main() {
        var wg sync.WaitGroup
        filenames := [...]string{"here", "are", "some", "log", "paths"}
        for _, fname := range filenames {
            inFile, err := os.Open(fname)
            if err != nil {
                // handle it
            }
            defer inFile.Close()
            wg.Add(1)
            go func(f *os.File) {
                defer wg.Done()
                processFile(f)
            }(inFile)
        }
        wg.Wait()
    }
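One caveat at your scale: with 4 million lines per file, the code above launches one goroutine per line. Goroutines are cheap, but if that turns out to hurt, a fixed worker pool bounds the concurrency. A minimal sketch, assuming the same hypothetical process helper, with a single writer so output writes never interleave:

    // processFilePooled is a hypothetical variant of processFile that caps
    // concurrency at numWorkers instead of one goroutine per line.
    func processFilePooled(in *os.File, out *os.File, numWorkers int) {
        lines := make(chan string, 1024)   // raw input lines
        results := make(chan string, 1024) // processed output lines

        // Fixed pool of workers draining the lines channel.
        var workerWg sync.WaitGroup
        for i := 0; i < numWorkers; i++ {
            workerWg.Add(1)
            go func() {
                defer workerWg.Done()
                for l := range lines {
                    results <- process(l)
                }
            }()
        }

        // Single writer goroutine, so no two goroutines touch the file at once.
        writeDone := make(chan struct{})
        go func() {
            defer close(writeDone)
            w := bufio.NewWriter(out)
            defer w.Flush()
            for s := range results {
                if _, err := w.WriteString(s + "\n"); err != nil {
                    // handle it
                }
            }
        }()

        scanner := bufio.NewScanner(in)
        for scanner.Scan() {
            lines <- scanner.Text()
        }
        close(lines)    // no more input: workers finish and exit
        workerWg.Wait() // all results have been sent
        close(results)  // writer drains the rest and exits
        <-writeDone     // writer has flushed; safe for the caller to close out
    }

You'd call this from main in place of processFile, with numWorkers somewhere around runtime.NumCPU() as a starting point.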