目录
引言
钢铁知识库,一个学习python爬虫、数据分析的知识库。人生苦短,快用python。
之前我们使用requests库爬取某个站点的时候,每发出一个请求,程序必须等待网站返回响应才能接着运行,而在整个爬虫过程中,整个爬虫程序是一直在等待的,实际上没有做任何事情。
像这种占用磁盘/内存IO、网络IO的任务,大部分时间是CPU在等待的操作,就叫IO密集型任务。对于这种情况有没有优化方案呢,当然有,那就是使用aiohttp库实现异步爬虫。
aiohttp是什么
我们在使用requests请求时,只能等一个请求先出去再回来,才会发送下一个请求。明显效率不高阿,这时候如果换成异步请求的方式,就不会有这个等待。一个请求发出去,不管这个请求什么时间响应,程序通过await挂起协程对象后直接进行下一个请求。
解决方法就是通过 aiohttp + asyncio,什么是aiohttp?一个基于 asyncio 的异步 HTTP 网络模块,可用于实现异步爬虫,速度明显快于 requests 的同步爬虫。
requests和aiohttp区别
区别就是一个同步一个是异步。话不多说直接上代码看效果。
安装aiohttp
pip install aiohttp
- requests同步示例:
#!/usr/bin/env python # -*- coding: utf-8 -*- # author: 钢铁知识库 import time import requests # 同步请求 def main(): start = time.time() for i in range(5): res = requests.get(\'http://httpbin.org/delay/2\') print(f\'当前时间:{datetime.datetime.now()}, status_code = {res.status_code}\') print(f\'requests同步耗时:{time.time() - start}\') if __name__ == \'__main__\': main() \'\'\' 当前时间:2022-09-05 15:44:51.991685, status_code = 200 当前时间:2022-09-05 15:44:54.528918, status_code = 200 当前时间:2022-09-05 15:44:57.057373, status_code = 200 当前时间:2022-09-05 15:44:59.643119, status_code = 200 当前时间:2022-09-05 15:45:02.167362, status_code = 200 requests同步耗时:12.785893440246582 \'\'\'
可以看到5次请求总共用12.7秒,再来看同样的请求异步多少时间。
- aiohttp异步示例:
#!/usr/bin/env python # file: day6-9同步和异步.py # author: 钢铁知识库 import asyncio import time import aiohttp async def async_http(): # 声明一个支持异步的上下文管理器 async with aiohttp.ClientSession() as session: res = await session.get(\'http://httpbin.org/delay/2\') print(f\'当前时间:{datetime.datetime.now()}, status_code = {res.status}\') tasks = [async_http() for _ in range(5)] start = time.time() # Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run()来代替最后的启动操作 asyncio.run(asyncio.wait(tasks)) print(f\'aiohttp异步耗时:{time.time() - start}\') \'\'\' 当前时间:2022-09-05 15:42:32.363966, status_code = 200 当前时间:2022-09-05 15:42:32.366957, status_code = 200 当前时间:2022-09-05 15:42:32.374973, status_code = 200 当前时间:2022-09-05 15:42:32.384909, status_code = 200 当前时间:2022-09-05 15:42:32.390318, status_code = 200 aiohttp异步耗时:2.5826876163482666 \'\'\'
两次对比可以看到执行过程,时间一个是顺序执行,一个是同时执行。这就是同步和异步的区别。
aiohttp使用介绍
接下来我们会详细介绍aiohttp库的用法和爬取实战。aiohttp 是一个支持异步请求的库,它和 asyncio 配合使用,可以使我们非常方便地实现异步请求操作。asyncio模块,其内部实现了对TCP、UDP、SSL协议的异步操作,但是对于HTTP请求,就需要aiohttp实现了。
aiohttp分为两部分,一部分是Client,一部分是Server。下面来说说aiohttp客户端部分的用法。
基本实例
先写一个简单的案例
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : 钢铁知识库 import asyncio import aiohttp async def get_api(session, url): # 声明一个支持异步的上下文管理器 async with session.get(url) as response: return await response.text(), response.status async def main(): async with aiohttp.ClientSession() as session: html, status = await get_api(session, \'http://httpbin.org/delay/2\') print(f\'html: {html[:50]}\') print(f\'status : {status}\') if __name__ == \'__main__\': # Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run(main())来代替最后的启动操作 asyncio.get_event_loop().run_until_complete(main()) \'\'\' html: { \"args\": {}, \"data\": \"\", \"files\": {}, status : 200 Process finished with exit code 0 \'\'\'
aiohttp请求的方法和之前有明显区别,主要包括如下几点:
- 除了导入aiohttp库,还必须引入asyncio库,因为要实现异步,需要启动协程。
- 异步的方法定义不同,前面都要统一加async来修饰。
- with as用于声明上下文管理器,帮我们自动分配和释放资源,加上async代码支持异步。
- 对于返回协程对象的操作,前面需要加await来修饰。response.text()返回的是协程对象。
- 最后运行启用循环事件
注意:Python3.7及以后的版本中,可以使用asyncio.run(main())代替最后的启动操作。
URL参数设置
对于URL参数的设置,我们可以借助params设置,传入一个字典即可,实例如下:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : 钢铁知识库 import aiohttp import asyncio async def main(): params = {\'name\': \'钢铁知识库\', \'age\': 23} async with aiohttp.ClientSession() as session: async with session.get(\'https://www.httpbin.org/get\', params=params) as res: print(await res.json()) if __name__ == \'__main__\': asyncio.get_event_loop().run_until_complete(main()) \'\'\' {\'args\': {\'age\': \'23\', \'name\': \'钢铁知识库\'}, \'headers\': {\'Accept\': \'*/*\', \'Accept-Encoding\': \'gzip, deflate\', \'Host\': \'www.httpbin.org\', \'User-Agent\': \'Python/3.8 aiohttp/3.8.1\', \'X-Amzn-Trace-Id\': \'Root=1-63162e34-1acf7bde7a6d801368494c72\'}, \'origin\': \'122.55.11.188\', \'url\': \'https://www.httpbin.org/get?name=钢铁知识库&age=23\'} \'\'\'
可以看到实际请求的URL后面带了后缀,这就是params的内容。
请求类型
除了get请求,aiohttp还支持其它请求类型,如POST、PUT、DELETE等,和requests使用方式类似。
session.post(\'http://httpbin.org/post\', data=b\'data\') session.put(\'http://httpbin.org/put\', data=b\'data\') session.delete(\'http://httpbin.org/delete\') session.head(\'http://httpbin.org/get\') session.options(\'http://httpbin.org/get\') session.patch(\'http://httpbin.org/patch\', data=b\'data\')
要使用这些方法,只需要把对应的方法和参数替换一下。用法和get类似就不再举例。
响应的几个方法
对于响应来说,我们可以用如下方法分别获取其中的响应情况。状态码、响应头、响应体、响应体二进制内容、响应体JSON结果,实例如下:
#!/usr/bin/env python # @Author : 钢铁知识库 import aiohttp import asyncio async def main(): data = {\'name\': \'钢铁知识库\', \'age\': 23} async with aiohttp.ClientSession() as session: async with session.post(\'https://www.httpbin.org/post\', data=data) as response: print(\'status:\', response.status) # 状态码 print(\'headers:\', response.headers) # 响应头 print(\'body:\', await response.text()) # 响应体 print(\'bytes:\', await response.read()) # 响应体二进制内容 print(\'json:\', await response.json()) # 响应体json数据 if __name__ == \'__main__\': asyncio.get_event_loop().run_until_complete(main())
\'\'\' status: 200 headers: <CIMultiDictProxy(\'Date\': \'Tue, 06 Sep 2022 00:18:36 GMT\', \'Content-Type\': \'application/json\', \'Content-Length\': \'534\', \'Connection\': \'keep-alive\', \'Server\': \'gunicorn/19.9.0\', \'Access-Control-Allow-Origin\': \'*\', \'Access-Control-Allow-Credentials\': \'true\')> body: { \"args\": {}, \"data\": \"\", \"files\": {}, \"form\": { \"age\": \"23\", \"name\": \"\\u94a2\\u94c1\\u77e5\\u8bc6\\u5e93\" }, \"headers\": { \"Accept\": \"*/*\", \"Accept-Encoding\": \"gzip, deflate\", \"Content-Length\": \"57\", \"Content-Type\": \"application/x-www-form-urlencoded\", \"Host\": \"www.httpbin.org\", \"User-Agent\": \"Python/3.8 aiohttp/3.8.1\", \"X-Amzn-Trace-Id\": \"Root=1-631691dc-6aa1b2b85045a1a0481d06e1\" }, \"json\": null, \"origin\": \"122.55.11.188\", \"url\": \"https://www.httpbin.org/post\" } bytes: b\'{\\n \"args\": {}, \\n \"data\": \"\", \\n \"files\": {}, \\n \"form\": {\\n \"age\": \"23\", \\n \"name\": \"\\\\u94a2\\\\u94c1\\\\u77e5\\\\u8bc6\\\\u5e93\"\\n }, \\n \"headers\": {\\n \"Accept\": \"*/*\", \\n \"Accept-Encoding\": \"gzip, deflate\", \\n \"Content-Length\": \"57\", \\n \"Content-Type\": \"application/x-www-form-urlencoded\", \\n \"Host\": \"www.httpbin.org\", \\n \"User-Agent\": \"Python/3.8 aiohttp/3.8.1\", \\n \"X-Amzn-Trace-Id\": \"Root=1-631691dc-6aa1b2b85045a1a0481d06e1\"\\n }, \\n \"json\": null, \\n \"origin\": \"122.5.132.196\", \\n \"url\": \"https://www.httpbin.org/post\"\\n}\\n\' json: {\'args\': {}, \'data\': \'\', \'files\': {}, \'form\': {\'age\': \'23\', \'name\': \'钢铁知识库\'}, \'headers\': {\'Accept\': \'*/*\', \'Accept-Encoding\': \'gzip, deflate\', \'Content-Length\': \'57\', \'Content-Type\': \'application/x-www-form-urlencoded\', \'Host\': \'www.httpbin.org\', \'User-Agent\': \'Python/3.8 aiohttp/3.8.1\', \'X-Amzn-Trace-Id\': \'Root=1-631691dc-6aa1b2b85045a1a0481d06e1\'}, \'json\': None, \'origin\': \'122.55.11.188\', \'url\': \'https://www.httpbin.org/post\'} \'\'\'
可以看到有些字段前面需要加await,因为其返回的是一个协程对象(如async修饰的方法),那么前面就要加await。
超时设置
我们可以借助ClientTimeout
对象设置超时,例如要设置1秒的超时时间,可以这么实现:
#!/usr/bin/env python # @Author : 钢铁知识库 import aiohttp import asyncio async def main(): # 设置 1 秒的超时 timeout = aiohttp.ClientTimeout(total=1) data = {\'name\': \'钢铁知识库\', \'age\': 23} async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(\'https://www.httpbin.org/delay/2\', data=data) as response: print(\'status:\', response.status) # 状态码 if __name__ == \'__main__\': asyncio.get_event_loop().run_until_complete(main()) \'\'\' Traceback (most recent call last): ####中间省略#### raise asyncio.TimeoutError from None asyncio.exceptions.TimeoutError \'\'\'
这里设置了超时1秒请求延时2秒,发现抛出异常asyncio.TimeoutError
,如果正常则响应200。
并发限制
aiohttp可以支持非常高的并发量,但面对高并发网站可能会承受不住,随时有挂掉的危险,这时需要对并发进行一些控制。现在我们借助asyncio 的Semaphore来控制并发量,实例如下:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : 钢铁知识库 import asyncio from datetime import datetime import aiohttp # 声明最大并发量 semaphore = asyncio.Semaphore(2) async def get_api(): async with semaphore: print(f\'scrapting...{datetime.now()}\') async with session.get(\'https://www.baidu.com\') as response: await asyncio.sleep(2) # print(f\'当前时间:{datetime.now()}, {response.status}\') async def main(): global session session = aiohttp.ClientSession() tasks = [asyncio.ensure_future(get_api()) for _ in range(1000)] await asyncio.gather(*tasks) await session.close() if __name__ == \'__main__\': asyncio.get_event_loop().run_until_complete(main()) \'\'\' scrapting...2022-09-07 08:11:14.190000 scrapting...2022-09-07 08:11:14.292000 scrapting...2022-09-07 08:11:16.482000 scrapting...2022-09-07 08:11:16.504000 scrapting...2022-09-07 08:11:18.520000 scrapting...2022-09-07 08:11:18.521000 \'\'\'
在main方法里,我们声明了1000个task,如果没有通过Semaphore进行并发限制,那这1000放到gather方法后会被同时执行,并发量相当大。有了信号量的控制之后,同时运行的task数量就会被控制,这样就能给aiohttp限制速度了。
aiohttp异步爬取实战
接下来我们通过异步方式练手一个小说爬虫,需求如下:
需求页面:https://dushu.baidu.com/pc/detail?gid=4308080950
目录接口:https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4308080950"}
详情接口:
https://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4295122774","cid":"4295122774|116332"}
关键参数:book_id
:小说ID、cid
:章节id
采集要求:使用协程方式写入,数据存放进mongo
需求分析:点开需求页面,通过F12抓包可以发现两个接口。一个目录接口,一个详情接口。
首先第一步先请求目录接口拿到cid章节id,然后将cid传递给详情接口拿到小说数据,最后存入mongo即可。
话不多说,直接上代码:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : 钢铁知识库 # 不合适就是不合适,真正合适的,你不会有半点犹豫。 import asyncio import json,re import logging import aiohttp import requests from utils.conn_db import ConnDb # 日志格式 logging.basicConfig(level=logging.INFO, format=\'%(asctime)s - %(levelname)s: %(message)s\') # 章节目录api b_id = \'4308080950\' url = \'https://dushu.baidu.com/api/pc/getCatalog?data={\"book_id\":\"\'+b_id+\'\"}\' headers = { \"User-Agent\": \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) \" \"Chrome/104.0.0.0 Safari/537.36\" } # 并发声明 semaphore = asyncio.Semaphore(5) async def download(title,b_id, cid): data = { \"book_id\": b_id, \"cid\": f\'{b_id}|{cid}\', } data = json.dumps(data) detail_url = \'https://dushu.baidu.com/api/pc/getChapterContent?data={}\'.format(data) async with semaphore: async with aiohttp.ClientSession(headers=headers) as session: async with session.get(detail_url) as response: res = await response.json() content = { \'title\': title, \'content\': res[\'data\'][\'novel\'][\'content\'] } # print(title) await save_data(content) async def save_data(data): if data: client = ConnDb().conn_motor_mongo() db = client.baidu_novel collection = db.novel logging.info(\'saving data %s\', data) await collection.update_one( {\'title\': data.get(\'title\')}, {\'$set\': data}, upsert=True ) async def main(): res = requests.get(url, headers=headers) tasks = [] for re in res.json()[\'data\'][\'novel\'][\'items\']: # 拿到某小说目录cid title = re[\'title\'] cid = re[\'cid\'] tasks.append(download(title, b_id, cid)) # 将请求放到列表里,再通过gather执行并发 await asyncio.gather(*tasks) if __name__ == \'__main__\': asyncio.run(main())
至此,我们就使用aiohttp完成了对小说章节的爬取。
要实现异步处理,得先要有挂起操作,当一个任务需要等待 IO 结果的时候,可以挂起当前任务,转而去执行其他任务,这样才能充分利用好资源,要实现异步,需要了解 await 的用法,使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。
await 后面的对象必须是如下格式之一:
- A native coroutine object returned from a native coroutine function,一个原生 coroutine 对象。
- A generator-based coroutine object returned from a function decorated with types.coroutine,一个由 types.coroutine 修饰的生成器,这个生成器可以返回 coroutine 对象。
- An object with an await method returning an iterator,一个包含 await 方法的对象返回的一个迭代器。
总结
以上就是借助协程async和异步aiohttp两个主要模块完成异步爬虫的内容,
aiohttp 以异步方式爬取网站的耗时远小于 requests 同步方式,以上列举的例子希望对你有帮助。
注意,线程和协程是两个概念,后面找机会我们再聊聊进程和线程、线程和协程的关系
更多关于python aiohttp异步爬虫的资料请关注其它相关文章!
暂无评论内容