高级特性

Python 提供了许多强大的高级特性,可以让代码更简洁、更优雅。

列表推导式

# 传统方式
squares = []
for i in range(10):
    squares.append(i ** 2)

# 列表推导式
squares = [i ** 2 for i in range(10)]
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# 带条件的列表推导式
even_numbers = [i for i in range(20) if i % 2 == 0]
print(even_numbers)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

# 嵌套列表推导式
matrix = [[i * j for j in range(3)] for i in range(3)]
print(matrix)  # [[0, 0, 0], [0, 1, 2], [0, 2, 4]]

字典推导式

# 创建字典
squares_dict = {i: i ** 2 for i in range(5)}
print(squares_dict)  # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16}

# 字典转换
prices = {'apple': 5, 'banana': 3, 'orange': 4}
discounted = {k: v * 0.9 for k, v in prices.items()}
print(discounted)  # {'apple': 4.5, 'banana': 2.7, 'orange': 3.6}

生成器

# 生成器函数
def fibonacci(n):
    """生成斐波那契数列"""
    a, b = 0, 1
    for _ in range(n):
        yield a
        a, b = b, a + b

# 使用生成器
for num in fibonacci(10):
    print(num, end=' ')  # 0 1 1 2 3 5 8 13 21 34

# 生成器表达式
squares_gen = (i ** 2 for i in range(5))
print(list(squares_gen))  # [0, 1, 4, 9, 16]

装饰器

def timer(func):
    """计时装饰器"""
    import time
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} 执行时间: {end - start:.4f} 秒")
        return result
    return wrapper

@timer
def slow_function():
    import time
    time.sleep(1)
    return "完成"

slow_function()  # 输出执行时间

lambda 函数

# lambda 函数(匿名函数)
add = lambda x, y: x + y
print(add(3, 5))  # 8

# 与内置函数结合使用
numbers = [1, 2, 3, 4, 5]
squared = list(map(lambda x: x ** 2, numbers))
print(squared)  # [1, 4, 9, 16, 25]

# 过滤
even = list(filter(lambda x: x % 2 == 0, numbers))
print(even)  # [2, 4]

上下文管理器

class Timer:
    """自定义上下文管理器"""
    import time
    
    def __enter__(self):
        self.start = time.time()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end = time.time()
        print(f"代码块执行时间: {self.end - self.start:.4f} 秒")

with Timer():
    import time
    time.sleep(0.5)
    print("执行一些操作...")

Python 特性总结:

  • 🎯 列表推导式:简洁地创建列表
  • ⚡ 生成器:惰性求值,节省内存
  • 🎨 装饰器:优雅地扩展函数功能
  • 🔧 上下文管理器:自动管理资源
  • 📝 lambda 函数:简洁的匿名函数

Fabric

Fabric 是一个 Python 库和命令行工具,用于简化 SSH 远程执行和系统管理任务。

安装

pip install fabric

基本使用

from fabric import Connection

# 创建连接
conn = Connection('user@hostname')

# 执行远程命令
result = conn.run('ls -la', hide=True)
print(result.stdout)

# 上传文件
conn.put('local_file.txt', '/remote/path/')

# 下载文件
conn.get('/remote/path/remote_file.txt', 'local_path/')

# 关闭连接
conn.close()

批量操作多台服务器

from fabric import Connection

# 服务器列表
servers = [
    {'host': 'server1.example.com', 'user': 'admin'},
    {'host': 'server2.example.com', 'user': 'admin'},
    {'host': 'server3.example.com', 'user': 'admin'}
]

# 批量执行命令
for server in servers:
    conn = Connection(f"{server['user']}@{server['host']}")
    result = conn.run('uptime', hide=True)
    print(f"{server['host']}: {result.stdout.strip()}")
    conn.close()

Sudo 命令

from fabric import Connection

conn = Connection('user@hostname')

