Python 高级特性

装饰器

# 装饰器本质:闭包
def timer(func):
    def wrapper(*args, **kwargs):
        import time
        start = time.time()
        result = func(*args, **kwargs)
        print(f"耗时: {time.time() - start:.2f}s")
        return result
    return wrapper

@timer
def slow_func():
    import time
    time.sleep(1)
    return "done"

slow_func()

带参数的装饰器

def retry(times=3):
    def decorator(func):
        def wrapper(*args, **kwargs):
            for _ in range(times):
                try:
                    return func(*args, **kwargs)
                except:
                    continue
            return None
        return wrapper
    return decorator

@retry(times=5)
def fetch():
    pass

异步编程

import asyncio

async def main():
    # 并发任务
    results = await asyncio.gather(
        asyncio.sleep(1, result="a"),
        asyncio.sleep(1, result="b")
    )
    print(results)  # ['a', 'b']

asyncio.run(main())

上下文管理器

# 类方式
class File:
    def __enter__(self):
        self.f = open("file.txt")
        return self.f
    def __exit__(self, *args):
        self.f.close()

# with 使用
with File() as f:
    f.read()

# 或用 contextlib
from contextlib import contextmanager
@contextmanager
def timer():
    import time
    start = time.time()
    yield
    print(f"耗时: {time.time() - start:.2f}")
E -->|否| G[切换执行其他任务] G --> D F --> H[事件循环结束] style E fill:#f093fb,color:#fff

🔄 同步 vs 异步对比

flowchart LR subgraph 同步 A1[任务1开始] --> A2[任务1结束] A2 --> A3[任务2开始] A3 --> A4[任务2结束] end subgraph 异步 B1[任务1开始] --> B2[任务2开始] B2 --> B3[任务1结束] B3 --> B4[任务2结束] end style 同步 fill:#e8f5e9 style 异步 fill:#e3f2fd

列表推导式

# 传统方式
squares = []
for i in range(10):
    squares.append(i ** 2)

# 列表推导式
squares = [i ** 2 for i in range(10)]
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# 带条件的列表推导式
even_numbers = [i for i in range(20) if i % 2 == 0]
print(even_numbers)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

# 嵌套列表推导式
matrix = [[i * j for j in range(3)] for i in range(3)]
print(matrix)  # [[0, 0, 0], [0, 1, 2], [0, 2, 4]]

字典推导式

# 创建字典
squares_dict = {i: i ** 2 for i in range(5)}
print(squares_dict)  # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16}

# 字典转换
prices = {'apple': 5, 'banana': 3, 'orange': 4}
discounted = {k: v * 0.9 for k, v in prices.items()}
print(discounted)  # {'apple': 4.5, 'banana': 2.7, 'orange': 3.6}

生成器

# 生成器函数
def fibonacci(n):
    """生成斐波那契数列"""
    a, b = 0, 1
    for _ in range(n):
        yield a
        a, b = b, a + b

# 使用生成器
for num in fibonacci(10):
    print(num, end=' ')  # 0 1 1 2 3 5 8 13 21 34

# 生成器表达式
squares_gen = (i ** 2 for i in range(5))
print(list(squares_gen))  # [0, 1, 4, 9, 16]

装饰器

装饰器是一种用于修改函数或类行为的工具,本质上是一个返回函数的高阶函数。

# 简单的装饰器
def decorator(func):
    def wrapper():
        print("执行前...")
        func()
        print("执行后...")
    return wrapper

@decorator
def say_hello():
    print("Hello!")

say_hello()
# 输出:
# 执行前...
# Hello!
# 执行后...

带参数的装饰器

def repeat(times):
    """重复执行指定次数的装饰器"""
    def decorator(func):
        def wrapper(*args, **kwargs):
            for _ in range(times):
                result = func(*args, **kwargs)
            return result
        return wrapper
    return decorator

@repeat(3)
def greet(name):
    print(f"你好,{name}!")

greet("Python")
# 输出:
# 你好,Python!
# 你好,Python!
# 你好,Python!

多个装饰器

def decorator1(func):
    def wrapper(*args, **kwargs):
        print("装饰器 1: 执行前")
        result = func(*args, **kwargs)
        print("装饰器 1: 执行后")
        return result
    return wrapper

def decorator2(func):
    def wrapper(*args, **kwargs):
        print("装饰器 2: 执行前")
        result = func(*args, **kwargs)
        print("装饰器 2: 执行后")
        return result
    return wrapper

