python3 asyncio 编程实践

asyncio

官方文档

由 Python 之父亲自操刀的异步编程模块 asyncio 在 3.4 版本中已经投入使用。 这号称可以将普通的函数封装成协程来执行, 然后将协程加入事件循环。 解释器根据事件循环执行代码, 遇到 IO 等阻塞就会挂起当前事件, 轮询并执行其他事件。

实践任务

把 2333 个随机的含有 2-10 个字符的中文翻译为英文, 这里我使用的是百度通用 API,

每个月有 200 万个字符免费使用的权利。 API 接口已经放在 github

不使用 asyncio 耗费的时间

github 的脚本的主函数写下如下代码:

if __name__ == '__main__':
    trans = BaiduTranslator()
    from_txt = ['我', '学生', ...] // 假设已经生成 2333 个随机中文字符串
    for txt in from_txt:
        rs = trans.translate("zh", "en", txt)
        print(rs)

这样每秒大约 4 个字符串左右, 需耗费 10 min。

用 asyncio + 多线程的话, 会少些时间, 需耗费 6 min 左右。 代码如下:

几个依赖库:

import asyncio
import aiohttp
import multiprocessing
from multiprocessing.pool import ThreadPool

先定义一个工具函数, 作用是将给定数组按每 n 个划分为新的数组, 不足 n 个的划分为新的一组。

def split_array_n(arr, n):
    res = []
    length = len(arr)
    if n <= 1 or length < 20:
        return [arr]
    paras = length // n
    remain = length % n
    for i in range(n):
        res.append(arr[i * paras:i * paras + paras])
    if remain != 0:
        res.append(arr[length - remain: length])
    return res

翻译部分的代码如下:

async def fetch(session, url, params):
    async with session.get(url, params=params) as response:
        return await response.json()


async def translate_one(*args):
    fid, sl, tl, text = args // fid 是无关紧要的参数, 可删掉
    async with aiohttp.ClientSession() as session:
        while True:
            key = pick_key()  // 修改为自己的 appid, secretKey 即可
            if key is None:
                return fid, None
            uname, appid, secretKey = key
            params = generate_sign(appid, secretKey, sl, tl, text)
            try:
                res = await fetch(session, target_url, params=params)
            except IOError:
                return fid, None

            if 'trans_result' in res.keys():
                return fid, res['trans_result'][0]['dst']

            if 'error_code' in res.keys() and res['error_code'] == '54004':
                # 接口每月免费字数已用完, 禁用当前 appid, secretKey, 进入循环
                enable_key(uname, 0)
            else:
                trans_result = None
                break
        return fid, trans_result

开启若干个事件循环, 分别执行不同数组分片,代码:

def translate_many(sources):
    """
    :param sources: list like [(fid, sl, tl, text), ]
    :return: translate result list like [(fid, dst_text),]
    """
    result = []
    asyncio.set_event_loop(asyncio.new_event_loop()) // 新建事件循环
    loop = asyncio.get_event_loop()
    for source in sources:
        task = asyncio.ensure_future(translate_one(*source))
        res = loop.run_until_complete(task) // res 是返回值
        result.append(res)
    loop.close()
    return result

多线程的代码:

def translate(sources, thread_num=None):
    res = []
    if thread_num is None:
        thread_num = 2 * multiprocessing.cpu_count()
    sources = split_array_n(sources, thread_num)
    pool = ThreadPool(processes=thread_num)
    for x in pool.imap(translate_many, sources):
        res.extend(x)
    pool.close()
    pool.join()
    return res

主函数:

if __name__ == '__main__':
    sources = ['', '']  // 一个含有 2333 个字符串的 list
    translate(sources, 4)

参考文档

深入理解 Python 异步编程

asyncio + aiohttp

发表评论

评论内容
 

评论列表, 共 0 条评论