# 使用 sudo 执行命令
result = conn.sudo('apt-get update', hide=True)
print(result.stdout)

# sudo 带密码
result = conn.sudo('systemctl restart nginx', password='your_password')

conn.close()

上下文管理器

from fabric import Connection

conn = Connection('user@hostname')

# 切换目录
with conn.cd('/var/www'):
    conn.run('ls -la')

# 设置环境变量
with conn.prefix('source /etc/profile'):
    conn.run('echo $PATH')

# 激活虚拟环境
with conn.prefix('source /path/to/venv/bin/activate'):
    conn.run('pip list')

conn.close()

并行执行

from fabric import ThreadingGroup

# 创建连接组
group = ThreadingGroup(
    'user@server1',
    'user@server2',
    'user@server3'
)

# 并行执行命令
results = group.run('hostname')
for conn, result in results.items():
    print(f"{conn.host}: {result.stdout.strip()}")

# 并行上传
group.put('config.txt', '/tmp/')

使用 Fabric 任务

from fabric import task, Connection

@task
def deploy(c):
    """部署应用"""
    conn = Connection('user@hostname')
    
    # 拉取代码
    conn.run('cd /var/www/app && git pull')
    
    # 安装依赖
    with conn.prefix('source venv/bin/activate'):
        conn.run('pip install -r requirements.txt')
    
    # 重启服务
    conn.sudo('systemctl restart myapp')
    
    print("部署完成")

@task
def check_status(c):
    """检查服务状态"""
    conn = Connection('user@hostname')
    result = conn.sudo('systemctl status myapp', hide=True)
    print(result.stdout)
💡 提示:Fabric 适合自动化运维、批量服务器管理等场景,可以大大简化 SSH 操作。

psutil

psutil 是一个跨平台库,用于获取系统和进程信息,如 CPU、内存、磁盘、网络等。

安装

pip install psutil

CPU 信息

import psutil

# CPU 逻辑核心数
print(f"CPU 核心数: {psutil.cpu_count(logical=True)}")
print(f"CPU 物理核心数: {psutil.cpu_count(logical=False)}")

# CPU 使用率
print(f"CPU 使用率: {psutil.cpu_percent(interval=1)}%")

# 每个 CPU 的使用率
print(f"各 CPU 使用率: {psutil.cpu_percent(interval=1, percpu=True)}")

# CPU 时间统计
print(f"CPU 时间: {psutil.cpu_times()}")

# CPU 频率
print(f"CPU 频率: {psutil.cpu_freq()}")

内存信息

import psutil

# 内存信息
mem = psutil.virtual_memory()
print(f"总内存: {mem.total / (1024**3):.2f} GB")
print(f"可用内存: {mem.available / (1024**3):.2f} GB")
print(f"已用内存: {mem.used / (1024**3):.2f} GB")
print(f"内存使用率: {mem.percent}%")

# 交换内存信息
swap = psutil.swap_memory()
print(f"总交换内存: {swap.total / (1024**3):.2f} GB")
print(f"已用交换内存: {swap.used / (1024**3):.2f} GB")

磁盘信息

import psutil

# 磁盘分区
partitions = psutil.disk_partitions()
for partition in partitions:
    print(f"设备: {partition.device}")
    print(f"挂载点: {partition.mountpoint}")
    print(f"文件系统: {partition.fstype}")

# 磁盘使用情况
disk_usage = psutil.disk_usage('/')
print(f"总空间: {disk_usage.total / (1024**3):.2f} GB")
print(f"已用空间: {disk_usage.used / (1024**3):.2f} GB")
print(f"可用空间: {disk_usage.free / (1024**3):.2f} GB")
print(f"使用率: {disk_usage.percent}%")

# 磁盘 I/O 统计
disk_io = psutil.disk_io_counters()
print(f"读次数: {disk_io.read_count}")
print(f"写字节数: {disk_io.write_bytes / (1024**2):.2f} MB")

网络信息

