aiohttp
aiohttp 是一个异步 HTTP 客户端/服务器库,基于 asyncio,适合高并发场景。
安装
pip install aiohttp
基本 GET 请求
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
url = 'https://api.github.com'
content = await fetch(url)
print(content[:100])
asyncio.run(main())
并发请求
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
'https://api.github.com',
'https://httpbin.org/get',
'https://jsonplaceholder.typicode.com/posts/1'
]
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for i, result in enumerate(results):
print(f"URL {i+1}: {len(result)} 字符")
asyncio.run(main())
POST 请求
import aiohttp
import asyncio
async def post_data():
data = {'name': '张三', 'age': 25}
async with aiohttp.ClientSession() as session:
async with session.post('https://httpbin.org/post', json=data) as response:
result = await response.json()
print(result)
asyncio.run(post_data())
设置请求头
import aiohttp
import asyncio
async def fetch_with_headers():
headers = {
'User-Agent': 'MyApp/1.0',
'Authorization': 'Bearer YOUR_TOKEN'
}
async with aiohttp.ClientSession() as session:
async with session.get('https://api.github.com/user', headers=headers) as response:
return await response.text()
asyncio.run(fetch_with_headers())
超时控制
import aiohttp
import asyncio
async def fetch_with_timeout():
timeout = aiohttp.ClientTimeout(total=5)
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
async with session.get('https://api.github.com') as response:
return await response.text()
except asyncio.TimeoutError:
print("请求超时")
asyncio.run(fetch_with_timeout())
文件下载
import aiohttp
import asyncio
async def download_file():
url = 'https://example.com/file.txt'
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
with open('downloaded.txt', 'wb') as f:
async for chunk in response.content.iter_chunked(1024):
f.write(chunk)
print("下载完成")
asyncio.run(download_file())
文件上传
import aiohttp
import asyncio
async def upload_file():
url = 'https://httpbin.org/post'
async with aiohttp.ClientSession() as session:
with open('example.txt', 'rb') as f:
files = {'file': f}
async with session.post(url, data=files) as response:
result = await response.json()
print(result)
asyncio.run(upload_file())
WebSocket 客户端
import aiohttp
import asyncio
async def websocket_client():
uri = 'wss://echo.websocket.org'
async with aiohttp.ClientSession() as session:
async with session.ws_connect(uri) as ws:
# 发送消息
await ws.send_str('Hello, WebSocket!')
# 接收消息
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(f"收到消息: {msg.data}")
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
asyncio.run(websocket_client())
💡 提示:aiohttp 适合高并发场景,性能优于同步的 Requests 库。