尝试用 async / await 下载文件失败,求帮助 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
noqwerty
V2EX    Python

尝试用 async / await 下载文件失败,求帮助

  •  
  •   noqwerty 2018-01-15 07:42:29 +08:00 5733 次点击
    这是一个创建于 2825 天前的主题,其中的信息可能已经有所发展或是发生改变。

    目前需要从一个 FTP 服务器下载 3 万多个小文件,之前用 multiprocessing 总是下一部分之后就停了,所以尝试用异步加快下载:

    class pdb: def __init__(self): self.ids = [] self.dl_id = [] self.err_id = [] async def download_file(self, session, url): try: with async_timeout.timeout(10): async with session.get(url) as remotefile: if remotefile.status == 200: data = await remotefile.read() return {"error": "", "data": data} else: return {"error": remotefile.status, "data": ""} except Exception as e: return {"error": e, "data": ""} async def unzip(self, session, work_queue): while not work_queue.empty(): queue_url = await work_queue.get() print(queue_url) data = await self.download_file(session, queue_url) id = queue_url[-11:-7] ID = id.upper() if not data["error"]: saved_pdb = os.path.join("./pdb", ID, f'{ID}.pdb') if ID not in self.dl_id: self.dl_id.append(ID) with open(f"{id}.ent.gz", 'wb') as f: f.write(data["data"].read()) with gzip.open(f"{id}.ent.gz", "rb") as inFile, open(saved_pdb, "wb") as outFile: shutil.copyfileobj(inFile, outFile) os.remove(f"{id}.ent.gz") else: self.err_id.append(ID) def download_queue(self, urls): loop = asyncio.get_event_loop() q = asyncio.Queue(loop=loop) [q.put_nowait(url) for url in urls] con = aiohttp.TCPConnector(limit=10) with aiohttp.ClientSession(loop=loop, cOnnector=con) as session: tasks = [asyncio.ensure_future(self.unzip(session, q)) for _ in range(len(urls))] loop.run_until_complete(asyncio.gather(tasks)) loop.close() if __name__ == "__main__": x = pdb() urls = ['ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/nf/pdb4nfn.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/ny/pdb4nyj.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/mn/pdb2mnz.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/ra/pdb4ra4.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/x5/pdb4x5w.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/dm/pdb2dmq.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/n7/pdb2n7r.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/om/pdb2omv.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/oy/pdb3oy8.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/fe/pdb3fej.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/hw/pdb2hw9.ent.gz'] x.download_queue(urls) 

    报错信息如下:

    Traceback (most recent call last):
    File "test.py", line 111, in <module>
    x.download_queue(urls)
    File "test.py", line 99, in download_queue
    loop.run_until_complete(asyncio.gather(*tasks))
    File "/home/yz/miniconda3/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete
    return future.result()
    File "test.py", line 73, in unzip
    data = await self.download_file(session, queue_url)
    File "test.py", line 65, in download_file
    return {"error": remotefile.status, "data": ""}
    File "/home/yz/miniconda3/lib/python3.6/site-packages/async_timeout/init.py", line 46, in exit
    raise asyncio.TimeoutError from None
    concurrent.futures._base.TimeoutError

    请大家帮忙看看。谢谢!

    第 1 条附言    2018-01-16 13:11:22 +08:00

    发现了很蠢的一个问题……aiohttp似乎并不支持解析ftp链接,我在urls里面随机插了一些正常链接就跑得动了……还是感谢大家的帮忙!

    24 条回复    2018-06-23 12:12:36 +08:00
    XiaoFaye
        1
    XiaoFaye  
       2018-01-15 07:46:30 +08:00 via Android   1
    lftp 开多线程 10 万文件我也下载过,一点问题没有啊!除非服务器限制吧。
    noqwerty
        2
    noqwerty  
    OP
       2018-01-15 07:47:14 +08:00
    @XiaoFaye #1 哈哈我知道,肯定是我代码的问题不是人家服务器的问题
    hareandlion
        3
    hareandlion  
       2018-01-15 07:57:52 +08:00 via iPhone   1
    timeout 时间设置的太短了?
    noqwerty
        4
    noqwerty  
    OP
       2018-01-15 07:58:43 +08:00
    @hareandlion #3 我试过把那行删了,就会一直卡在那,感觉还是其他地方有问题。
    hareandlion
        5
    hareandlion  
       2018-01-15 08:03:20 +08:00 via iPhone   1
    加个 print 看看是不是哪个 url 无效,卡住了
    noqwerty
        6
    noqwerty  
    OP
       2018-01-15 08:39:58 +08:00
    @hareandlion #5 已经有 print 了呀,会直接把所有链接都打印出来然后卡住……
    shoaly
        7
    shoaly  
       2018-01-15 09:47:22 +08:00   2
    老老实实用 python 做一个下载链接的清单, 然后用 aria2c 下载吧... 省出来的时间都是你的
    ipwx
        8
    ipwx  
       2018-01-15 09:57:53 +08:00   1
    unzip 不要用 async,CPU 密集型。
    bramblex
        9
    bramblex  
       2018-01-15 10:05:47 +08:00 via iPhone
    Python 可以真多线程了吗?
    noqwerty
        10
    noqwerty  
    OP
       2018-01-15 10:13:35 +08:00
    @shoaly #7 以后可能会了……但是这个就是练手的项目,想弄明白异步到底该怎么写。
    noqwerty
        11
    noqwerty  
    OP
       2018-01-15 10:14:20 +08:00
    @ipwx #8 没太明白,unzip 里面只有取数据那一行用了 await,其他步骤都没有,这样也不可以吗?谢谢帮忙!
    noqwerty
        12
    noqwerty  
    OP
       2018-01-15 10:15:45 +08:00
    @bramblex #9 还是有 GIL 的,估计多线程是有生之年系列了……不过现在多核越来越不值钱,多线程意义也没那么大了吧。
    Miksztowi
        13
    Miksztowi  
       2018-01-15 11:20:33 +08:00   1
    是客户端连接出错了把。从队列中取出 url 后会打印,发生了异常会继续拿,这样的话,如果请求有问题,应该是直接打印所有的 url 后结束?
    ipwx
        14
    ipwx  
       2018-01-15 11:27:53 +08:00   1
    @noqwerty 你的问题在于:

    with open(f"{id}.ent.gz", 'wb') as f:
    ....f.write(data["data"].read())
    with gzip.open(f"{id}.ent.gz", "rb") as inFile, open(saved_pdb, "wb") as outFile:
    ....shutil.copyfileobj(inFile, outFile)
    os.remove(f"{id}.ent.gz")

    这几行是没法被 asyncio 通过 Coroutine 并行化的,只能多线程。但是这就产生了两个问题,第一默认的 asyncio 不是多线程并行化的,第二即使设置 asyncio 多线程并行化,考虑到 GIL,Python 多线程也是不够用的。所以总体来说,asyncio 对你这段程序是不够的。还是得上多进程。
    ipwx
        15
    ipwx  
       2018-01-15 11:29:14 +08:00   1
    @noqwerty asyncio 主要针对网络通讯的并行化,用的是非阻塞模型。关键词可以搜索 select, epoll,了解更多非阻塞模型的事情。
    Miksztowi
        16
    Miksztowi  
       2018-01-15 11:43:44 +08:00   1
    @ipwx GIL 在文件 I/O 时不是会释放吗?
    支持 async 的文件 I/O 的有:
    1.aiofiles: https://github.com/Tinche/aiofiles
    2.asyncio 中有 thread pool executor. run_in_executor()也可以处理文件 I/O.
    如果还有别的方法,欢迎补充 :)

    你说 unzip 是 cpu 密集型,那这跟 GIL 有啥关系? 还是要上多进程。
    ivechan
        17
    ivechan  
       2018-01-15 12:12:46 +08:00   1
    @Miksztowi GIL 的存在会使得 Python 里的多线程对 CPU 密集型 程序优化作用有限,而多进程就可以避免这个缺点。
    ivechan
    &nbs;   18
    ivechan  
       2018-01-15 12:23:03 +08:00   1
    @ipwx select, epoll 应该只是 I/O 复用,但其实还是属于阻塞模型吧?只不过是在 select 的时候阻塞,而不是在真正的 IO 调用上。
    ipwx
        19
    ipwx  
       2018-01-15 15:51:42 +08:00   2
    @Miksztowi GIL 的存在导致文件读取的每个原子操作,线程切换的开销增大。而 shutil.copyfileobj,那是个 Python 循环,所以是不可能高效的。aiofiles 那东西在很多平台上面是多线程实现的,你可以 check 一下它的源代码。

    基于这个原因,可以认为楼主的程序上多线程没救,所以 asyncio 就没有救(如果多线程有救,asyncio 还是可以用的)。而因为多线程无法使用,CPU 密集型的 unzip 就没法被 asyncio 搞定。这才导出了我的结论,unzip 是 cpu 密集型,不适合 asyncio。

    至于 select、epoll 这类 I/O 复用,我觉得可以认为它们是非阻塞模型,因为它们避免了多线程模式下的 while { read } 县线程等待,和 callback 效果等同。我觉得并不一定 callback 才可以被认为是非阻塞,只要看是否达到同样的效果就可以了。
    noqwerty
        20
    noqwerty  
    OP
       2018-01-16 00:02:34 +08:00
    @ipwx #19 那请问多进程和 asyncio 可以结合起来使用吗?之前用多进程写的也总是跑到一半就自己停了,也不报错。多谢帮助!
    ipwx
        21
    ipwx  
       2018-01-16 09:45:23 +08:00   1
    @noqwerty 我记得 asyncio 有方法用多进程,不过现在的 api 都很基础很难用。所以你这需求用 python 其实挺麻烦的。
    linw1995
        22
    linw1995  
       2018-01-17 22:17:08 +08:00   1
    把 unzip,和 writefile 写成一个普通函数,用`concurrent.futures.ProcessPoolExecutor`和`loop.run_in_executor`函数运行,这样就可以结合起来

    await loop.run_in_executor(PPExecutor, func, args)

    https://pymotw.com/3/asyncio/executors.html
    alexred
        23
    alexred  
       2018-03-20 20:38:51 +08:00
    为什么我跑你的代码会报
    TypeError: Use async with instead
    的错
    ssikiki
        24
    ssikiki  
       2018-06-23 12:12:36 +08:00
    装 aiohttp 2.3.0 版本, 解决 TypeError: Use async with instead
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     1031 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 29ms UTC 18:29 PVG 02:29 LAX 11:29 JFK 14:29
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86