异步编程
asyncio 是 Python 的异步 I/O 库,用于编写并发代码,使用 async/await 语法。
async 和 await
import asyncio
# 定义异步函数
async def say_hello():
print("Hello")
await asyncio.sleep(1) # 模拟耗时操作
print("World")
# 运行异步函数
asyncio.run(say_hello())
创建任务
import asyncio
async def worker(name, delay):
print(f"{name} 开始工作")
await asyncio.sleep(delay)
print(f"{name} 完成工作")
return f"{name} 的结果"
async def main():
# 创建任务
task1 = asyncio.create_task(worker("工作1", 2))
task2 = asyncio.create_task(worker("工作2", 1))
task3 = asyncio.create_task(worker("工作3", 3))
# 等待所有任务完成
await task1
await task2
await task3
print("所有任务完成")
asyncio.run(main())
并发执行
import asyncio
async def fetch_url(url):
print(f"开始获取 {url}")
await asyncio.sleep(2) # 模拟网络请求
print(f"完成获取 {url}")
return f"{url} 的内容"
async def main():
urls = [
'https://api.github.com',
'https://httpbin.org/get',
'https://jsonplaceholder.typicode.com/posts/1'
]
# 并发执行
tasks = [fetch_url(url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())
超时控制
import asyncio
async def slow_operation():
await asyncio.sleep(5)
return "完成"
async def main():
try:
# 设置超时为 2 秒
result = await asyncio.wait_for(slow_operation(), timeout=2)
print(result)
except asyncio.TimeoutError:
print("操作超时")
asyncio.run(main())
异步 HTTP 请求
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
'https://api.github.com',
'https://httpbin.org/get',
'https://jsonplaceholder.typicode.com/posts/1'
]
# 并发请求
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
for i, result in enumerate(results):
print(f"URL {i+1}: {len(result)} 字符")
asyncio.run(main())
异步文件操作
import aiofiles
import asyncio
async def read_file(filename):
async with aiofiles.open(filename, 'r', encoding='utf-8') as file:
content = await file.read()
return content
async def write_file(filename, content):
async with aiofiles.open(filename, 'w', encoding='utf-8') as file:
await file.write(content)
async def main():
# 写入文件
await write_file('example.txt', 'Hello, Async World!')
# 读取文件
content = await read_file('example.txt')
print(content)
asyncio.run(main())
异步迭代器
import asyncio
class AsyncIterator:
def __init__(self, n):
self.n = n
self.i = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.i < self.n:
self.i += 1
await asyncio.sleep(0.5)
return self.i
else:
raise StopAsyncIteration
async def main():
async for i in AsyncIterator(5):
print(f"迭代: {i}")
asyncio.run(main())
异步上下文管理器
import asyncio
class AsyncContextManager:
async def __aenter__(self):
print("进入上下文")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("退出上下文")
await asyncio.sleep(0.5)
async def main():
async with AsyncContextManager():
print("在上下文中执行操作")
asyncio.run(main())
同步和异步混合
import asyncio
import time
def blocking_operation():
"""同步阻塞操作"""
time.sleep(2)
return "同步完成"
async def async_operation():
"""异步操作"""
await asyncio.sleep(2)
return "异步完成"
async def main():
# 在事件循环中运行同步操作
loop = asyncio.get_event_loop()
result1 = await loop.run_in_executor(None, blocking_operation)
print(result1)
# 运行异步操作
result2 = await async_operation()
print(result2)
asyncio.run(main())
事件循环
import asyncio
async def task1():
print("任务1 开始")
await asyncio.sleep(1)
print("任务1 完成")
async def task2():
print("任务2 开始")
await asyncio.sleep(2)
print("任务2 完成")
async def main():
# 获取事件循环
loop = asyncio.get_event_loop()
# 创建任务
t1 = loop.create_task(task1())
t2 = loop.create_task(task2())
# 等待任务完成
await t1
await t2
print("所有任务完成")
asyncio.run(main())
💡 提示:异步编程适合 I/O 密集型任务,如网络请求、文件操作等,可以显著提高性能。