python3 asyncio 编程实践
asyncio
由 Python 之父亲自操刀的异步编程模块 asyncio 在 3.4 版本中已经投入使用。 这号称可以将普通的函数封装成协程来执行, 然后将协程加入事件循环。 解释器根据事件循环执行代码, 遇到 IO 等阻塞就会挂起当前事件, 轮询并执行其他事件。
实践任务
把 2333 个随机的含有 2-10 个字符的中文翻译为英文, 这里我使用的是百度通用 API,
每个月有 200 万个字符免费使用的权利。 API 接口已经放在 github。
不使用 asyncio 耗费的时间
github 的脚本的主函数写下如下代码:
if __name__ == '__main__': trans = BaiduTranslator() from_txt = ['我', '学生', ...] // 假设已经生成 2333 个随机中文字符串 for txt in from_txt: rs = trans.translate("zh", "en", txt) print(rs)
这样每秒大约 4 个字符串左右, 需耗费 10 min。
用 asyncio + 多线程的话, 会少些时间, 需耗费 6 min 左右。 代码如下:
几个依赖库:
import asyncio import aiohttp import multiprocessing from multiprocessing.pool import ThreadPool
先定义一个工具函数, 作用是将给定数组按每 n 个划分为新的数组, 不足 n 个的划分为新的一组。
def split_array_n(arr, n): res = [] length = len(arr) if n <= 1 or length < 20: return [arr] paras = length // n remain = length % n for i in range(n): res.append(arr[i * paras:i * paras + paras]) if remain != 0: res.append(arr[length - remain: length]) return res
翻译部分的代码如下:
async def fetch(session, url, params): async with session.get(url, params=params) as response: return await response.json() async def translate_one(*args): fid, sl, tl, text = args // fid 是无关紧要的参数, 可删掉 async with aiohttp.ClientSession() as session: while True: key = pick_key() // 修改为自己的 appid, secretKey 即可 if key is None: return fid, None uname, appid, secretKey = key params = generate_sign(appid, secretKey, sl, tl, text) try: res = await fetch(session, target_url, params=params) except IOError: return fid, None if 'trans_result' in res.keys(): return fid, res['trans_result'][0]['dst'] if 'error_code' in res.keys() and res['error_code'] == '54004': # 接口每月免费字数已用完, 禁用当前 appid, secretKey, 进入循环 enable_key(uname, 0) else: trans_result = None break return fid, trans_result
开启若干个事件循环, 分别执行不同数组分片,代码:
def translate_many(sources): """ :param sources: list like [(fid, sl, tl, text), ] :return: translate result list like [(fid, dst_text),] """ result = [] asyncio.set_event_loop(asyncio.new_event_loop()) // 新建事件循环 loop = asyncio.get_event_loop() for source in sources: task = asyncio.ensure_future(translate_one(*source)) res = loop.run_until_complete(task) // res 是返回值 result.append(res) loop.close() return result
多线程的代码:
def translate(sources, thread_num=None): res = [] if thread_num is None: thread_num = 2 * multiprocessing.cpu_count() sources = split_array_n(sources, thread_num) pool = ThreadPool(processes=thread_num) for x in pool.imap(translate_many, sources): res.extend(x) pool.close() pool.join() return res
主函数:
if __name__ == '__main__': sources = ['', ''] // 一个含有 2333 个字符串的 list translate(sources, 4)
发表评论
评论列表, 共 0 条评论