Python 高级特性
装饰器
# 装饰器本质:闭包
def timer(func):
def wrapper(*args, **kwargs):
import time
start = time.time()
result = func(*args, **kwargs)
print(f"耗时: {time.time() - start:.2f}s")
return result
return wrapper
@timer
def slow_func():
import time
time.sleep(1)
return "done"
slow_func()
带参数的装饰器
def retry(times=3):
def decorator(func):
def wrapper(*args, **kwargs):
for _ in range(times):
try:
return func(*args, **kwargs)
except:
continue
return None
return wrapper
return decorator
@retry(times=5)
def fetch():
pass
异步编程
import asyncio
async def main():
# 并发任务
results = await asyncio.gather(
asyncio.sleep(1, result="a"),
asyncio.sleep(1, result="b")
)
print(results) # ['a', 'b']
asyncio.run(main())
上下文管理器
# 类方式
class File:
def __enter__(self):
self.f = open("file.txt")
return self.f
def __exit__(self, *args):
self.f.close()
# with 使用
with File() as f:
f.read()
# 或用 contextlib
from contextlib import contextmanager
@contextmanager
def timer():
import time
start = time.time()
yield
print(f"耗时: {time.time() - start:.2f}")
🔄 同步 vs 异步对比
flowchart LR
subgraph 同步
A1[任务1开始] --> A2[任务1结束]
A2 --> A3[任务2开始]
A3 --> A4[任务2结束]
end
subgraph 异步
B1[任务1开始] --> B2[任务2开始]
B2 --> B3[任务1结束]
B3 --> B4[任务2结束]
end
style 同步 fill:#e8f5e9
style 异步 fill:#e3f2fd
列表推导式
# 传统方式
squares = []
for i in range(10):
squares.append(i ** 2)
# 列表推导式
squares = [i ** 2 for i in range(10)]
print(squares) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 带条件的列表推导式
even_numbers = [i for i in range(20) if i % 2 == 0]
print(even_numbers) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
# 嵌套列表推导式
matrix = [[i * j for j in range(3)] for i in range(3)]
print(matrix) # [[0, 0, 0], [0, 1, 2], [0, 2, 4]]
字典推导式
# 创建字典
squares_dict = {i: i ** 2 for i in range(5)}
print(squares_dict) # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16}
# 字典转换
prices = {'apple': 5, 'banana': 3, 'orange': 4}
discounted = {k: v * 0.9 for k, v in prices.items()}
print(discounted) # {'apple': 4.5, 'banana': 2.7, 'orange': 3.6}
生成器
# 生成器函数
def fibonacci(n):
"""生成斐波那契数列"""
a, b = 0, 1
for _ in range(n):
yield a
a, b = b, a + b
# 使用生成器
for num in fibonacci(10):
print(num, end=' ') # 0 1 1 2 3 5 8 13 21 34
# 生成器表达式
squares_gen = (i ** 2 for i in range(5))
print(list(squares_gen)) # [0, 1, 4, 9, 16]
装饰器
装饰器是一种用于修改函数或类行为的工具,本质上是一个返回函数的高阶函数。
# 简单的装饰器
def decorator(func):
def wrapper():
print("执行前...")
func()
print("执行后...")
return wrapper
@decorator
def say_hello():
print("Hello!")
say_hello()
# 输出:
# 执行前...
# Hello!
# 执行后...
带参数的装饰器
def repeat(times):
"""重复执行指定次数的装饰器"""
def decorator(func):
def wrapper(*args, **kwargs):
for _ in range(times):
result = func(*args, **kwargs)
return result
return wrapper
return decorator
@repeat(3)
def greet(name):
print(f"你好,{name}!")
greet("Python")
# 输出:
# 你好,Python!
# 你好,Python!
# 你好,Python!
多个装饰器
def decorator1(func):
def wrapper(*args, **kwargs):
print("装饰器 1: 执行前")
result = func(*args, **kwargs)
print("装饰器 1: 执行后")
return result
return wrapper
def decorator2(func):
def wrapper(*args, **kwargs):
print("装饰器 2: 执行前")
result = func(*args, **kwargs)
print("装饰器 2: 执行后")
return result
return wrapper
@decorator1
@decorator2
def say_hi():
print("Hi!")
say_hi()
# 输出:
# 装饰器 1: 执行前
# 装饰器 2: 执行前
# Hi!
# 装饰器 2: 执行后
# 装饰器 1: 执行后
类装饰器
class CountCalls:
"""统计函数调用次数的类装饰器"""
def __init__(self, func):
self.func = func
self.count = 0
def __call__(self, *args, **kwargs):
self.count += 1
print(f"第 {self.count} 次调用")
return self.func(*args, **kwargs)
@CountCalls
def say_hello():
print("Hello!")
say_hello() # 第 1 次调用
say_hello() # 第 2 次调用
异步编程基础
Python 的 asyncio 模块提供了异步编程的支持,适用于 I/O 密集型任务。
import asyncio
async def say_hello():
"""异步函数"""
print("开始...")
await asyncio.sleep(1) # 模拟异步操作
print("Hello!")
print("结束...")
# 运行异步函数
asyncio.run(say_hello())
多个异步任务并发
import asyncio
async def fetch_data(name, delay):
"""模拟异步数据获取"""
print(f"{name}: 开始获取数据...")
await asyncio.sleep(delay)
print(f"{name}: 数据获取完成")
return f"{name} 的数据"
async def main():
# 并发执行多个任务
results = await asyncio.gather(
fetch_data("任务 A", 2),
fetch_data("任务 B", 1),
fetch_data("任务 C", 3)
)
print(f"所有结果:{results}")
# 运行
asyncio.run(main())
# 总耗时约 3 秒(最长时间的任务)
async 和 await
import asyncio
async def producer(queue, name):
"""生产者协程"""
for i in range(3):
item = f"{name}-{i}"
await queue.put(item)
print(f"{name} 生产了:{item}")
await asyncio.sleep(0.5)
async def consumer(queue, name):
"""消费者协程"""
while True:
item = await queue.get()
if item is None:
break
print(f"{name} 消费了:{item}")
queue.task_done()
await asyncio.sleep(0.3)
async def main():
queue = asyncio.Queue()
# 创建生产者和消费者任务
producers = [producer(queue, f"生产者-{i}") for i in range(2)]
consumers = [consumer(queue, f"消费者-{i}") for i in range(3)]
# 等待生产者完成
await asyncio.gather(*producers)
# 等待队列处理完成
await queue.join()
# 停止消费者
for _ in range(3):
await queue.put(None)
await asyncio.gather(*consumers)
asyncio.run(main())
协程(Coroutine)
协程是一种比线程更轻量级的并发机制,在单线程中实现并发执行。
import asyncio
async def coroutine(name, delay):
"""协程函数"""
for i in range(3):
print(f"{name}: 第 {i+1} 次执行")
await asyncio.sleep(delay)
print(f"{name}: 执行完成")
async def main():
# 创建协程任务
task1 = asyncio.create_task(coroutine("协程 A", 0.5))
task2 = asyncio.create_task(coroutine("协程 B", 0.7))
# 等待所有任务完成
await task1
await task2
print("所有协程执行完毕")
asyncio.run(main())
异步上下文管理器
import asyncio
class AsyncResource:
"""异步资源管理器"""
async def __aenter__(self):
print("获取资源...")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源...")
await asyncio.sleep(0.3)
async def main():
async with AsyncResource() as resource:
print("使用资源中...")
await asyncio.sleep(1)
asyncio.run(main())
异步迭代器
import asyncio
class AsyncCounter:
"""异步计数器"""
def __init__(self, max_count):
self.max_count = max_count
self.current = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.max_count:
raise StopAsyncIteration
self.current += 1
await asyncio.sleep(0.5)
return self.current
async def main():
async for num in AsyncCounter(5):
print(f"当前数字:{num}")
asyncio.run(main())
lambda 函数
# lambda 函数(匿名函数)
add = lambda x, y: x + y
print(add(3, 5)) # 8
# 与内置函数结合使用
numbers = [1, 2, 3, 4, 5]
squared = list(map(lambda x: x ** 2, numbers))
print(squared) # [1, 4, 9, 16, 25]
# 过滤
even = list(filter(lambda x: x % 2 == 0, numbers))
print(even) # [2, 4]
上下文管理器
class Timer:
"""自定义上下文管理器"""
import time
def __enter__(self):
self.start = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.end = time.time()
print(f"代码块执行时间: {self.end - self.start:.4f} 秒")
with Timer():
import time
time.sleep(0.5)
print("执行一些操作...")
Python 特性总结:
- 🎯 列表推导式:简洁地创建列表
- ⚡ 生成器:惰性求值,节省内存
- 🎨 装饰器:优雅地扩展函数功能
- 🔧 上下文管理器:自动管理资源
- 📝 lambda 函数:简洁的匿名函数
Fabric
Fabric 是一个 Python 库和命令行工具,用于简化 SSH 远程执行和系统管理任务。
安装
pip install fabric
基本使用
from fabric import Connection
# 创建连接
conn = Connection('user@hostname')
# 执行远程命令
result = conn.run('ls -la', hide=True)
print(result.stdout)
# 上传文件
conn.put('local_file.txt', '/remote/path/')
# 下载文件
conn.get('/remote/path/remote_file.txt', 'local_path/')
# 关闭连接
conn.close()
批量操作多台服务器
from fabric import Connection
# 服务器列表
servers = [
{'host': 'server1.example.com', 'user': 'admin'},
{'host': 'server2.example.com', 'user': 'admin'},
{'host': 'server3.example.com', 'user': 'admin'}
]
# 批量执行命令
for server in servers:
conn = Connection(f"{server['user']}@{server['host']}")
result = conn.run('uptime', hide=True)
print(f"{server['host']}: {result.stdout.strip()}")
conn.close()
Sudo 命令
from fabric import Connection
conn = Connection('user@hostname')
# 使用 sudo 执行命令
result = conn.sudo('apt-get update', hide=True)
print(result.stdout)
# sudo 带密码
result = conn.sudo('systemctl restart nginx', password='your_password')
conn.close()
上下文管理器
from fabric import Connection
conn = Connection('user@hostname')
# 切换目录
with conn.cd('/var/www'):
conn.run('ls -la')
# 设置环境变量
with conn.prefix('source /etc/profile'):
conn.run('echo $PATH')
# 激活虚拟环境
with conn.prefix('source /path/to/venv/bin/activate'):
conn.run('pip list')
conn.close()
并行执行
from fabric import ThreadingGroup
# 创建连接组
group = ThreadingGroup(
'user@server1',
'user@server2',
'user@server3'
)
# 并行执行命令
results = group.run('hostname')
for conn, result in results.items():
print(f"{conn.host}: {result.stdout.strip()}")
# 并行上传
group.put('config.txt', '/tmp/')
使用 Fabric 任务
from fabric import task, Connection
@task
def deploy(c):
"""部署应用"""
conn = Connection('user@hostname')
# 拉取代码
conn.run('cd /var/www/app && git pull')
# 安装依赖
with conn.prefix('source venv/bin/activate'):
conn.run('pip install -r requirements.txt')
# 重启服务
conn.sudo('systemctl restart myapp')
print("部署完成")
@task
def check_status(c):
"""检查服务状态"""
conn = Connection('user@hostname')
result = conn.sudo('systemctl status myapp', hide=True)
print(result.stdout)
💡 提示:Fabric 适合自动化运维、批量服务器管理等场景,可以大大简化 SSH 操作。
psutil
psutil 是一个跨平台库,用于获取系统和进程信息,如 CPU、内存、磁盘、网络等。
安装
pip install psutil
CPU 信息
import psutil
# CPU 逻辑核心数
print(f"CPU 核心数: {psutil.cpu_count(logical=True)}")
print(f"CPU 物理核心数: {psutil.cpu_count(logical=False)}")
# CPU 使用率
print(f"CPU 使用率: {psutil.cpu_percent(interval=1)}%")
# 每个 CPU 的使用率
print(f"各 CPU 使用率: {psutil.cpu_percent(interval=1, percpu=True)}")
# CPU 时间统计
print(f"CPU 时间: {psutil.cpu_times()}")
# CPU 频率
print(f"CPU 频率: {psutil.cpu_freq()}")
内存信息
import psutil
# 内存信息
mem = psutil.virtual_memory()
print(f"总内存: {mem.total / (1024**3):.2f} GB")
print(f"可用内存: {mem.available / (1024**3):.2f} GB")
print(f"已用内存: {mem.used / (1024**3):.2f} GB")
print(f"内存使用率: {mem.percent}%")
# 交换内存信息
swap = psutil.swap_memory()
print(f"总交换内存: {swap.total / (1024**3):.2f} GB")
print(f"已用交换内存: {swap.used / (1024**3):.2f} GB")
磁盘信息
import psutil
# 磁盘分区
partitions = psutil.disk_partitions()
for partition in partitions:
print(f"设备: {partition.device}")
print(f"挂载点: {partition.mountpoint}")
print(f"文件系统: {partition.fstype}")
# 磁盘使用情况
disk_usage = psutil.disk_usage('/')
print(f"总空间: {disk_usage.total / (1024**3):.2f} GB")
print(f"已用空间: {disk_usage.used / (1024**3):.2f} GB")
print(f"可用空间: {disk_usage.free / (1024**3):.2f} GB")
print(f"使用率: {disk_usage.percent}%")
# 磁盘 I/O 统计
disk_io = psutil.disk_io_counters()
print(f"读次数: {disk_io.read_count}")
print(f"写字节数: {disk_io.write_bytes / (1024**2):.2f} MB")
网络信息
import psutil
# 网络接口
net_io = psutil.net_io_counters()
print(f"发送字节数: {net_io.bytes_sent / (1024**2):.2f} MB")
print(f"接收字节数: {net_io.bytes_recv / (1024**2):.2f} MB")
print(f"发送包数: {net_io.packets_sent}")
print(f"接收包数: {net_io.packets_recv}")
# 网络接口详情
interfaces = psutil.net_if_addrs()
for interface, addrs in interfaces.items():
print(f"接口: {interface}")
for addr in addrs:
print(f" 地址: {addr.address}")
# 网络连接
connections = psutil.net_connections(kind='inet')
for conn in connections[:5]:
print(f"本地地址: {conn.laddr}, 远程地址: {conn.raddr}, 状态: {conn.status}")
进程信息
import psutil
# 获取当前进程
current_process = psutil.Process()
print(f"进程 ID: {current_process.pid}")
print(f"进程名称: {current_process.name()}")
print(f"进程状态: {current_process.status()}")
print(f"创建时间: {current_process.create_time()}")
print(f"CPU 使用率: {current_process.cpu_percent(interval=1)}%")
print(f"内存使用: {current_process.memory_info().rss / (1024**2):.2f} MB")
# 获取所有进程
for proc in psutil.process_iter(['pid', 'name', 'username']):
print(f"PID: {proc.info['pid']}, 名称: {proc.info['name']}, 用户: {proc.info['username']}")
# 查找特定进程
for proc in psutil.process_iter(['pid', 'name']):
if proc.info['name'] == 'python':
print(f"找到 Python 进程: PID {proc.info['pid']}")
系统信息
import psutil
import platform
# 系统信息
print(f"系统: {platform.system()}")
print(f"版本: {platform.release()}")
print(f"架构: {platform.machine()}")
# 启动时间
boot_time = psutil.boot_time()
print(f"系统启动时间: {boot_time}")
# 用户信息
users = psutil.users()
for user in users:
print(f"用户: {user.name}, 终端: {user.terminal}, 主机: {user.host}")
监控脚本示例
import psutil
import time
def monitor_system(interval=5):
"""监控系统资源"""
while True:
# CPU 使用率
cpu_percent = psutil.cpu_percent(interval=1)
# 内存使用率
mem = psutil.virtual_memory()
mem_percent = mem.percent
# 磁盘使用率
disk = psutil.disk_usage('/')
disk_percent = disk.percent
print(f"CPU: {cpu_percent}%, 内存: {mem_percent}%, 磁盘: {disk_percent}%")
time.sleep(interval)
# monitor_system()
💡 提示:psutil 适合系统监控、性能分析、资源管理等场景,支持 Windows、Linux、macOS 等多个平台。
logging
logging 是 Python 内置的日志记录模块,提供了灵活的日志记录功能。
基本使用
import logging
# 配置日志级别
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 不同级别的日志
logging.debug('调试信息')
logging.info('普通信息')
logging.warning('警告信息')
logging.error('错误信息')
logging.critical('严重错误')
日志级别
import logging
# 设置日志级别
logging.basicConfig(level=logging.DEBUG)
# DEBUG: 10 - 调试信息
logging.debug('这是调试信息')
# INFO: 20 - 普通信息
logging.info('这是普通信息')
# WARNING: 30 - 警告信息
logging.warning('这是警告信息')
# ERROR: 40 - 错误信息
logging.error('这是错误信息')
# CRITICAL: 50 - 严重错误
logging.critical('这是严重错误')
记录到文件
import logging
# 配置日志文件
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='app.log',
filemode='a' # 追加模式
)
logging.info('这条日志会写入文件')
logging.error('错误也会写入文件')
创建 Logger
import logging
# 创建 logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)
# 创建处理器
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler('app.log')
# 设置处理器级别
console_handler.setLevel(logging.INFO)
file_handler.setLevel(logging.DEBUG)
# 创建格式化器
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
# 添加处理器
logger.addHandler(console_handler)
logger.addHandler(file_handler)
# 使用 logger
logger.debug('调试信息')
logger.info('普通信息')
logger.warning('警告信息')
logger.error('错误信息')
日志轮转
import logging
from logging.handlers import RotatingFileHandler
# 创建轮转文件处理器
handler = RotatingFileHandler(
'app.log',
maxBytes=10*1024*1024, # 10MB
backupCount=5 # 保留5个备份
)
# 设置格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
# 创建 logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.info('这条日志会写入文件,文件超过10MB会自动轮转')
按时间轮转
import logging
from logging.handlers import TimedRotatingFileHandler
# 按天轮转
handler = TimedRotatingFileHandler(
'app.log',
when='midnight', # 每天午夜轮转
interval=1,
backupCount=7 # 保留7天的日志
)
handler.suffix = '%Y-%m-%d' # 文件名后缀
# 设置格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
# 创建 logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.info('这条日志每天午夜会自动轮转')
异常跟踪
import logging
logging.basicConfig(
level=logging.ERROR,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
try:
result = 10 / 0
except Exception as e:
# 记录异常信息
logging.error('发生错误', exc_info=True)
# 或者
logging.exception('发生错误')
结构化日志
import logging
import json
class StructuredFormatter(logging.Formatter):
def format(self, record):
log_data = {
'timestamp': self.formatTime(record),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno
}
return json.dumps(log_data, ensure_ascii=False)
# 使用结构化格式
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())
logger = logging.getLogger('structured')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info('这是结构化日志')
💡 提示:logging 模块功能强大,建议在生产环境中使用,而不是使用 print 语句。
Ansible
Ansible 是一个自动化配置管理工具,Python 可以通过 ansible-runner 或 ansible-runner 接口调用 Ansible。
安装
pip install ansible-runner
使用 ansible-runner
import ansible_runner
# 执行 Ad-hoc 命令
r = ansible_runner.run(
private_data_dir='/tmp/ansible',
module='shell',
module_args='ls -la',
inventory='localhost,',
pattern='all'
)
print(f"状态: {r.status}")
print(f"输出: {r.stdout}")
print(f"错误: {r.stderr}")
执行 Playbook
import ansible_runner
# 执行 Playbook
r = ansible_runner.run(
private_data_dir='/tmp/ansible',
playbook='site.yml'
)
print(f"状态: {r.status}")
print(f"输出: {r.stdout}")
# 获取事件
for event in r.events:
print(event)
动态 Inventory
import ansible_runner
# 动态 inventory
inventory = {
'all': {
'hosts': {
'server1': {'ansible_host': '192.168.1.1'},
'server2': {'ansible_host': '192.168.1.2'}
}
}
}
r = ansible_runner.run(
private_data_dir='/tmp/ansible',
module='ping',
inventory=inventory
)
print(f"输出: {r.stdout}")
异步执行
import ansible_runner
# 异步执行
r = ansible_runner.run_async(
private_data_dir='/tmp/ansible',
playbook='site.yml'
)
# 检查状态
while not r.rc:
print("正在执行...")
time.sleep(1)
print(f"完成: {r.rc}")
使用 Ansible Python API
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.inventory.manager import InventoryManager
from ansible.vars.manager import VariableManager
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play
# 加载 inventory
loader = DataLoader()
inventory = InventoryManager(loader=loader, sources='inventory')
variable_manager = VariableManager(loader=loader, inventory=inventory)
# 创建 PlaybookExecutor
playbook_path = 'site.yml'
executor = PlaybookExecutor(
playbooks=[playbook_path],
inventory=inventory,
variable_manager=variable_manager,
loader=loader
)
# 执行
executor.run()
💡 提示:Ansible 适合自动化配置管理、应用部署、任务编排等场景。