@decorator1
@decorator2
def say_hi():
    print("Hi!")

say_hi()
# 输出:
# 装饰器 1: 执行前
# 装饰器 2: 执行前
# Hi!
# 装饰器 2: 执行后
# 装饰器 1: 执行后

类装饰器

class CountCalls:
    """统计函数调用次数的类装饰器"""
    def __init__(self, func):
        self.func = func
        self.count = 0
    
    def __call__(self, *args, **kwargs):
        self.count += 1
        print(f"第 {self.count} 次调用")
        return self.func(*args, **kwargs)

@CountCalls
def say_hello():
    print("Hello!")

say_hello()  # 第 1 次调用
say_hello()  # 第 2 次调用

异步编程基础

Python 的 asyncio 模块提供了异步编程的支持,适用于 I/O 密集型任务。

import asyncio

async def say_hello():
    """异步函数"""
    print("开始...")
    await asyncio.sleep(1)  # 模拟异步操作
    print("Hello!")
    print("结束...")

# 运行异步函数
asyncio.run(say_hello())

多个异步任务并发

import asyncio

async def fetch_data(name, delay):
    """模拟异步数据获取"""
    print(f"{name}: 开始获取数据...")
    await asyncio.sleep(delay)
    print(f"{name}: 数据获取完成")
    return f"{name} 的数据"

async def main():
    # 并发执行多个任务
    results = await asyncio.gather(
        fetch_data("任务 A", 2),
        fetch_data("任务 B", 1),
        fetch_data("任务 C", 3)
    )
    print(f"所有结果:{results}")

# 运行
asyncio.run(main())
# 总耗时约 3 秒(最长时间的任务)

async 和 await

import asyncio

async def producer(queue, name):
    """生产者协程"""
    for i in range(3):
        item = f"{name}-{i}"
        await queue.put(item)
        print(f"{name} 生产了:{item}")
        await asyncio.sleep(0.5)

async def consumer(queue, name):
    """消费者协程"""
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"{name} 消费了:{item}")
        queue.task_done()
        await asyncio.sleep(0.3)

async def main():
    queue = asyncio.Queue()
    
    # 创建生产者和消费者任务
    producers = [producer(queue, f"生产者-{i}") for i in range(2)]
    consumers = [consumer(queue, f"消费者-{i}") for i in range(3)]
    
    # 等待生产者完成
    await asyncio.gather(*producers)
    
    # 等待队列处理完成
    await queue.join()
    
    # 停止消费者
    for _ in range(3):
        await queue.put(None)
    
    await asyncio.gather(*consumers)

asyncio.run(main())

协程(Coroutine)

协程是一种比线程更轻量级的并发机制,在单线程中实现并发执行。

import asyncio

async def coroutine(name, delay):
    """协程函数"""
    for i in range(3):
        print(f"{name}: 第 {i+1} 次执行")
        await asyncio.sleep(delay)
    print(f"{name}: 执行完成")

async def main():
    # 创建协程任务
    task1 = asyncio.create_task(coroutine("协程 A", 0.5))
    task2 = asyncio.create_task(coroutine("协程 B", 0.7))
    
    # 等待所有任务完成
    await task1
    await task2
    
    print("所有协程执行完毕")

asyncio.run(main())

异步上下文管理器

import asyncio

class AsyncResource:
    """异步资源管理器"""
    async def __aenter__(self):
        print("获取资源...")
        await asyncio.sleep(0.5)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("释放资源...")
        await asyncio.sleep(0.3)

async def main():
    async with AsyncResource() as resource:
        print("使用资源中...")
        await asyncio.sleep(1)

asyncio.run(main())

异步迭代器

import asyncio

class AsyncCounter:
    """异步计数器"""
    def __init__(self, max_count):
        self.max_count = max_count
        self.current = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.max_count:
            raise StopAsyncIteration
        self.current += 1
        await asyncio.sleep(0.5)
        return self.current

async def main():
    async for num in AsyncCounter(5):
        print(f"当前数字:{num}")

asyncio.run(main())

lambda 函数

# lambda 函数(匿名函数)
add = lambda x, y: x + y
print(add(3, 5))  # 8

# 与内置函数结合使用
numbers = [1, 2, 3, 4, 5]
squared = list(map(lambda x: x ** 2, numbers))
print(squared)  # [1, 4, 9, 16, 25]

