
A Python asyncio download crawler that won't stop

zckun · 2019-05-09 11:08:43 +08:00 · 2761 views

A week ago I took a rough pass at learning asyncio + aiohttp for async crawlers, and three days ago, for practice, I wrote an Instagram download crawler. A sketch of the crawler's flow: a.png. In words: I started from the official aiohttp crawler example (it's here: crawl.py). My approach is as follows:

1. Get the user id. The user id is required, so I made this a method that gets called as soon as the class is instantiated.

2. Fetch a page of data. There's no way to say which page it is; this request takes three parameters: the user id, the number of items to fetch in one go, and end_cursor, which is used to fetch the next page.

3. Parse the data. The response is JSON, and I need two things out of it: the image URLs, and the end_cursor used to fetch the next page.

4. Process the URLs. This method walks through urls and calls the download method on each.

5. Download. This uses aiohttp and aiofiles. If there's no exception, after the download finishes I call asyncio.Task(self.get_display_urls(end_cursor)) to jump back to the page-fetching method, looping this way. The page-fetching method does check whether end_cursor is empty, and if so calls loop.stop() directly.

To grab all the images I didn't use run_until_complete, because it fetched only once and then stopped; I used run_forever instead (see the sketch below).
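(For context, a minimal toy sketch of the difference; crawl_once here is a hypothetical stand-in for one fetch pass, not the crawler itself:)

import asyncio

async def crawl_once():
    await asyncio.sleep(0.1)  # toy stand-in for a single fetch

loop = asyncio.get_event_loop()

# run_until_complete drives the loop only until this one future finishes;
# work scheduled from inside it that is still pending gets abandoned.
loop.run_until_complete(crawl_once())

# run_forever keeps the loop alive until something calls loop.stop(),
# so tasks scheduled later with asyncio.ensure_future() still run.
loop.call_later(1, loop.stop)
loop.run_forever()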

The full code:

import aiohttp
import asyncio
import aiofiles
import aioredis
import re
import json
import os
import signal
import time
import logging


class Instagram(object):
    def __init__(self, username, loop, depth=0, maxtasks=200):
        """
        :param username: Instagram username
        :param loop: event loop
        :param depth: number of pages to download
        :param maxtasks: max concurrency
        """
        self.down_tasks = set()
        self.down_todo = set()
        self.down_busy = set()
        self.down_done = {}
        self.loop = loop
        self.sem = asyncio.Semaphore(maxtasks, loop=loop)
        self.username = username
        self.max_page = depth if depth >= 1 else -1
        self.ROOT_URL = 'https://www.instagram.com/'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36',
            'Cookie': 'rur=ATN; mid=XM2K8QAEAAGy8fiEf1bT05Pssas; fbm_124024574287414=base_domain=.instagram.com; fbsr_124024574287414=ns7o0TqnERhbPihnN390KYuDdI7xVM2vgUunMZT4URY.eyJjb2RlIjoiQVFESlVpaVhaSFNwWnBTZ2VGUE1nUGlfUXlsdElpRG9vOHJDdHB3Qm14Q25rNUx6YnJsNHdBX1JRVnowaDREU3J4ZzFGTWVHWHdlWFlhVGxuVi0yMk84ZXdlUVBNWTg5bVF6MFg5RG40b3psSEozTGk4WW40N1lPeFQzdE0yQUNJWkg5SWh1VmhpRHBoaXZ4ZXNMM3dhc2hMcHdQQ2RkSDZWR2FQMlR1QVM4V3U1SElGTERWaEpfYzl3akstem94TFl3QWRESE9wSjNwcDlhTjVhcXFBWGlWM0lfNTducGZ0cmpCWlFLd2xUZzlYZjBEbUlFdmR5RTBsMng3OEY0RkJ6Q1NtNWEzQ2RISTRYckVqNXB6LWVrYjRyNHRza05HOUhHUmZSaXAwS0hya1VqQ3l4T3YwNDBEU2txOHI4MGJvZG9GU3o4THFHelpSckZ4dldVMjNUWGhkZ2d6MTEzbHNfVnN5T1V5X01EUHZlSHVtUkQ5bXJ1V01ObGUxOFBuV2hvIiwidXNlcl9pZCI6IjEwMDAyNDA3NTU3MTE2NyIsImFsZ29yaXRobSI6IkhNQUMtU0hBMjU2IiwiaXNzdWVkX2F0IjoxNTU3MDQ0NTE0fQ; csrftoken=2JzdvnHL9iMuxbV7KiJcASk8RlKuYWAQ; shbid=2545; shbts=1557044558.2494695; ds_user_id=5561946202; sessionid=5561946202%3AwE5Vb00lI1bmIb%3A23; urlgen="{"2001:19f0:7001:1e1d:5400:1ff:fef7:67fd": 20473}:1hND0O:dQodCbp0SM_24vfenOyhBT-Curk"'
        }
        self.proxy = "http://localhost:8001"
        t = asyncio.ensure_future(self.init(), loop=loop)
        loop.run_until_complete(t)

    async def run(self):
        await self.init()
        t = asyncio.ensure_future(self.addurls(), loop=self.loop)
        while self.down_busy:
            await asyncio.sleep(1, loop=self.loop)
        await t
        self.loop.close()

    async def init(self):
        """Initialize the required parameter: user id."""
        print('[init] initializing parameters...')
        shared_data = await self.get_shared_data()
        if not shared_data:
            print('!!!!!!!')
            exit(0)
        self.user_id = re.findall('"logging_page_id":.?"profilePage_(.*?)"', shared_data)[0]

    async def _http_request(self, url, **kwargs):
        """
        HTTP request.
        :param url: request URL
        :param kwargs: request parameters
        :return: response body as text
        """
        params = dict()
        if kwargs:
            for k, v in kwargs.items():
                params.update(v)
        async with self.sem:
            async with aiohttp.ClientSession() as session:
                try:
                    async with session.get(url, timeout=10, proxy=self.proxy,
                                           headers=self.headers, params=params) as response:
                        html = (await response.read()).decode('utf-8', 'replace')
                        return html
                except Exception as exc:
                    logging.warning("[_http_request] exception: {}".format(exc))

    async def get_shared_data(self):
        """Fetch window._sharedData from the profile page."""
        html = await self._http_request(self.ROOT_URL + self.username)
        if html:
            shared_data = html.split("window._sharedData = ")[1].split(";</script>")[0]
            return shared_data

    def get_ends_cursor(self, html):
        if html:
            edge_media = json.loads(html)['data']['user']['edge_owner_to_timeline_media']
            edges = edge_media['edges']
            if edges:
                end_cursor = edge_media['page_info']['end_cursor']
                has_next_page = edge_media['page_info']['has_next_page']
                if has_next_page:
                    return end_cursor
        return ''

    async def get_display_url(self, max=50, end_cursor=""):
        """
        Parse display URLs.
        :param max: number of images to fetch per request
        :param end_cursor: cursor used to fetch the next page
        :return: a list of up to {max} image URLs, plus the next cursor
        """
        pic_params = {
            'query_hash': 'f2405b236d85e8296cf30347c9f08c2a',
            'variables': '{{"id":"{0}","first":{1},"after":"{2}"}}'.format(self.user_id, max, end_cursor),
        }
        pic_url = self.ROOT_URL + 'graphql/query/'
        html = await self._http_request(pic_url, params=pic_params)
        if html:
            edge_media = json.loads(html)['data']['user']['edge_owner_to_timeline_media']
            edges = edge_media['edges']
            if edges:
                display_urls = []
                for edge in edges:
                    display_urls.append(edge['node']['display_url'])
                return display_urls, self.get_ends_cursor(html)

    async def download(self, url):
        """Download one image to disk."""
        print('processing:', url)
        # try:
        #     async with self.sem:  # hangs when the Semaphore is used... although no error is raised
        self.down_todo.remove(url)
        self.down_busy.add(url)
        path = './instagram/' + self.username
        if not os.path.exists(path):
            os.makedirs(path)
        filename = path + '/' + url.split('?')[0].split('/')[-1]
        print('start download:', url)
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(url, headers=self.headers, proxy=self.proxy) as resp:
                    if resp.status == 200:
                        f = await aiofiles.open(filename, 'wb')
                        await f.write(await resp.read())
                        await f.close()
                        await asyncio.Task(self.addurls(self.end_cursor))
                        resp.close()
                        self.down_done[url] = True
            except Exception as exc:
                logging.error('[download] download failed, exception: {}\nurl: {}'.format(repr(exc), url))
                self.down_done[url] = False
        self.down_busy.remove(url)
        print(len(self.down_done), 'completed tasks,', len(self.down_tasks),
              'still pending, todo', len(self.down_todo))
        # this check is completely useless; it never fires, the program just hangs
        if self.end_cursor is False:
            print('all downloads finished')
            self.loop.close()
        # except Exception as exc:
        #     logging.error('[download] download failed, exception: {}\nurl: {}'.format(repr(exc), url))

    async def add_down_urls(self, urls):
        print('[add_down_urls] starting downloads, count:', len(urls))
        for url in urls:
            self.down_todo.add(url)
            await self.sem.acquire()
            task = asyncio.ensure_future(self.download(url), loop=self.loop)
            task.add_done_callback(lambda t: self.sem.release())
            task.add_done_callback(self.down_tasks.remove)
            self.down_tasks.add(task)

    async def addurls(self, end_cursor=""):
        """
        :param end_cursor: base64-encoded marker of the current page, used to load
                           the next page; False when there is no next page
        """
        print("\n\nfetching next page, end_cursor:", end_cursor)
        display_urls, self.end_cursor = await self.get_display_url(end_cursor=end_cursor)
        await self.add_down_urls(display_urls)
        if not self.end_cursor:
            return


'''
Flow:
run() --> addurls() --> add_down_urls() --> download()
  ^                                             |
  |                                             |
  <--------<--------<--------<--------<---------
'''

if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()
    ins = Instagram('taeri__taeri', loop)
    future = asyncio.ensure_future(ins.addurls(), loop=loop)
    try:
        loop.add_signal_handler(signal.SIGINT, loop.stop)
    except RuntimeError:
        pass
    loop.run_forever()
    # loop.run_until_complete(future)
    # for i in future.result():
    #     print(">>>>", i)
    # ins.main()
    end = time.time()
    print('elapsed:', end - start)

The problem I'm hitting: without the Semaphore it downloads like crazy at first, and the downloads do succeed, but then it just gets stuck. It doesn't stop; it just hangs there (forgive the word "stuck"). I'd appreciate help spotting where it goes wrong, thanks.

10 replies · last reply 2019-05-09 16:05:44 +08:00
#1 · zckun (OP) · 2019-05-09 11:09:23 +08:00
The sketch is here... it took me over half an hour just to find a drawing tool: https://github.com/ZCKun/d/blob/master/a.png
#2 · zckun (OP) · 2019-05-09 11:11:55 +08:00
The Instagram account is a new one, so never mind that I left the cookie in.
#3 · zckun (OP) · 2019-05-09 11:15:11 +08:00
No way to re-edit? There are a few mistakes in the code I forgot to delete; hope nobody minds.
#4 · zckun (OP) · 2019-05-09 14:08:25 +08:00
emmmm
#5 · CSM · 2019-05-09 14:15:46 +08:00 · ♥ 1
Hi, I dug into your code and found a small problem:

    # this check is completely useless; it never fires, the program just hangs
    if self.end_cursor is False:

This is because when there is no next page, end_cursor is '' (an empty string), not False.
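(A quick REPL check shows why that comparison never fires: the "is False" test is an identity check against the False singleton, which no string ever passes:)

>>> end_cursor = ''        # what get_ends_cursor() returns when there is no next page
>>> end_cursor is False    # identity check: never true for a string
False
>>> not end_cursor         # truthiness check: this is the condition to test instead
True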


Beyond that, I think there's an architectural problem. This is the classic producer-consumer model: the request-and-parse step that yields image links is the producer, and then you start several consumers to download those links. I refactored your code; see https://gist.github.com/cshuaimin/4cf8d769b88e93fc805ceefb9af8c1f4
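(For illustration, a minimal sketch of that producer-consumer shape with asyncio.Queue; fetch_page and download here are toy stand-ins for the request/parse and download steps, not the contents of the gist:)

import asyncio

N_CONSUMERS = 5

async def fetch_page(end_cursor):
    # Toy stand-in for get_display_url(): returns (urls, next_cursor).
    page = int(end_cursor or 0)
    urls = ['https://example.com/img_{}_{}.jpg'.format(page, i) for i in range(3)]
    next_cursor = str(page + 1) if page < 4 else ''  # '' means no next page
    return urls, next_cursor

async def download(url):
    # Toy stand-in for the real download: pretend to fetch one image.
    await asyncio.sleep(0.1)
    print('downloaded', url)

async def producer(queue):
    end_cursor = ''
    while True:
        urls, end_cursor = await fetch_page(end_cursor)
        for url in urls:
            await queue.put(url)
        if not end_cursor:            # last page reached
            break
    for _ in range(N_CONSUMERS):      # one sentinel per consumer tells it to exit
        await queue.put(None)

async def consumer(queue):
    while True:
        url = await queue.get()
        if url is None:               # sentinel: no more work
            return
        await download(url)

async def main():
    queue = asyncio.Queue(maxsize=100)
    consumers = [asyncio.ensure_future(consumer(queue)) for _ in range(N_CONSUMERS)]
    await producer(queue)
    await asyncio.gather(*consumers)  # everything is done once all consumers exit

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

With this shape the program ends on its own: the producer stops when end_cursor is empty, the sentinels drain the consumers, and run_until_complete returns, so there is no need for run_forever or loop.stop().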
#6 · CSM · 2019-05-09 14:25:25 +08:00
One more thing: the _http_request method creates a new ClientSession for every single request, which is wasteful. Better to use just one session. From the docs:

    Session encapsulates a connection pool (connector instance) and supports keepalives by default. Unless you are connecting to a large, unknown number of different servers over the lifetime of your application, it is suggested you use a single session for the lifetime of your application to benefit from connection pooling.

    https://docs.aiohttp.org/en/stable/client_reference.html
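(A minimal sketch of the single-session shape applied to the class above; the start and close methods are illustrative additions, not part of the OP's code:)

import aiohttp

class Instagram:
    async def start(self):
        # One session for the crawler's whole lifetime; it owns the connection pool.
        self.session = aiohttp.ClientSession()

    async def _http_request(self, url, params=None):
        # Reuse the shared session instead of opening a new one per request.
        async with self.session.get(url, timeout=10, params=params) as response:
            return (await response.read()).decode('utf-8', 'replace')

    async def close(self):
        await self.session.close()  # release the pooled connections at shutdown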
#7 · zckun (OP) · 2019-05-09 14:49:42 +08:00
@CSM An earlier version of the get_end_cursor method returned False when it couldn't get a cursor; I forgot to update that.
#8 · zckun (OP) · 2019-05-09 14:50:01 +08:00
@CSM Thanks, I'll take a look.
#9 · shawndev · 2019-05-09 15:06:32 +08:00
GvR has a 500lines project you can look at for reference. Also, the commenter above is right: this is the classic producer-consumer pattern. And it seems you haven't considered deduplication, redirects, or retrying on timeout?
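(On the timeout-retry point, a minimal sketch of a retry wrapper around an aiohttp GET; the retry count and backoff are arbitrary choices:)

import asyncio
import aiohttp

async def get_with_retry(session, url, max_retries=3):
    for attempt in range(1, max_retries + 1):
        try:
            async with session.get(url, timeout=10) as resp:
                return await resp.read()
        except (asyncio.TimeoutError, aiohttp.ClientError):
            if attempt == max_retries:
                raise                          # give up after the last attempt
            await asyncio.sleep(2 ** attempt)  # simple exponential backoff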
#10 · zckun (OP) · 2019-05-09 16:05:44 +08:00
@shawndev Got it. I learned this back when I studied Java, but I've mostly forgotten it; I'll go brush up. Thanks!