高级特性
Python 提供了许多强大的高级特性,可以让代码更简洁、更优雅。
列表推导式
# 传统方式
squares = []
for i in range(10):
squares.append(i ** 2)
# 列表推导式
squares = [i ** 2 for i in range(10)]
print(squares) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 带条件的列表推导式
even_numbers = [i for i in range(20) if i % 2 == 0]
print(even_numbers) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
# 嵌套列表推导式
matrix = [[i * j for j in range(3)] for i in range(3)]
print(matrix) # [[0, 0, 0], [0, 1, 2], [0, 2, 4]]
字典推导式
# 创建字典
squares_dict = {i: i ** 2 for i in range(5)}
print(squares_dict) # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16}
# 字典转换
prices = {'apple': 5, 'banana': 3, 'orange': 4}
discounted = {k: v * 0.9 for k, v in prices.items()}
print(discounted) # {'apple': 4.5, 'banana': 2.7, 'orange': 3.6}
生成器
# 生成器函数
def fibonacci(n):
"""生成斐波那契数列"""
a, b = 0, 1
for _ in range(n):
yield a
a, b = b, a + b
# 使用生成器
for num in fibonacci(10):
print(num, end=' ') # 0 1 1 2 3 5 8 13 21 34
# 生成器表达式
squares_gen = (i ** 2 for i in range(5))
print(list(squares_gen)) # [0, 1, 4, 9, 16]
装饰器
def timer(func):
"""计时装饰器"""
import time
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
print(f"{func.__name__} 执行时间: {end - start:.4f} 秒")
return result
return wrapper
@timer
def slow_function():
import time
time.sleep(1)
return "完成"
slow_function() # 输出执行时间
lambda 函数
# lambda 函数(匿名函数)
add = lambda x, y: x + y
print(add(3, 5)) # 8
# 与内置函数结合使用
numbers = [1, 2, 3, 4, 5]
squared = list(map(lambda x: x ** 2, numbers))
print(squared) # [1, 4, 9, 16, 25]
# 过滤
even = list(filter(lambda x: x % 2 == 0, numbers))
print(even) # [2, 4]
上下文管理器
class Timer:
"""自定义上下文管理器"""
import time
def __enter__(self):
self.start = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.end = time.time()
print(f"代码块执行时间: {self.end - self.start:.4f} 秒")
with Timer():
import time
time.sleep(0.5)
print("执行一些操作...")
Python 特性总结:
- 🎯 列表推导式:简洁地创建列表
- ⚡ 生成器:惰性求值,节省内存
- 🎨 装饰器:优雅地扩展函数功能
- 🔧 上下文管理器:自动管理资源
- 📝 lambda 函数:简洁的匿名函数
Fabric
Fabric 是一个 Python 库和命令行工具,用于简化 SSH 远程执行和系统管理任务。
安装
pip install fabric
基本使用
from fabric import Connection
# 创建连接
conn = Connection('user@hostname')
# 执行远程命令
result = conn.run('ls -la', hide=True)
print(result.stdout)
# 上传文件
conn.put('local_file.txt', '/remote/path/')
# 下载文件
conn.get('/remote/path/remote_file.txt', 'local_path/')
# 关闭连接
conn.close()
批量操作多台服务器
from fabric import Connection
# 服务器列表
servers = [
{'host': 'server1.example.com', 'user': 'admin'},
{'host': 'server2.example.com', 'user': 'admin'},
{'host': 'server3.example.com', 'user': 'admin'}
]
# 批量执行命令
for server in servers:
conn = Connection(f"{server['user']}@{server['host']}")
result = conn.run('uptime', hide=True)
print(f"{server['host']}: {result.stdout.strip()}")
conn.close()
Sudo 命令
from fabric import Connection
conn = Connection('user@hostname')
# 使用 sudo 执行命令
result = conn.sudo('apt-get update', hide=True)
print(result.stdout)
# sudo 带密码
result = conn.sudo('systemctl restart nginx', password='your_password')
conn.close()
上下文管理器
from fabric import Connection
conn = Connection('user@hostname')
# 切换目录
with conn.cd('/var/www'):
conn.run('ls -la')
# 设置环境变量
with conn.prefix('source /etc/profile'):
conn.run('echo $PATH')
# 激活虚拟环境
with conn.prefix('source /path/to/venv/bin/activate'):
conn.run('pip list')
conn.close()
并行执行
from fabric import ThreadingGroup
# 创建连接组
group = ThreadingGroup(
'user@server1',
'user@server2',
'user@server3'
)
# 并行执行命令
results = group.run('hostname')
for conn, result in results.items():
print(f"{conn.host}: {result.stdout.strip()}")
# 并行上传
group.put('config.txt', '/tmp/')
使用 Fabric 任务
from fabric import task, Connection
@task
def deploy(c):
"""部署应用"""
conn = Connection('user@hostname')
# 拉取代码
conn.run('cd /var/www/app && git pull')
# 安装依赖
with conn.prefix('source venv/bin/activate'):
conn.run('pip install -r requirements.txt')
# 重启服务
conn.sudo('systemctl restart myapp')
print("部署完成")
@task
def check_status(c):
"""检查服务状态"""
conn = Connection('user@hostname')
result = conn.sudo('systemctl status myapp', hide=True)
print(result.stdout)
💡 提示:Fabric 适合自动化运维、批量服务器管理等场景,可以大大简化 SSH 操作。
psutil
psutil 是一个跨平台库,用于获取系统和进程信息,如 CPU、内存、磁盘、网络等。
安装
pip install psutil
CPU 信息
import psutil
# CPU 逻辑核心数
print(f"CPU 核心数: {psutil.cpu_count(logical=True)}")
print(f"CPU 物理核心数: {psutil.cpu_count(logical=False)}")
# CPU 使用率
print(f"CPU 使用率: {psutil.cpu_percent(interval=1)}%")
# 每个 CPU 的使用率
print(f"各 CPU 使用率: {psutil.cpu_percent(interval=1, percpu=True)}")
# CPU 时间统计
print(f"CPU 时间: {psutil.cpu_times()}")
# CPU 频率
print(f"CPU 频率: {psutil.cpu_freq()}")
内存信息
import psutil
# 内存信息
mem = psutil.virtual_memory()
print(f"总内存: {mem.total / (1024**3):.2f} GB")
print(f"可用内存: {mem.available / (1024**3):.2f} GB")
print(f"已用内存: {mem.used / (1024**3):.2f} GB")
print(f"内存使用率: {mem.percent}%")
# 交换内存信息
swap = psutil.swap_memory()
print(f"总交换内存: {swap.total / (1024**3):.2f} GB")
print(f"已用交换内存: {swap.used / (1024**3):.2f} GB")
磁盘信息
import psutil
# 磁盘分区
partitions = psutil.disk_partitions()
for partition in partitions:
print(f"设备: {partition.device}")
print(f"挂载点: {partition.mountpoint}")
print(f"文件系统: {partition.fstype}")
# 磁盘使用情况
disk_usage = psutil.disk_usage('/')
print(f"总空间: {disk_usage.total / (1024**3):.2f} GB")
print(f"已用空间: {disk_usage.used / (1024**3):.2f} GB")
print(f"可用空间: {disk_usage.free / (1024**3):.2f} GB")
print(f"使用率: {disk_usage.percent}%")
# 磁盘 I/O 统计
disk_io = psutil.disk_io_counters()
print(f"读次数: {disk_io.read_count}")
print(f"写字节数: {disk_io.write_bytes / (1024**2):.2f} MB")
网络信息
import psutil
# 网络接口
net_io = psutil.net_io_counters()
print(f"发送字节数: {net_io.bytes_sent / (1024**2):.2f} MB")
print(f"接收字节数: {net_io.bytes_recv / (1024**2):.2f} MB")
print(f"发送包数: {net_io.packets_sent}")
print(f"接收包数: {net_io.packets_recv}")
# 网络接口详情
interfaces = psutil.net_if_addrs()
for interface, addrs in interfaces.items():
print(f"接口: {interface}")
for addr in addrs:
print(f" 地址: {addr.address}")
# 网络连接
connections = psutil.net_connections(kind='inet')
for conn in connections[:5]:
print(f"本地地址: {conn.laddr}, 远程地址: {conn.raddr}, 状态: {conn.status}")
进程信息
import psutil
# 获取当前进程
current_process = psutil.Process()
print(f"进程 ID: {current_process.pid}")
print(f"进程名称: {current_process.name()}")
print(f"进程状态: {current_process.status()}")
print(f"创建时间: {current_process.create_time()}")
print(f"CPU 使用率: {current_process.cpu_percent(interval=1)}%")
print(f"内存使用: {current_process.memory_info().rss / (1024**2):.2f} MB")
# 获取所有进程
for proc in psutil.process_iter(['pid', 'name', 'username']):
print(f"PID: {proc.info['pid']}, 名称: {proc.info['name']}, 用户: {proc.info['username']}")
# 查找特定进程
for proc in psutil.process_iter(['pid', 'name']):
if proc.info['name'] == 'python':
print(f"找到 Python 进程: PID {proc.info['pid']}")
系统信息
import psutil
import platform
# 系统信息
print(f"系统: {platform.system()}")
print(f"版本: {platform.release()}")
print(f"架构: {platform.machine()}")
# 启动时间
boot_time = psutil.boot_time()
print(f"系统启动时间: {boot_time}")
# 用户信息
users = psutil.users()
for user in users:
print(f"用户: {user.name}, 终端: {user.terminal}, 主机: {user.host}")
监控脚本示例
import psutil
import time
def monitor_system(interval=5):
"""监控系统资源"""
while True:
# CPU 使用率
cpu_percent = psutil.cpu_percent(interval=1)
# 内存使用率
mem = psutil.virtual_memory()
mem_percent = mem.percent
# 磁盘使用率
disk = psutil.disk_usage('/')
disk_percent = disk.percent
print(f"CPU: {cpu_percent}%, 内存: {mem_percent}%, 磁盘: {disk_percent}%")
time.sleep(interval)
# monitor_system()
💡 提示:psutil 适合系统监控、性能分析、资源管理等场景,支持 Windows、Linux、macOS 等多个平台。
logging
logging 是 Python 内置的日志记录模块,提供了灵活的日志记录功能。
基本使用
import logging
# 配置日志级别
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 不同级别的日志
logging.debug('调试信息')
logging.info('普通信息')
logging.warning('警告信息')
logging.error('错误信息')
logging.critical('严重错误')
日志级别
import logging
# 设置日志级别
logging.basicConfig(level=logging.DEBUG)
# DEBUG: 10 - 调试信息
logging.debug('这是调试信息')
# INFO: 20 - 普通信息
logging.info('这是普通信息')
# WARNING: 30 - 警告信息
logging.warning('这是警告信息')
# ERROR: 40 - 错误信息
logging.error('这是错误信息')
# CRITICAL: 50 - 严重错误
logging.critical('这是严重错误')
记录到文件
import logging
# 配置日志文件
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='app.log',
filemode='a' # 追加模式
)
logging.info('这条日志会写入文件')
logging.error('错误也会写入文件')
创建 Logger
import logging
# 创建 logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)
# 创建处理器
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler('app.log')
# 设置处理器级别
console_handler.setLevel(logging.INFO)
file_handler.setLevel(logging.DEBUG)
# 创建格式化器
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
# 添加处理器
logger.addHandler(console_handler)
logger.addHandler(file_handler)
# 使用 logger
logger.debug('调试信息')
logger.info('普通信息')
logger.warning('警告信息')
logger.error('错误信息')
日志轮转
import logging
from logging.handlers import RotatingFileHandler
# 创建轮转文件处理器
handler = RotatingFileHandler(
'app.log',
maxBytes=10*1024*1024, # 10MB
backupCount=5 # 保留5个备份
)
# 设置格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
# 创建 logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.info('这条日志会写入文件,文件超过10MB会自动轮转')
按时间轮转
import logging
from logging.handlers import TimedRotatingFileHandler
# 按天轮转
handler = TimedRotatingFileHandler(
'app.log',
when='midnight', # 每天午夜轮转
interval=1,
backupCount=7 # 保留7天的日志
)
handler.suffix = '%Y-%m-%d' # 文件名后缀
# 设置格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
# 创建 logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.info('这条日志每天午夜会自动轮转')
异常跟踪
import logging
logging.basicConfig(
level=logging.ERROR,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
try:
result = 10 / 0
except Exception as e:
# 记录异常信息
logging.error('发生错误', exc_info=True)
# 或者
logging.exception('发生错误')
结构化日志
import logging
import json
class StructuredFormatter(logging.Formatter):
def format(self, record):
log_data = {
'timestamp': self.formatTime(record),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno
}
return json.dumps(log_data, ensure_ascii=False)
# 使用结构化格式
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())
logger = logging.getLogger('structured')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info('这是结构化日志')
💡 提示:logging 模块功能强大,建议在生产环境中使用,而不是使用 print 语句。
Ansible
Ansible 是一个自动化配置管理工具,Python 可以通过 ansible-runner 或 ansible-runner 接口调用 Ansible。
安装
pip install ansible-runner
使用 ansible-runner
import ansible_runner
# 执行 Ad-hoc 命令
r = ansible_runner.run(
private_data_dir='/tmp/ansible',
module='shell',
module_args='ls -la',
inventory='localhost,',
pattern='all'
)
print(f"状态: {r.status}")
print(f"输出: {r.stdout}")
print(f"错误: {r.stderr}")
执行 Playbook
import ansible_runner
# 执行 Playbook
r = ansible_runner.run(
private_data_dir='/tmp/ansible',
playbook='site.yml'
)
print(f"状态: {r.status}")
print(f"输出: {r.stdout}")
# 获取事件
for event in r.events:
print(event)
动态 Inventory
import ansible_runner
# 动态 inventory
inventory = {
'all': {
'hosts': {
'server1': {'ansible_host': '192.168.1.1'},
'server2': {'ansible_host': '192.168.1.2'}
}
}
}
r = ansible_runner.run(
private_data_dir='/tmp/ansible',
module='ping',
inventory=inventory
)
print(f"输出: {r.stdout}")
异步执行
import ansible_runner
# 异步执行
r = ansible_runner.run_async(
private_data_dir='/tmp/ansible',
playbook='site.yml'
)
# 检查状态
while not r.rc:
print("正在执行...")
time.sleep(1)
print(f"完成: {r.rc}")
使用 Ansible Python API
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.inventory.manager import InventoryManager
from ansible.vars.manager import VariableManager
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play
# 加载 inventory
loader = DataLoader()
inventory = InventoryManager(loader=loader, sources='inventory')
variable_manager = VariableManager(loader=loader, inventory=inventory)
# 创建 PlaybookExecutor
playbook_path = 'site.yml'
executor = PlaybookExecutor(
playbooks=[playbook_path],
inventory=inventory,
variable_manager=variable_manager,
loader=loader
)
# 执行
executor.run()
💡 提示:Ansible 适合自动化配置管理、应用部署、任务编排等场景。