# 过滤
even = list(filter(lambda x: x % 2 == 0, numbers))
print(even)  # [2, 4]

上下文管理器

class Timer:
    """自定义上下文管理器"""
    import time
    
    def __enter__(self):
        self.start = time.time()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end = time.time()
        print(f"代码块执行时间: {self.end - self.start:.4f} 秒")

with Timer():
    import time
    time.sleep(0.5)
    print("执行一些操作...")

Python 特性总结:

Fabric

Fabric 是一个 Python 库和命令行工具,用于简化 SSH 远程执行和系统管理任务。

安装

pip install fabric

基本使用

from fabric import Connection

# 创建连接
conn = Connection('user@hostname')

# 执行远程命令
result = conn.run('ls -la', hide=True)
print(result.stdout)

# 上传文件
conn.put('local_file.txt', '/remote/path/')

# 下载文件
conn.get('/remote/path/remote_file.txt', 'local_path/')

# 关闭连接
conn.close()

批量操作多台服务器

from fabric import Connection

# 服务器列表
servers = [
    {'host': 'server1.example.com', 'user': 'admin'},
    {'host': 'server2.example.com', 'user': 'admin'},
    {'host': 'server3.example.com', 'user': 'admin'}
]

# 批量执行命令
for server in servers:
    conn = Connection(f"{server['user']}@{server['host']}")
    result = conn.run('uptime', hide=True)
    print(f"{server['host']}: {result.stdout.strip()}")
    conn.close()

Sudo 命令

from fabric import Connection

conn = Connection('user@hostname')

# 使用 sudo 执行命令
result = conn.sudo('apt-get update', hide=True)
print(result.stdout)

# sudo 带密码
result = conn.sudo('systemctl restart nginx', password='your_password')

conn.close()

上下文管理器

from fabric import Connection

conn = Connection('user@hostname')

# 切换目录
with conn.cd('/var/www'):
    conn.run('ls -la')

# 设置环境变量
with conn.prefix('source /etc/profile'):
    conn.run('echo $PATH')

# 激活虚拟环境
with conn.prefix('source /path/to/venv/bin/activate'):
    conn.run('pip list')

conn.close()

并行执行

from fabric import ThreadingGroup

# 创建连接组
group = ThreadingGroup(
    'user@server1',
    'user@server2',
    'user@server3'
)

# 并行执行命令
results = group.run('hostname')
for conn, result in results.items():
    print(f"{conn.host}: {result.stdout.strip()}")

# 并行上传
group.put('config.txt', '/tmp/')

使用 Fabric 任务

from fabric import task, Connection

@task
def deploy(c):
    """部署应用"""
    conn = Connection('user@hostname')
    
    # 拉取代码
    conn.run('cd /var/www/app && git pull')
    
    # 安装依赖
    with conn.prefix('source venv/bin/activate'):
        conn.run('pip install -r requirements.txt')
    
    # 重启服务
    conn.sudo('systemctl restart myapp')
    
    print("部署完成")

@task
def check_status(c):
    """检查服务状态"""
    conn = Connection('user@hostname')
    result = conn.sudo('systemctl status myapp', hide=True)
    print(result.stdout)
💡 提示:Fabric 适合自动化运维、批量服务器管理等场景,可以大大简化 SSH 操作。

psutil

psutil 是一个跨平台库,用于获取系统和进程信息,如 CPU、内存、磁盘、网络等。

安装

pip install psutil

CPU 信息

import psutil

# CPU 逻辑核心数
print(f"CPU 核心数: {psutil.cpu_count(logical=True)}")
print(f"CPU 物理核心数: {psutil.cpu_count(logical=False)}")

# CPU 使用率
print(f"CPU 使用率: {psutil.cpu_percent(interval=1)}%")

# 每个 CPU 的使用率
print(f"各 CPU 使用率: {psutil.cpu_percent(interval=1, percpu=True)}")

# CPU 时间统计
print(f"CPU 时间: {psutil.cpu_times()}")

# CPU 频率
print(f"CPU 频率: {psutil.cpu_freq()}")

内存信息

import psutil

# 内存信息
mem = psutil.virtual_memory()
print(f"总内存: {mem.total / (1024**3):.2f} GB")
print(f"可用内存: {mem.available / (1024**3):.2f} GB")
print(f"已用内存: {mem.used / (1024**3):.2f} GB")
print(f"内存使用率: {mem.percent}%")

