• 12.8 简单的并行编程
    • 问题
    • 解决方案
    • 讨论

    12.8 简单的并行编程

    问题

    你有个程序要执行CPU密集型工作,你想让他利用多核CPU的优势来运行的快一点。

    解决方案

    concurrent.futures 库提供了一个 ProcessPoolExecutor 类,可被用来在一个单独的Python解释器中执行计算密集型函数。不过,要使用它,你首先要有一些计算密集型的任务。我们通过一个简单而实际的例子来演示它。假定你有个Apache web服务器日志目录的gzip压缩包:

    1. logs/
    2. 20120701.log.gz
    3. 20120702.log.gz
    4. 20120703.log.gz
    5. 20120704.log.gz
    6. 20120705.log.gz
    7. 20120706.log.gz
    8. ...

    进一步假设每个日志文件内容类似下面这样:

    1. 124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
    2. 210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
    3. 210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
    4. 61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
    5. ...

    下面是一个脚本,在这些日志文件中查找出所有访问过robots.txt文件的主机:

    1. # findrobots.py
    2.  
    3. import gzip
    4. import io
    5. import glob
    6.  
    7. def find_robots(filename):
    8. '''
    9. Find all of the hosts that access robots.txt in a single log file
    10. '''
    11. robots = set()
    12. with gzip.open(filename) as f:
    13. for line in io.TextIOWrapper(f,encoding='ascii'):
    14. fields = line.split()
    15. if fields[6] == '/robots.txt':
    16. robots.add(fields[0])
    17. return robots
    18.  
    19. def find_all_robots(logdir):
    20. '''
    21. Find all hosts across and entire sequence of files
    22. '''
    23. files = glob.glob(logdir+'/*.log.gz')
    24. all_robots = set()
    25. for robots in map(find_robots, files):
    26. all_robots.update(robots)
    27. return all_robots
    28.  
    29. if __name__ == '__main__':
    30. robots = find_all_robots('logs')
    31. for ipaddr in robots:
    32. print(ipaddr)

    前面的程序使用了通常的map-reduce风格来编写。函数 find_robots() 在一个文件名集合上做map操作,并将结果汇总为一个单独的结果,也就是 find_all_robots() 函数中的 all_robots 集合。现在,假设你想要修改这个程序让它使用多核CPU。很简单——只需要将map()操作替换为一个 concurrent.futures 库中生成的类似操作即可。下面是一个简单修改版本:

    1. # findrobots.py
    2.  
    3. import gzip
    4. import io
    5. import glob
    6. from concurrent import futures
    7.  
    8. def find_robots(filename):
    9. '''
    10. Find all of the hosts that access robots.txt in a single log file
    11.  
    12. '''
    13. robots = set()
    14. with gzip.open(filename) as f:
    15. for line in io.TextIOWrapper(f,encoding='ascii'):
    16. fields = line.split()
    17. if fields[6] == '/robots.txt':
    18. robots.add(fields[0])
    19. return robots
    20.  
    21. def find_all_robots(logdir):
    22. '''
    23. Find all hosts across and entire sequence of files
    24. '''
    25. files = glob.glob(logdir+'/*.log.gz')
    26. all_robots = set()
    27. with futures.ProcessPoolExecutor() as pool:
    28. for robots in pool.map(find_robots, files):
    29. all_robots.update(robots)
    30. return all_robots
    31.  
    32. if __name__ == '__main__':
    33. robots = find_all_robots('logs')
    34. for ipaddr in robots:
    35. print(ipaddr)

    通过这个修改后,运行这个脚本产生同样的结果,但是在四核机器上面比之前快了3.5倍。实际的性能优化效果根据你的机器CPU数量的不同而不同。

    讨论

    ProcessPoolExecutor 的典型用法如下:

    1. from concurrent.futures import ProcessPoolExecutor
    2.  
    3. with ProcessPoolExecutor() as pool:
    4. ...
    5. do work in parallel using pool
    6. ...

    其原理是,一个 ProcessPoolExecutor 创建N个独立的Python解释器,N是系统上面可用CPU的个数。你可以通过提供可选参数给 ProcessPoolExecutor(N) 来修改处理器数量。这个处理池会一直运行到with块中最后一个语句执行完成,然后处理池被关闭。不过,程序会一直等待直到所有提交的工作被处理完成。

    被提交到池中的工作必须被定义为一个函数。有两种方法去提交。如果你想让一个列表推导或一个 map() 操作并行执行的话,可使用 pool.map() :

    1. # A function that performs a lot of work
    2. def work(x):
    3. ...
    4. return result
    5.  
    6. # Nonparallel code
    7. results = map(work, data)
    8.  
    9. # Parallel implementation
    10. with ProcessPoolExecutor() as pool:
    11. results = pool.map(work, data)

    另外,你可以使用 pool.submit() 来手动的提交单个任务:

    1. # Some function
    2. def work(x):
    3. ...
    4. return result
    5.  
    6. with ProcessPoolExecutor() as pool:
    7. ...
    8. # Example of submitting work to the pool
    9. future_result = pool.submit(work, arg)
    10.  
    11. # Obtaining the result (blocks until done)
    12. r = future_result.result()
    13. ...

    如果你手动提交一个任务,结果是一个 Future 实例。要获取最终结果,你需要调用它的 result() 方法。它会阻塞进程直到结果被返回来。

    如果不想阻塞,你还可以使用一个回调函数,例如:

    1. def when_done(r):
    2. print('Got:', r.result())
    3.  
    4. with ProcessPoolExecutor() as pool:
    5. future_result = pool.submit(work, arg)
    6. future_result.add_done_callback(when_done)

    回调函数接受一个 Future 实例,被用来获取最终的结果(比如通过调用它的result()方法)。尽管处理池很容易使用,在设计大程序的时候还是有很多需要注意的地方,如下几点:

    • 这种并行处理技术只适用于那些可以被分解为互相独立部分的问题。
    • 被提交的任务必须是简单函数形式。对于方法、闭包和其他类型的并行执行还不支持。
    • 函数参数和返回值必须兼容pickle,因为要使用到进程间的通信,所有解释器之间的交换数据必须被序列化
    • 被提交的任务函数不应保留状态或有副作用。除了打印日志之类简单的事情,
      一旦启动你不能控制子进程的任何行为,因此最好保持简单和纯洁——函数不要去修改环境。

    • 在Unix上进程池通过调用 fork() 系统调用被创建,
      它会克隆Python解释器,包括fork时的所有程序状态。而在Windows上,克隆解释器时不会克隆状态。实际的fork操作会在第一次调用 pool.map()pool.submit() 后发生。

    • 当你混合使用进程池和多线程的时候要特别小心。
      你应该在创建任何线程之前先创建并激活进程池(比如在程序启动的main线程中创建进程池)。

    原文:

    http://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p08_perform_simple_parallel_programming.html