import psutil

# 网络接口
net_io = psutil.net_io_counters()
print(f"发送字节数: {net_io.bytes_sent / (1024**2):.2f} MB")
print(f"接收字节数: {net_io.bytes_recv / (1024**2):.2f} MB")
print(f"发送包数: {net_io.packets_sent}")
print(f"接收包数: {net_io.packets_recv}")

# 网络接口详情
interfaces = psutil.net_if_addrs()
for interface, addrs in interfaces.items():
    print(f"接口: {interface}")
    for addr in addrs:
        print(f"  地址: {addr.address}")

# 网络连接
connections = psutil.net_connections(kind='inet')
for conn in connections[:5]:
    print(f"本地地址: {conn.laddr}, 远程地址: {conn.raddr}, 状态: {conn.status}")

进程信息

import psutil

# 获取当前进程
current_process = psutil.Process()
print(f"进程 ID: {current_process.pid}")
print(f"进程名称: {current_process.name()}")
print(f"进程状态: {current_process.status()}")
print(f"创建时间: {current_process.create_time()}")
print(f"CPU 使用率: {current_process.cpu_percent(interval=1)}%")
print(f"内存使用: {current_process.memory_info().rss / (1024**2):.2f} MB")

# 获取所有进程
for proc in psutil.process_iter(['pid', 'name', 'username']):
    print(f"PID: {proc.info['pid']}, 名称: {proc.info['name']}, 用户: {proc.info['username']}")

# 查找特定进程
for proc in psutil.process_iter(['pid', 'name']):
    if proc.info['name'] == 'python':
        print(f"找到 Python 进程: PID {proc.info['pid']}")

系统信息

import psutil
import platform

# 系统信息
print(f"系统: {platform.system()}")
print(f"版本: {platform.release()}")
print(f"架构: {platform.machine()}")

# 启动时间
boot_time = psutil.boot_time()
print(f"系统启动时间: {boot_time}")

# 用户信息
users = psutil.users()
for user in users:
    print(f"用户: {user.name}, 终端: {user.terminal}, 主机: {user.host}")

监控脚本示例

import psutil
import time

def monitor_system(interval=5):
    """监控系统资源"""
    while True:
        # CPU 使用率
        cpu_percent = psutil.cpu_percent(interval=1)
        
        # 内存使用率
        mem = psutil.virtual_memory()
        mem_percent = mem.percent
        
        # 磁盘使用率
        disk = psutil.disk_usage('/')
        disk_percent = disk.percent
        
        print(f"CPU: {cpu_percent}%, 内存: {mem_percent}%, 磁盘: {disk_percent}%")
        time.sleep(interval)

# monitor_system()
💡 提示:psutil 适合系统监控、性能分析、资源管理等场景,支持 Windows、Linux、macOS 等多个平台。

logging

logging 是 Python 内置的日志记录模块,提供了灵活的日志记录功能。

基本使用

import logging

# 配置日志级别
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# 不同级别的日志
logging.debug('调试信息')
logging.info('普通信息')
logging.warning('警告信息')
logging.error('错误信息')
logging.critical('严重错误')

日志级别

import logging

# 设置日志级别
logging.basicConfig(level=logging.DEBUG)

# DEBUG: 10 - 调试信息
logging.debug('这是调试信息')

# INFO: 20 - 普通信息
logging.info('这是普通信息')

# WARNING: 30 - 警告信息
logging.warning('这是警告信息')

# ERROR: 40 - 错误信息
logging.error('这是错误信息')

# CRITICAL: 50 - 严重错误
logging.critical('这是严重错误')

记录到文件

import logging

# 配置日志文件
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='app.log',
    filemode='a'  # 追加模式
)

logging.info('这条日志会写入文件')
logging.error('错误也会写入文件')

创建 Logger

import logging

# 创建 logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)

# 创建处理器
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler('app.log')