# 交换内存信息
swap = psutil.swap_memory()
print(f"总交换内存: {swap.total / (1024**3):.2f} GB")
print(f"已用交换内存: {swap.used / (1024**3):.2f} GB")

磁盘信息

import psutil

# 磁盘分区
partitions = psutil.disk_partitions()
for partition in partitions:
    print(f"设备: {partition.device}")
    print(f"挂载点: {partition.mountpoint}")
    print(f"文件系统: {partition.fstype}")

# 磁盘使用情况
disk_usage = psutil.disk_usage('/')
print(f"总空间: {disk_usage.total / (1024**3):.2f} GB")
print(f"已用空间: {disk_usage.used / (1024**3):.2f} GB")
print(f"可用空间: {disk_usage.free / (1024**3):.2f} GB")
print(f"使用率: {disk_usage.percent}%")

# 磁盘 I/O 统计
disk_io = psutil.disk_io_counters()
print(f"读次数: {disk_io.read_count}")
print(f"写字节数: {disk_io.write_bytes / (1024**2):.2f} MB")

网络信息

import psutil

# 网络接口
net_io = psutil.net_io_counters()
print(f"发送字节数: {net_io.bytes_sent / (1024**2):.2f} MB")
print(f"接收字节数: {net_io.bytes_recv / (1024**2):.2f} MB")
print(f"发送包数: {net_io.packets_sent}")
print(f"接收包数: {net_io.packets_recv}")

# 网络接口详情
interfaces = psutil.net_if_addrs()
for interface, addrs in interfaces.items():
    print(f"接口: {interface}")
    for addr in addrs:
        print(f"  地址: {addr.address}")

# 网络连接
connections = psutil.net_connections(kind='inet')
for conn in connections[:5]:
    print(f"本地地址: {conn.laddr}, 远程地址: {conn.raddr}, 状态: {conn.status}")

进程信息

import psutil

# 获取当前进程
current_process = psutil.Process()
print(f"进程 ID: {current_process.pid}")
print(f"进程名称: {current_process.name()}")
print(f"进程状态: {current_process.status()}")
print(f"创建时间: {current_process.create_time()}")
print(f"CPU 使用率: {current_process.cpu_percent(interval=1)}%")
print(f"内存使用: {current_process.memory_info().rss / (1024**2):.2f} MB")

# 获取所有进程
for proc in psutil.process_iter(['pid', 'name', 'username']):
    print(f"PID: {proc.info['pid']}, 名称: {proc.info['name']}, 用户: {proc.info['username']}")

# 查找特定进程
for proc in psutil.process_iter(['pid', 'name']):
    if proc.info['name'] == 'python':
        print(f"找到 Python 进程: PID {proc.info['pid']}")

系统信息

import psutil
import platform

# 系统信息
print(f"系统: {platform.system()}")
print(f"版本: {platform.release()}")
print(f"架构: {platform.machine()}")

# 启动时间
boot_time = psutil.boot_time()
print(f"系统启动时间: {boot_time}")

# 用户信息
users = psutil.users()
for user in users:
    print(f"用户: {user.name}, 终端: {user.terminal}, 主机: {user.host}")

监控脚本示例

import psutil
import time

def monitor_system(interval=5):
    """监控系统资源"""
    while True:
        # CPU 使用率
        cpu_percent = psutil.cpu_percent(interval=1)
        
        # 内存使用率
        mem = psutil.virtual_memory()
        mem_percent = mem.percent
        
        # 磁盘使用率
        disk = psutil.disk_usage('/')
        disk_percent = disk.percent
        
        print(f"CPU: {cpu_percent}%, 内存: {mem_percent}%, 磁盘: {disk_percent}%")
        time.sleep(interval)

# monitor_system()
💡 提示:psutil 适合系统监控、性能分析、资源管理等场景,支持 Windows、Linux、macOS 等多个平台。

logging

logging 是 Python 内置的日志记录模块,提供了灵活的日志记录功能。

基本使用

import logging

# 配置日志级别
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# 不同级别的日志
logging.debug('调试信息')
logging.info('普通信息')
logging.warning('警告信息')
logging.error('错误信息')
logging.critical('严重错误')

日志级别

import logging

# 设置日志级别
logging.basicConfig(level=logging.DEBUG)

# DEBUG: 10 - 调试信息
logging.debug('这是调试信息')

# INFO: 20 - 普通信息
logging.info('这是普通信息')

