Python黑魔法,一行实现并行化

Python 在程序并行化方面多少有些声名狼藉。撇开技术上的问题,例如线程的实现和 GIL,我觉得错误的教学指导才是主要问题。常见的经典 Python 多线程、多进程教程多显得偏“重”。而且往往隔靴搔痒,没有深入探讨日常工作中最有用的内容。

 

传统的例子

 

简单搜索下“Python 多线程教程”,不难发现几乎所有的教程都给出涉及类和队列的例子:

 

#Example.py

'''

Standard Producer/Consumer Threading Pattern

'''

 

import time

import threading

import Queue

 

class Consumer(threading.Thread):

    def __init__(self, queue):

        threading.Thread.__init__(self)

        self._queue = queue

 

    def run(self):

        while True:

            # queue.get() blocks the current thread until

            # an item is retrieved.

            msg = self._queue.get()

            # Checks if the current message is

            # the "Poison Pill"

            if isinstance(msg, str) and msg == 'quit':

                # if so, exists the loop

                break

            # "Processes" (or in our case, prints) the queue item  

            print "I'm a thread, and I received %s!!" % msg

        # Always be friendly!

        print 'Bye byes!'

 

def Producer():

    # Queue is used to share items between

    # the threads.

    queue = Queue.Queue()

 

    # Create an instance of the worker

    worker = Consumer(queue)

    # start calls the internal run() method to

    # kick off the thread

    worker.start()

 

    # variable to keep track of when we started

    start_time = time.time()

    # While under 5 seconds..

    while time.time() - start_time < 5:

        # "Produce" a piece of work and stick it in

        # the queue for the Consumer to process

        queue.put('something at %s' % time.time())

        # Sleep a bit just to avoid an absurd number of messages

        time.sleep(1)

 

    # This the "poison pill" method of killing a thread.

    queue.put('quit')

    # wait for the thread to close down

    worker.join()

 

if __name__ == '__main__':

    Producer()

 

哈,看起来有些像 Java 不是吗?

 

我并不是说使用生产者/消费者模型处理多线程/多进程任务是错误的(事实上,这一模型自有其用武之地)。只是,处理日常脚本任务时我们可以使用更有效率的模型。

 

问题在于…

 

首先,你需要一个样板类;

其次,你需要一个队列来传递对象;

而且,你还需要在通道两端都构建相应的方法来协助其工作(如果需想要进行双向通信或是保存结果还需要再引入一个队列)。

 

worker 越多,问题越多

 

按照这一思路,你现在需要一个 worker 线程的线程池。下面是 一篇 IBM 经典教程 中的例子——在进行网页检索时通过多线程进行加速。

 

#Example2.py

'''

A more realistic thread pool example

'''

 

import time

import threading

import Queue

import urllib2

 

class Consumer(threading.Thread):

    def __init__(self, queue):

        threading.Thread.__init__(self)

        self._queue = queue

 

    def run(self):

        while True:

            content = self._queue.get()

            if isinstance(content, str) and content == 'quit':

                break

            response = urllib2.urlopen(content)

        print 'Bye byes!'

 

def Producer():

    urls = [

        'http://www.python.org', 'http://www.yahoo.com'

        'http://www.scala.org', 'http://www.google.com'

        # etc..

    ]

    queue = Queue.Queue()

    worker_threads = build_worker_pool(queue, 4)

    start_time = time.time()

 

    # Add the urls to process

    for url in urls:

        queue.put(url)  

    # Add the poison pillv

    for worker in worker_threads:

        queue.put('quit')

    for worker in worker_threads:

        worker.join()

 

    print 'Done! Time taken: {}'.format(time.time() - start_time)

 

def build_worker_pool(queue, size):

    workers = []

    for _ in range(size):

        worker = Consumer(queue)

        worker.start()

        workers.append(worker)

    return workers

 

if __name__ == '__main__':

    Producer()

 

这段代码能正确的运行,但仔细看看我们需要做些什么:构造不同的方法、追踪一系列的线程,还有为了解决恼人的死锁问题,我们需要进行一系列的 join 操作。这还只是开始……

 

至此我们回顾了经典的多线程教程,多少有些空洞不是吗?样板化而且易出错,这样事倍功半的风格显然不那么适合日常使用,好在我们还有更好的方法。

 

何不试试 map

 

map 这一小巧精致的函数是简捷实现 Python 程序并行化的关键。map 源于 Lisp 这类函数式编程语言。它可以通过一个序列实现两个函数之间的映射。

 

urls = ['http://www.yahoo.com', 'http://www.reddit.com']

results = map(urllib2.urlopen, urls)

 

上面的这两行代码将 urls 这一序列中的每个元素作为参数传递到 urlopen 方法中,并将所有结果保存到 results 这一列表中。其结果大致相当于:

 