# 设置处理器级别
console_handler.setLevel(logging.INFO)
file_handler.setLevel(logging.DEBUG)

# 创建格式化器
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)

# 添加处理器
logger.addHandler(console_handler)
logger.addHandler(file_handler)

# 使用 logger
logger.debug('调试信息')
logger.info('普通信息')
logger.warning('警告信息')
logger.error('错误信息')

日志轮转

import logging
from logging.handlers import RotatingFileHandler

# 创建轮转文件处理器
handler = RotatingFileHandler(
    'app.log',
    maxBytes=10*1024*1024,  # 10MB
    backupCount=5  # 保留5个备份
)

# 设置格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

# 创建 logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

logger.info('这条日志会写入文件,文件超过10MB会自动轮转')

按时间轮转

import logging
from logging.handlers import TimedRotatingFileHandler

# 按天轮转
handler = TimedRotatingFileHandler(
    'app.log',
    when='midnight',  # 每天午夜轮转
    interval=1,
    backupCount=7  # 保留7天的日志
)

handler.suffix = '%Y-%m-%d'  # 文件名后缀

# 设置格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

# 创建 logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

logger.info('这条日志每天午夜会自动轮转')

异常跟踪

import logging

logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

try:
    result = 10 / 0
except Exception as e:
    # 记录异常信息
    logging.error('发生错误', exc_info=True)
    # 或者
    logging.exception('发生错误')

结构化日志

import logging
import json

class StructuredFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }
        return json.dumps(log_data, ensure_ascii=False)

# 使用结构化格式
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())

logger = logging.getLogger('structured')
logger.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info('这是结构化日志')
💡 提示:logging 模块功能强大,建议在生产环境中使用,而不是使用 print 语句。

Ansible

Ansible 是一个自动化配置管理工具,Python 可以通过 ansible-runner 或 ansible-runner 接口调用 Ansible。

安装

pip install ansible-runner

使用 ansible-runner

import ansible_runner

# 执行 Ad-hoc 命令
r = ansible_runner.run(
    private_data_dir='/tmp/ansible',
    module='shell',
    module_args='ls -la',
    inventory='localhost,',
    pattern='all'
)

print(f"状态: {r.status}")
print(f"输出: {r.stdout}")
print(f"错误: {r.stderr}")

执行 Playbook

import ansible_runner

# 执行 Playbook
r = ansible_runner.run(
    private_data_dir='/tmp/ansible',
    playbook='site.yml'
)

print(f"状态: {r.status}")
print(f"输出: {r.stdout}")

# 获取事件
for event in r.events:
    print(event)

动态 Inventory

import ansible_runner

# 动态 inventory
inventory = {
    'all': {
        'hosts': {
            'server1': {'ansible_host': '192.168.1.1'},
            'server2': {'ansible_host': '192.168.1.2'}
        }
    }
}

r = ansible_runner.run(
    private_data_dir='/tmp/ansible',
    module='ping',
    inventory=inventory
)

print(f"输出: {r.stdout}")

异步执行

import ansible_runner

# 异步执行
r = ansible_runner.run_async(
    private_data_dir='/tmp/ansible',
    playbook='site.yml'
)

# 检查状态
while not r.rc:
    print("正在执行...")
    time.sleep(1)

print(f"完成: {r.rc}")

使用 Ansible Python API

from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.inventory.manager import InventoryManager
from ansible.vars.manager import VariableManager
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play

# 加载 inventory
loader = DataLoader()
inventory = InventoryManager(loader=loader, sources='inventory')

variable_manager = VariableManager(loader=loader, inventory=inventory)

# 创建 PlaybookExecutor
playbook_path = 'site.yml'
executor = PlaybookExecutor(
    playbooks=[playbook_path],
    inventory=inventory,
    variable_manager=variable_manager,
    loader=loader
)

# 执行
executor.run()
💡 提示:Ansible 适合自动化配置管理、应用部署、任务编排等场景。