# WARNING: 30 - 警告信息
logging.warning('这是警告信息')

# ERROR: 40 - 错误信息
logging.error('这是错误信息')

# CRITICAL: 50 - 严重错误
logging.critical('这是严重错误')

记录到文件

import logging

# 配置日志文件
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='app.log',
    filemode='a'  # 追加模式
)

logging.info('这条日志会写入文件')
logging.error('错误也会写入文件')

创建 Logger

import logging

# 创建 logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)

# 创建处理器
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler('app.log')

# 设置处理器级别
console_handler.setLevel(logging.INFO)
file_handler.setLevel(logging.DEBUG)

# 创建格式化器
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)

# 添加处理器
logger.addHandler(console_handler)
logger.addHandler(file_handler)

# 使用 logger
logger.debug('调试信息')
logger.info('普通信息')
logger.warning('警告信息')
logger.error('错误信息')

日志轮转

import logging
from logging.handlers import RotatingFileHandler

# 创建轮转文件处理器
handler = RotatingFileHandler(
    'app.log',
    maxBytes=10*1024*1024,  # 10MB
    backupCount=5  # 保留5个备份
)

# 设置格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

# 创建 logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

logger.info('这条日志会写入文件,文件超过10MB会自动轮转')

按时间轮转

import logging
from logging.handlers import TimedRotatingFileHandler

# 按天轮转
handler = TimedRotatingFileHandler(
    'app.log',
    when='midnight',  # 每天午夜轮转
    interval=1,
    backupCount=7  # 保留7天的日志
)

handler.suffix = '%Y-%m-%d'  # 文件名后缀

# 设置格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

# 创建 logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

logger.info('这条日志每天午夜会自动轮转')

异常跟踪

import logging

logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

try:
    result = 10 / 0
except Exception as e:
    # 记录异常信息
    logging.error('发生错误', exc_info=True)
    # 或者
    logging.exception('发生错误')

结构化日志

import logging
import json

class StructuredFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }
        return json.dumps(log_data, ensure_ascii=False)

# 使用结构化格式
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())

logger = logging.getLogger('structured')
logger.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info('这是结构化日志')
💡 提示:logging 模块功能强大,建议在生产环境中使用,而不是使用 print 语句。

Ansible

Ansible 是一个自动化配置管理工具,Python 可以通过 ansible-runner 或 ansible-runner 接口调用 Ansible。

安装

pip install ansible-runner

使用 ansible-runner

import ansible_runner

# 执行 Ad-hoc 命令
r = ansible_runner.run(
    private_data_dir='/tmp/ansible',
    module='shell',
    module_args='ls -la',
    inventory='localhost,',
    pattern='all'
)

print(f"状态: {r.status}")
print(f"输出: {r.stdout}")
print(f"错误: {r.stderr}")

执行 Playbook

import ansible_runner

# 执行 Playbook
r = ansible_runner.run(
    private_data_dir='/tmp/ansible',
    playbook='site.yml'
)

print(f"状态: {r.status}")
print(f"输出: {r.stdout}")

# 获取事件
for event in r.events:
    print(event)

动态 Inventory

import ansible_runner

# 动态 inventory
inventory = {
    'all': {
        'hosts': {
            'server1': {'ansible_host': '192.168.1.1'},
            'server2': {'ansible_host': '192.168.1.2'}
        }
    }
}

r = ansible_runner.run(
    private_data_dir='/tmp/ansible',
    module='ping',
    inventory=inventory
)

print(f"输出: {r.stdout}")

异步执行

import ansible_runner

# 异步执行
r = ansible_runner.run_async(
    private_data_dir='/tmp/ansible',
    playbook='site.yml'
)

# 检查状态
while not r.rc:
    print("正在执行...")
    time.sleep(1)

print(f"完成: {r.rc}")

使用 Ansible Python API

from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.inventory.manager import InventoryManager
from ansible.vars.manager import VariableManager
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play

# 加载 inventory
loader = DataLoader()
inventory = InventoryManager(loader=loader, sources='inventory')

variable_manager = VariableManager(loader=loader, inventory=inventory)

# 创建 PlaybookExecutor
playbook_path = 'site.yml'
executor = PlaybookExecutor(
    playbooks=[playbook_path],
    inventory=inventory,
    variable_manager=variable_manager,
    loader=loader
)

# 执行
executor.run()
💡 提示:Ansible 适合自动化配置管理、应用部署、任务编排等场景。