results = []

for url in urls:

    results.append(urllib2.urlopen(url))

 

map 函数一手包办了序列操作、参数传递和结果保存等一系列的操作。

 

为什么这很重要呢?这是因为借助正确的库,map 可以轻松实现并行化操作。

 

 

在 Python 中有个两个库包含了 map 函数: multiprocessing 和它鲜为人知的子库 multiprocessing.dummy.

 

这里多扯两句: multiprocessing.dummy? mltiprocessing 库的线程版克隆?这是虾米?即便在 multiprocessing 库的官方文档里关于这一子库也只有一句相关描述。而这句描述译成人话基本就是说:”嘛,有这么个东西,你知道就成.”相信我,这个库被严重低估了!

 

dummy 是 multiprocessing 模块的完整克隆,唯一的不同在于 multiprocessing 作用于进程,而 dummy 模块作用于线程(因此也包括了 Python 所有常见的多线程限制)。

所以替换使用这两个库异常容易。你可以针对 IO 密集型任务和 CPU 密集型任务来选择不同的库。

 

动手尝试

 

使用下面的两行代码来引用包含并行化 map 函数的库:

 

from multiprocessing import Pool

from multiprocessing.dummy import Pool as ThreadPool

 

实例化 Pool 对象:

 

pool = ThreadPool()

 

这条简单的语句替代了 example2.py 中 build_worker_pool 函数 7 行代码的工作。它生成了一系列的 worker 线程并完成初始化工作、将它们储存在变量中以方便访问。

 

Pool 对象有一些参数,这里我所需要关注的只是它的第一个参数:processes. 这一参数用于设定线程池中的线程数。其默认值为当前机器 CPU 的核数。

 

一般来说,执行 CPU 密集型任务时,调用越多的核速度就越快。但是当处理网络密集型任务时,事情有有些难以预计了,通过实验来确定线程池的大小才是明智的。

 

pool = ThreadPool(4) # Sets the pool size to 4

 

线程数过多时,切换线程所消耗的时间甚至会超过实际工作时间。对于不同的工作,通过尝试来找到线程池大小的最优值是个不错的主意。

 

创建好 Pool 对象后,并行化的程序便呼之欲出了。我们来看看改写后的 example2.py

 

import urllib2

from multiprocessing.dummy import Pool as ThreadPool

 

urls = [

    'http://www.python.org',

    'http://www.python.org/about/',

    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',

    'http://www.python.org/doc/',

    'http://www.python.org/download/',

    'http://www.python.org/getit/',

    'http://www.python.org/community/',

    'https://wiki.python.org/moin/',

    'http://planet.python.org/',

    'https://wiki.python.org/moin/LocalUserGroups',

    'http://www.python.org/psf/',

    'http://docs.python.org/devguide/',

    'http://www.python.org/community/awards/'

    # etc..

    ]

 

# Make the Pool of workers

pool = ThreadPool(4)

# Open the urls in their own threads

# and return the results

results = pool.map(urllib2.urlopen, urls)

#close the pool and wait for the work to finish

pool.close()

pool.join()

 

实际起作用的代码只有 4 行,其中只有一行是关键的。map 函数轻而易举的取代了前文中超过 40 行的例子。为了更有趣一些,我统计了不同方法、不同线程池大小的耗时情况。

 

# results = []

# for url in urls:

#   result = urllib2.urlopen(url)

#   results.append(result)

 

# # ------- VERSUS ------- #

 

# # ------- 4 Pool ------- #

# pool = ThreadPool(4)

# results = pool.map(urllib2.urlopen, urls)

 

# # ------- 8 Pool ------- #

 

# pool = ThreadPool(8)

# results = pool.map(urllib2.urlopen, urls)

 

# # ------- 13 Pool ------- #

 

# pool = ThreadPool(13)

# results = pool.map(urllib2.urlopen, urls)

 

结果:

 

#        Single thread:  14.4 Seconds

#               4 Pool:   3.1 Seconds

#               8 Pool:   1.4 Seconds

#              13 Pool:   1.3 Seconds

 

很棒的结果不是吗?这一结果也说明了为什么要通过实验来确定线程池的大小。在我的机器上当线程池大小大于 9 带来的收益就十分有限了。

 

另一个真实的例子

 

生成上千张图片的缩略图

这是一个 CPU 密集型的任务,并且十分适合进行并行化。

 

基础单进程版本

 

import os

import PIL

 

from multiprocessing import Pool

from PIL import Image

 

SIZE = (75,75)

SAVE_DIRECTORY = 'thumbs'

 

def get_image_paths(folder):

    return (os.path.join(folder, f)

            for f in os.listdir(folder)

            if 'jpeg' in f)

 

