
一个礼拜前粗略的学习了下 asyncio + aiohttp 实现异步爬虫,三天前为了联手写了一个 ins 下载爬虫。 爬虫思路草图:
用文字描述把: 首先我参考了 aiohttp 官方的爬虫例子,官方爬虫例子在这:crawl.py,我的思路是这样的
1.获取 user id ; user id 是必须的,所以我把这个写成了一个方法在创建类的时候直接调用
2.获取页的数据;因为没发说是第几页,这个请求需要三个参数,一个是 user id,一个是一次获取到的数量,第三个是 end_cursor 用来获取下一页
3.解析数据;获取到的数据是 json 格式的,我需要获取两个东西,第一个是图片链接,第二个是 end_cursor,用来获取下一页
4.处理 url ;这个方法遍历 urls 调用 download 方法下载
5.下载;用到了 aiohttp 和 aiofiles,没有异常、下载完后我用 asyncio.Task(self.get_display_urls(end_cursor))回到了获取页数据的方法,以此循环,当然获取页数据的方法有判读 end_cursor 是否为空,直接 loop.stop()
为了实现抓取所有图片,我没有使用 run_until_complete,因为它只获取了一次就停了,我就是用的 run_forever
全部代码如下:
import aiohttp import asyncio import aiofiles import aioredis import re import json import os import signal import time import logging class Instagram(object): def __init__(self, username, loop, depth=0, maxtasks=200): """ :param username: 用户名 :param loop: :param depth: 下载页面数量 :param maxtasks: 最大并发限制 """ self.down_tasks = set() self.down_todo = set() self.down_busy = set() self.down_dOne= {} self.loop = loop self.sem = asyncio.Semaphore(maxtasks, loop=loop) self.username = username self.max_page = depth if depth >= 1 else -1 self.ROOT_URL = 'https://www.instagram.com/' self.headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36', 'Cookie': 'rur=ATN; mid=XM2K8QAEAAGy8fiEf1bT05Pssas; fbm_124024574287414=base_domain=.instagram.com; fbsr_124024574287414=ns7o0TqnERhbPihnN390KYuDdI7xVM2vgUunMZT4URY.eyJjb2RlIjoiQVFESlVpaVhaSFNwWnBTZ2VGUE1nUGlfUXlsdElpRG9vOHJDdHB3Qm14Q25rNUx6YnJsNHdBX1JRVnowaDREU3J4ZzFGTWVHWHdlWFlhVGxuVi0yMk84ZXdlUVBNWTg5bVF6MFg5RG40b3psSEozTGk4WW40N1lPeFQzdE0yQUNJWkg5SWh1VmhpRHBoaXZ4ZXNMM3dhc2hMcHdQQ2RkSDZWR2FQMlR1QVM4V3U1SElGTERWaEpfYzl3akstem94TFl3QWRESE9wSjNwcDlhTjVhcXFBWGlWM0lfNTducGZ0cmpCWlFLd2xUZzlYZjBEbUlFdmR5RTBsMng3OEY0RkJ6Q1NtNWEzQ2RISTRYckVqNXB6LWVrYjRyNHRza05HOUhHUmZSaXAwS0hya1VqQ3l4T3YwNDBEU2txOHI4MGJvZG9GU3o4THFHelpSckZ4dldVMjNUWGhkZ2d6MTEzbHNfVnN5T1V5X01EUHZlSHVtUkQ5bXJ1V01ObGUxOFBuV2hvIiwidXNlcl9pZCI6IjEwMDAyNDA3NTU3MTE2NyIsImFsZ29yaXRobSI6IkhNQUMtU0hBMjU2IiwiaXNzdWVkX2F0IjoxNTU3MDQ0NTE0fQ; csrftoken=2JzdvnHL9iMuxbV7KiJcASk8RlKuYWAQ; shbid=2545; shbts=1557044558.2494695; ds_user_id=5561946202; sessiOnid=5561946202%3AwE5Vb00lI1bmIb%3A23; urlgen="{"2001:19f0:7001:1e1d:5400:1ff:fef7:67fd": 20473}:1hND0O:dQodCbp0SM_24vfenOyhBT-Curk"' } self.proxy = "http://localhost:8001" t = asyncio.ensure_future(self.init(), loop=loop) loop.run_until_complete(t) async def run(self): """ :return: """ await self.init() t = asyncio.ensure_future(self.addurls(), loop=self.loop) while self.down_busy: await asyncio.sleep(1, loop=self.loop) await t self.loop.close() async def init(self): """ 初始化必要参数:user id :return: """ print('[init] 初始化参数...') shared_data = await self.get_shared_data() if not shared_data: print('!!!!!!!') exit(0) self.user_id = re.findall('"logging_page_id":.?"profilePage_(.*?)"', shared_data)[0] async def _http_request(self, url, **kwargs): """ http 请求 :param url: 请求链接 :param kwargs: 链接参数 :return: 网页 response """ params = dict() if kwargs: for k, v in kwargs.items(): params.update(v) async with self.sem: async with aiohttp.ClientSession() as session: try: async with session.get(url, timeout=10, proxy=self.proxy, headers=self.headers, params=params) as response: html = (await response.read()).decode('utf-8', 'replace') return html except Exception as exc: logging.warning("[_http_request] 异常: {}".format(exc)) async def get_shared_data(self): """ 获取 shared data :return: """ html = await self._http_request(self.ROOT_URL + self.username) if html: shared_data = html.split("window._sharedData = ")[1].split(";</script>")[0] return shared_data def get_ends_cursor(self, html): """ :param html: :return: """ if html: edge_media = json.loads(html)['data']['user']['edge_owner_to_timeline_media'] edges = edge_media['edges'] if edges: end_cursor = edge_media['page_info']['end_cursor'] has_next_page = edge_media['page_info']['has_next_page'] if has_next_page: return end_cursor return '' async def get_display_url(self, max=50, end_cursor=""): """ 解析 display url :param max: 单次获取图片总量 :param end_cursor: end_cursor 是获取下一页的参数 :return: 包含{max}数量的图片链接列表 """ pic_params = { 'query_hash': 'f2405b236d85e8296cf30347c9f08c2a', 'variables': '{{"id":"{0}","first":{1},"after":"{2}"}}'.format(self.user_id, max, end_cursor), } pic_url = self.ROOT_URL + 'graphql/query/' html = await self._http_request(pic_url, parms=pic_params) if html: edge_media = json.loads(html)['data']['user']['edge_owner_to_timeline_media'] edges = edge_media['edges'] if edges: display_urls = [] for edge in edges: display_urls.append(edge['node']['display_url']) return display_urls, self.get_ends_cursor(html) async def download(self, url): """ 下载到本地 :param url: :return: """ print('processing:', url) # try: # async with self.sem: //如果使用 Semaphore 会卡住。。。虽然不会报错 self.down_todo.remove(url) self.down_busy.add(url) path = './instagram/' + self.username if not os.path.exists(path): os.makedirs(path) filename = path + '/' + url.split('?')[0].split('/')[-1] print('start download:', url) async with aiohttp.ClientSession() as session: try: async with session.get(url, headers=self.headers, proxy=self.proxy) as resp: if resp.status == 200: f = await aiofiles.open(filename, 'wb') await f.write(await resp.read()) await f.close() await asyncio.Task(self.addurls(self.end_cursor)) resp.close() self.down_done[url] = True except Exception as exc: logging.error('[download]下载异常,异常:{}\n 链接:{}'.format(repr(exc), url)) self.down_done[url] = False self.down_busy.remove(url) print(len(self.down_done), 'completed tasks,', len(self.down_tasks), 'still pending, todo', len(self.down_todo)) # 这个判断根本没有任何用,不会调用,直接卡住 if self.end_cursor is False: print('下载完 la') self.loop.close() # except Exception as exc: # logging.error('[download]下载异常,异常:{}\n 链接:{}'.format(repr(exc), url)) async def add_down_urls(self, urls): print('[add_down_urls] 开始下载,数量:', len(urls)) async with asyncio.Semaphore() for url in urls: self.down_todo.add(url) await self.sem.acquire() task = asyncio.ensure_future(self.download(url), loop=self.loop) task.add_done_callback(lambda t: self.sem.release()) task.add_done_callback(self.down_tasks.remove) self.down_tasks.add(task) async def addurls(self, end_cursor=""): """ :param end_cursor: 当前页面的标示 base64 加密,用于加载下一页,如果没有下一页改参数为 Fasle :return: """ print("\n\n 开始获取下一页,end_cursor:", end_cursor) display_urls, self.end_cursor = await self.get_display_url(end_cursor=end_cursor) await self.add_down_urls(display_urls) if not self.end_cursor: return ''' 流程: run() --> addurls() --> add_own_urls() --> download() ^ | | | <-------<-----<--------<-------<-- ''' if __name__ == '__main__': start = time.time() loop = asyncio.get_event_loop() ins = Instagram('taeri__taeri', loop) future = asyncio.ensure_future(ins.addurls(), loop=loop) try: loop.add_signal_handler(signal.SIGINT, loop.stop) except RuntimeError: pass loop.run_forever() # loop.run_until_complete(future) # for i in future.result(): # print(">>>>", i) # ins.main() end = time.time() print('耗时:', end - start) 我遇到的问题是不使用 Semaphore 的情况下一开始是疯狂下载,也的确是下载成功了,然后就直接卡住,也不停,就一直卡住(原谅我使用卡住这个词),希望能帮忙看一下错在哪,谢谢了
1 zckun OP 草图在这。。。为了找画图工具用了半个多小时 https://github.com/ZCKun/d/blob/master/a.png |
2 zckun OP ins 是新号,所以 cookie 没去掉就算了 |
3 zckun OP 不能重新编辑么。。代码有些错误忘了删除,希望别介意 |
4 zckun OP emmmm |
5 CSM 2019-05-09 14:15:46 +08:00 你好,研究了下你的代码,发现一个小问题 # 这个判断根本没有任何用,不会调用,直接卡住 if self.end_cursor is False: 这个是因为之前没有下一页的时候 end_cursor 是 '' 空字符串,而不是 False。 另外就是我觉得你的架构上有问题,这个问题是经典的生产者-消费者模型,请求并解析出图片链接作为生产者,然后启动多个消费者来下载这些链接就行了。我重构了一下你的代码,具体可见 https://gist.github.com/cshuaimin/4cf8d769b88e93fc805ceefb9af8c1f4 |
6 CSM 2019-05-09 14:25:25 +08:00 还有就是可以看到在 _http_request 方法里为每一个请求都生成了一个 ClientSession,这样太浪费了,建议只用一个 session。doc: Session encapsulates a connection pool (connector instance) and supports keepalives by default. Unless you are connecting to a large, unknown number of different servers over the lifetime of your application, it is suggested you use a single session for the lifetime of your application to benefit from connection pooling. https://docs.aiohttp.org/en/stable/client_reference.html |
9 shawndev 2019-05-09 15:06:32 +08:00 GvR 有一个 500lines 项目你可以参考一下。另外楼上说的很对,这是典型的生产者消费者模式,另外似乎没有考虑去重、重定向和超时重试? |