def create_thumbnail(filename):

    im = Image.open(filename)

    im.thumbnail(SIZE, Image.ANTIALIAS)

    base, fname = os.path.split(filename)

    save_path = os.path.join(base, SAVE_DIRECTORY, fname)

    im.save(save_path)

 

if __name__ == '__main__':

    folder = os.path.abspath(

        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')

    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

 

    images = get_image_paths(folder)

 

    for image in images:

        create_thumbnail(Image)

 

上边这段代码的主要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保存到特定文件夹中。

 

这我的机器上,用这一程序处理 6000 张图片需要花费 27.9 秒。

 

如果我们使用 map 函数来代替 for 循环:

 

import os

import PIL

 

from multiprocessing import Pool

from PIL import Image

 

SIZE = (75,75)

SAVE_DIRECTORY = 'thumbs'

 

def get_image_paths(folder):

    return (os.path.join(folder, f)

            for f in os.listdir(folder)

            if 'jpeg' in f)

 

def create_thumbnail(filename):

    im = Image.open(filename)

    im.thumbnail(SIZE, Image.ANTIALIAS)

    base, fname = os.path.split(filename)

    save_path = os.path.join(base, SAVE_DIRECTORY, fname)

    im.save(save_path)

 

if __name__ == '__main__':

    folder = os.path.abspath(

        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')

    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

 

    images = get_image_paths(folder)

 

    pool = Pool()

    pool.map(creat_thumbnail, images)

    pool.close()

    pool.join()

 

5.6 秒!

 

虽然只改动了几行代码,我们却明显提高了程序的执行速度。在生产环境中,我们可以为 CPU 密集型任务和 IO 密集型任务分别选择多进程和多线程库来进一步提高执行速度——这也是解决死锁问题的良方。此外,由于 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。

到这里,我们就实现了(基本)通过一行 Python 实现并行化。

原文链接

转载于:https://www.cnblogs.com/276815076/p/5530420.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/357204.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

javaScript对账号卡号进行脱敏处理

导读&#xff1a;一般8位以上账号&#xff0c;显示首尾各4位&#xff0c;中间固定用8位*代替&#xff1b;8位及以下账号&#xff0c;显示首尾各2位&#xff0c;中间固定用8位*代替。 这里简单处理一下16位及以上的账号&#xff0c;卡号&#xff0c;其它的情况同理&#xff0c; …

2. python 参数个数可变的函数

如果想要定义一个参数个数不确定的函数&#xff0c; 可以通过*args,**kwargs实现&#xff1a; *args的使用&#xff1a; **kwargs的使用&#xff1a; 二者的混合使用&#xff1a; 另外还有一些关于调用函数时候的时候常用的技巧&#xff1a; 其和是等价的。 其和是等价的 当然也…

JavaScript常用工具类整理(总结版)

导读&#xff1a;在前端开发过程中需要对常用的功能模块进行封装&#xff0c;常用的方法多次调用需要整合&#xff0c;保证组件的复用性与程序的可维护性&#xff0c;这里总结一下&#xff0c;便于后续的使用&#xff01; 目录 1.全局声明工具类 2.定时器 3.判断变量是否是一…

axis2 json_带有Java和Axis2的JSON Web服务

axis2 json我最近遇到一位客户&#xff0c;要求我使用Java Web服务重建其旧产品。 他们希望它模块化并且易于使用。 我想到的第一件事是使用宁静的方法。 但是让我烦恼的是&#xff0c;Java宁静的方法是使用XML !&#xff0c;我更喜欢一种更简单的通信方式&#xff0c;易于理解…

如何对技术视频转换文章投稿进行二次创作

导读&#xff1a;在技术社区经常会收到一些大的平台&#xff08;华为云博客、infoq等平台的投稿任务&#xff09;&#xff0c;经过对数千篇通用技术稿件&#xff0c;积攒了一些小技巧。所以&#xff0c;在你创作之前还是要好好的看一下&#xff0c;希望对你有帮助&#xff01;看…

iOS开发-UIScrollView原理

转载:http://www.cnblogs.com/xiaofeixiang/p/5144256.html UIScrollView 在开发中是不可避免&#xff0c;关于UIScrollView都有自己一定的理解。滚动视图有两个需要理解的属性&#xff0c;frame和bounds&#xff0c;frame是定义了视 图在窗口的大小和位置&#xff0c;bounds表…

使用Spring MVC进行资源版本控制

提供静态资源时&#xff0c;通常的做法是将某种版本信息附加到资源URL。 这使浏览器可以无限期地缓存资源。 每当资源的内容更改时&#xff0c;URL中的版本信息也会更改。 更新的URL会强制客户端浏览器放弃缓存的资源&#xff0c;并从服务器重新加载最新的资源版本。 使用Spri…

边缘计算如何实现海量IoT数据就地处理

1.什么是IoT边缘&#xff1f;Gartner数据显示&#xff0c;到2021年底将有超过50&#xff05;的大型企业部署至少一个边缘计算应用&#xff1b;到2023年底&#xff0c;50&#xff05;以上的大型企业将至少部署6个用于物联网或沉浸式体验的边缘计算应用。工业一体机的售价一般在1…

.propertie文件注释

在.properties文件中注释,前边加#就可以转载于:https://www.cnblogs.com/toSeeMyDream/p/5539322.html

《鸿蒙理论知识01》HarmonyOS概述之技术特性

导读:多种设备之间能够实现硬件互助、资源共享,依赖的关键技术包括分布式软总线、分布式设 备虚拟化、分布式数据管理、分布式任务调度等。 目录 1.分布式软总线 2.分布式设备虚拟化 3.分布式数据管理 4.分布式任务调度

《鸿蒙理论知识02》HarmonyOS开发平台和工具

目录 1.应用和开发工具的演进 2.超 级 终 端 应 用 开 发 面 临 全 新 挑 战

不常用 保存下来

一、数学函数  数学函数主要用于处理数字&#xff0c;包括整型、浮点数等。 ABS(x) 返回x的绝对值   SELECT ABS(-1) -- 返回1 CEIL(x),CEILING(x) 返回大于或等于x的最小整数   SELECT CEIL(1.5) -- 返回2 FLOOR(x) 返回小于或等于x的最大整数   SELECT FLOOR(1.5) --…

用Lucene建立搜索索引

本文是我们名为“ Apache Lucene基础知识 ”的学院课程的一部分。 在本课程中&#xff0c;您将了解Lucene。 您将了解为什么这样的库很重要&#xff0c;然后了解Lucene中搜索的工作方式。 此外&#xff0c;您将学习如何将Lucene Search集成到您自己的应用程序中&#xff0c;以…

《鸿蒙理论知识03》HarmonyOS概述之系统安全

在搭载 HarmonyOS 的分布式终端上,可以保证“正确的人,通过正确的设备,正确地使 用数据”。 通过“分布式多端协同身份认证”来保证“正确的人”。 通过“在分布式终端上构筑可信运行环境”来保证“正确的设备”。 通过“分布式数据在跨终端流动的过程中,对数据进行分类分…

Shell基础学习(六) 流程控制

1、if if的语法格式 if conditon thencommand1command2commandn fi 2、if else if conditon thencommand1command2commandn elsecommand1command2commandn fi 3、if elseif else if conditon thencommand1command2commandn else if conditoncommand1command2commandn elsecomma…

《鸿蒙理论知识04》HarmonyOS概述之系统定义

目录 系统定位 技术架构 系统服务层 框架层 系统定位 HarmonyOS 是一款“面向未来”、面向全场景(移动办公、运动健康、社交通信、媒体 娱乐等)的分布式操作系统。在传统的单设备系统能力的基础上,HarmonyOS 提出了基 于同一套系统能力、适配多种终端形态的分布式理念,…

jQuery on 绑定的事件触发多次

jquery用on绑定事件&#xff0c;在代码执行过程中&#xff0c;可能会遇到多次执行的情况。 解决方案是在on的事件前面加上一个off&#xff0c;再on。 $(#btnBind).off(click).on(click,function () {alert(123); });$("#xxx").off(keydown).focus().on(keydown,funct…

twitter api_Java应用程序上的Twitter API

twitter api是否曾想过将推文附加到Java应用程序&#xff1f; 我为此寻找了最好的API&#xff0c;很幸运&#xff0c;我找到了它&#xff01; http://twitter4j.org/ 一个简单的方法&#xff1a; 我们需要做的第一件事是在您的Twitter帐户中创建一个应用程序&#xff0c;为其授…

《鸿蒙理论知识05》HarmonyOS概述之下载与安装软件

DevEco Studio 支持 Windows 和 macOS 系统,下面将针对两种操作系统的软件安装方式 进行介绍。 目录 Windows 环境 macOS 环境 Windows 环境 运行环境要求 为保证 DevEco Studio 正常运行,建议您的电脑配置满足如下要求:  操作系统:Windows10 64 位  内存:8GB …

网络编程释疑之:单台服务器上的并发TCP连接数可以有多少

曾几何时我们还在寻求网络编程中C10K问题的解决方案&#xff0c;但是现在从硬件和操作系统支持来看单台服务器支持上万并发连接已经没有多少挑战性了。我们先假设单台服务器最多只能支持万级并发连接&#xff0c;其实对绝大多数应用来说已经远远足够了&#xff0c;但是对于一些…