SQLAlchemy

SQLAlchemy 是 Python 中最流行的 SQL 工具包和 ORM(对象关系映射)库,提供了强大的数据库操作功能。

安装

pip install sqlalchemy

Core 模式(SQL 表达式语言)

from sqlalchemy import create_engine, text

# 创建数据库引擎
engine = create_engine('sqlite:///example.db')

# 执行原生 SQL
with engine.connect() as conn:
    result = conn.execute(text("SELECT 'Hello, SQLAlchemy!'"))
    print(result.fetchone())

# 创建表
with engine.connect() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            age INTEGER
        )
    """))
    conn.commit()

# 插入数据
with engine.connect() as conn:
    conn.execute(text("INSERT INTO users (name, age) VALUES ('张三', 25)"))
    conn.execute(text("INSERT INTO users (name, age) VALUES ('李四', 30)"))
    conn.commit()

# 查询数据
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM users"))
    for row in result:
        print(row)

ORM 模式(对象关系映射)

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker

# 创建基类
Base = declarative_base()

# 定义模型
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)
    age = Column(Integer)

    def __repr__(self):
        return f""

# 创建引擎和会话
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

# 创建(插入)
user1 = User(name='王五', age=28)
user2 = User(name='赵六', age=32)
session.add(user1)
session.add(user2)
session.commit()

# 读取(查询)
all_users = session.query(User).all()
for user in all_users:
    print(user)

# 条件查询
user = session.query(User).filter_by(name='王五').first()
print(f"找到用户: {user}")

# 更新
user = session.query(User).filter_by(name='王五').first()
user.age = 29
session.commit()

# 删除
user = session.query(User).filter_by(name='赵六').first()
session.delete(user)
session.commit()

# 关闭会话
session.close()

关系映射

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship

Base = declarative_base()

class Author(Base):
    __tablename__ = 'authors'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))

    # 一对多关系
    books = relationship("Book", back_populates="author")

class Book(Base):
    __tablename__ = 'books'

    id = Column(Integer, primary_key=True)
    title = Column(String(100))
    author_id = Column(Integer, ForeignKey('authors.id'))

    # 多对一关系
    author = relationship("Author", back_populates="books")

# 创建表
engine = create_engine('sqlite:///library.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

# 创建作者和书籍
author = Author(name='鲁迅')
book1 = Book(title='狂人日记', author=author)
book2 = Book(title='阿Q正传', author=author)

session.add(author)
session.commit()

# 查询作者及其书籍
author = session.query(Author).filter_by(name='鲁迅').first()
print(f"作者: {author.name}")
print(f"书籍: {[book.title for book in author.books]}")

session.close()

连接 MySQL 数据库

from sqlalchemy import create_engine

# MySQL 连接字符串
engine = create_engine('mysql+pymysql://username:password@localhost:3306/database_name')

# 或者使用 mysql-connector-python
engine = create_engine('mysql+mysqlconnector://username:password@localhost:3306/database_name')
💡 提示:ORM 模式更适合复杂应用,Core 模式更适合简单查询或需要精细控制的场景。

MySQL

MySQL 是最流行的开源关系型数据库管理系统之一。Python 提供了多种方式连接和操作 MySQL。

安装驱动

# 方法1:使用 PyMySQL
pip install pymysql

# 方法2:使用 mysql-connector-python(官方驱动)
pip install mysql-connector-python

# 方法3:使用 MySQLdb
pip install mysqlclient

使用 PyMySQL

import pymysql

# 创建连接
connection = pymysql.connect(
    host='localhost',
    user='root',
    password='your_password',
    database='test_db',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor
)

try:
    with connection.cursor() as cursor:
        # 创建表
        sql = """
        CREATE TABLE IF NOT EXISTS users (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(50) NOT NULL,
            email VARCHAR(100) UNIQUE,
            age INT
        )
        """
        cursor.execute(sql)
        connection.commit()

        # 插入数据
        sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
        cursor.execute(sql, ('张三', 'zhangsan@example.com', 25))
        cursor.execute(sql, ('李四', 'lisi@example.com', 30))
        connection.commit()

        # 查询数据
        sql = "SELECT * FROM users"
        cursor.execute(sql)
        results = cursor.fetchall()
        for row in results:
            print(row)

        # 更新数据
        sql = "UPDATE users SET age = %s WHERE name = %s"
        cursor.execute(sql, (26, '张三'))
        connection.commit()

        # 删除数据
        sql = "DELETE FROM users WHERE name = %s"
        cursor.execute(sql, ('李四',))
        connection.commit()

finally:
    connection.close()

使用 mysql-connector-python

import mysql.connector

# 创建连接
connection = mysql.connector.connect(
    host='localhost',
    user='root',
    password='your_password',
    database='test_db'
)

cursor = connection.cursor()

# 执行 SQL 查询
cursor.execute("SELECT VERSION()")
version = cursor.fetchone()
print(f"MySQL 版本: {version[0]}")

# 创建表
cursor.execute("""
    CREATE TABLE IF NOT EXISTS products (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100),
        price DECIMAL(10, 2)
    )
""")

# 插入数据
sql = "INSERT INTO products (name, price) VALUES (%s, %s)"
val = ("Python 书籍", 59.99)
cursor.execute(sql, val)
connection.commit()

# 批量插入
sql = "INSERT INTO products (name, price) VALUES (%s, %s)"
vals = [
    ("Java 书籍", 49.99),
    ("Go 书籍", 45.99),
    ("Rust 书籍", 55.99)
]
cursor.executemany(sql, vals)
connection.commit()

# 查询数据
cursor.execute("SELECT * FROM products")
results = cursor.fetchall()
for row in results:
    print(row)

connection.close()

事务处理

import pymysql

connection = pymysql.connect(
    host='localhost',
    user='root',
    password='your_password',
    database='test_db'
)

try:
    with connection.cursor() as cursor:
        # 开始事务
        connection.begin()

        # 多个操作
        cursor.execute("UPDATE account SET balance = balance - 100 WHERE id = 1")
        cursor.execute("UPDATE account SET balance = balance + 100 WHERE id = 2")

        # 提交事务
        connection.commit()
        print("事务成功")

except Exception as e:
    # 回滚事务
    connection.rollback()
    print(f"事务失败: {e}")

finally:
    connection.close()

连接池

from dbutils.pooled_db import PooledDB
import pymysql

# 创建连接池
pool = PooledDB(
    creator=pymysql,
    maxconnections=6,
    mincached=2,
    maxcached=4,
    host='localhost',
    user='root',
    password='your_password',
    database='test_db'
)

# 从连接池获取连接
connection = pool.connection()
try:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM users")
        results = cursor.fetchall()
        print(results)
finally:
    connection.close()  # 连接会返回到连接池
💡 提示:在生产环境中,建议使用连接池来提高性能和资源利用率。

MongoDB

MongoDB 是一个流行的 NoSQL 文档数据库,PyMongo 是 Python 的 MongoDB 驱动程序。

安装

pip install pymongo

连接数据库

from pymongo import MongoClient

# 连接到 MongoDB
client = MongoClient('mongodb://localhost:27017/')

# 或使用连接字符串
# client = MongoClient('mongodb://username:password@localhost:27017/')

# 获取数据库
db = client['test_db']

# 或使用属性访问
# db = client.test_db

print(f"数据库列表: {client.list_database_names()}")

创建集合和插入文档

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']

# 获取集合(表)
collection = db['users']

# 插入单个文档
user = {
    'name': '张三',
    'age': 25,
    'email': 'zhangsan@example.com',
    'hobbies': ['阅读', '编程', '旅行']
}
result = collection.insert_one(user)
print(f"插入的文档 ID: {result.inserted_id}")

# 插入多个文档
users = [
    {'name': '李四', 'age': 30, 'email': 'lisi@example.com'},
    {'name': '王五', 'age': 28, 'email': 'wangwu@example.com'},
    {'name': '赵六', 'age': 32, 'email': 'zhaoliu@example.com'}
]
result = collection.insert_many(users)
print(f"插入的文档 IDs: {result.inserted_ids}")

查询文档

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']
collection = db['users']

# 查询所有文档
for user in collection.find():
    print(user)

# 查询单个文档
user = collection.find_one({'name': '张三'})
print(f"找到用户: {user}")

# 条件查询
for user in collection.find({'age': {'$gt': 25}}):
    print(f"年龄大于25的用户: {user}")

# 投影(只返回特定字段)
for user in collection.find({}, {'name': 1, 'age': 1}):
    print(user)

# 排序
for user in collection.find().sort('age', -1):  # -1 降序,1 升序
    print(user)

# 限制数量
for user in collection.find().limit(2):
    print(user)

更新文档

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']
collection = db['users']

# 更新单个文档
result = collection.update_one(
    {'name': '张三'},
    {'$set': {'age': 26}}
)
print(f"匹配到 {result.matched_count} 个文档,修改了 {result.modified_count} 个")

# 更新多个文档
result = collection.update_many(
    {'age': {'$lt': 30}},
    {'$inc': {'age': 1}}  # 年龄加1
)
print(f"匹配到 {result.matched_count} 个文档,修改了 {result.modified_count} 个")

# 添加字段
result = collection.update_many(
    {},
    {'$set': {'status': 'active'}}
)

删除文档

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']
collection = db['users']

# 删除单个文档
result = collection.delete_one({'name': '赵六'})
print(f"删除了 {result.deleted_count} 个文档")

# 删除多个文档
result = collection.delete_many({'age': {'$gt': 30}})
print(f"删除了 {result.deleted_count} 个文档")

# 删除所有文档
result = collection.delete_many({})
print(f"删除了 {result.deleted_count} 个文档")

聚合操作

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']
collection = db['users']

# 统计
pipeline = [
    {'$match': {'age': {'$gte': 20}}},
    {'$group': {'_id': '$status', 'count': {'$sum': 1}}},
    {'$sort': {'count': -1}}
]

for result in collection.aggregate(pipeline):
    print(result)

# 计算平均年龄
pipeline = [
    {'$group': {'_id': None, 'avg_age': {'$avg': '$age'}}}
]

for result in collection.aggregate(pipeline):
    print(f"平均年龄: {result['avg_age']:.2f}")

索引

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']
collection = db['users']

# 创建索引
collection.create_index('name')
collection.create_index([('name', 1), ('age', -1)])  # 复合索引

# 创建唯一索引
collection.create_index('email', unique=True)

# 查看索引
for index in collection.list_indexes():
    print(index)

# 删除索引
collection.drop_index('name_1')
💡 提示:MongoDB 适合存储非结构化数据,如日志、文档、社交网络数据等。

Redis

Redis 是一个高性能的键值对数据库,常用于缓存、消息队列、排行榜等场景。

安装

pip install redis

连接 Redis

import redis

# 连接到 Redis
r = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=True  # 自动解码为字符串
)

# 测试连接
print(f"Redis 连接状态: {r.ping()}")

字符串操作

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 设置键值
r.set('name', '张三')
r.set('age', 25)

# 获取值
print(f"姓名: {r.get('name')}")
print(f"年龄: {r.get('age')}")

# 设置过期时间(秒)
r.set('session:123', 'user_data', ex=3600)  # 1小时后过期

# 批量设置
r.mset({'key1': 'value1', 'key2': 'value2', 'key3': 'value3'})

# 批量获取
values = r.mget(['key1', 'key2', 'key3'])
print(f"批量获取: {values}")

# 自增
r.set('counter', 0)
r.incr('counter')
r.incr('counter', 5)
print(f"计数器: {r.get('counter')}")

# 自减
r.decr('counter')
print(f"计数器: {r.get('counter')}")

# 追加
r.append('name', ' (Python开发者)')
print(f"追加后: {r.get('name')}")

# 检查键是否存在
print(f"'name' 键是否存在: {r.exists('name')}")

# 删除键
r.delete('key1')
print(f"'key1' 键是否存在: {r.exists('key1')}")

列表操作

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 从左边推入
r.lpush('mylist', 'item3', 'item2', 'item1')
print(f"列表: {r.lrange('mylist', 0, -1)}")

# 从右边推入
r.rpush('mylist', 'item4', 'item5')
print(f"列表: {r.lrange('mylist', 0, -1)}")

# 获取列表长度
print(f"列表长度: {r.llen('mylist')}")

# 获取指定范围的元素
print(f"前3个元素: {r.lrange('mylist', 0, 2)}")

# 弹出元素
left_item = r.lpop('mylist')
right_item = r.rpop('mylist')
print(f"左边弹出: {left_item}, 右边弹出: {right_item}")

# 根据索引获取元素
print(f"索引0的元素: {r.lindex('mylist', 0)}")

# 删除指定值的元素
r.lrem('mylist', 0, 'item2')  # 从列表中删除所有 item2

# 设置索引位置的值
r.lset('mylist', 0, 'new_item')

# 保留指定范围的元素
r.ltrim('mylist', 0, 1)  # 只保留前两个元素

哈希操作

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 设置哈希字段
r.hset('user:1', 'name', '张三')
r.hset('user:1', 'age', 25)
r.hset('user:1', 'email', 'zhangsan@example.com')

# 批量设置
r.hmset('user:2', {
    'name': '李四',
    'age': 30,
    'email': 'lisi@example.com'
})

# 获取单个字段
print(f"姓名: {r.hget('user:1', 'name')}")

# 获取所有字段
user1 = r.hgetall('user:1')
print(f"用户1: {user1}")

# 获取所有字段名
print(f"字段名: {r.hkeys('user:1')}")

# 获取所有值
print(f"值: {r.hvals('user:1')}")

# 检查字段是否存在
print(f"'name' 字段是否存在: {r.hexists('user:1', 'name')}")

# 删除字段
r.hdel('user:1', 'email')

# 获取哈希长度
print(f"哈希长度: {r.hlen('user:1')}")

# 自增
r.hincrby('user:1', 'age', 1)
print(f"年龄: {r.hget('user:1', 'age')}")

集合操作

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 添加元素
r.sadd('myset', 'apple', 'banana', 'orange')
print(f"集合: {r.smembers('myset')}")

# 获取集合长度
print(f"集合长度: {r.scard('myset')}")

# 检查元素是否存在
print(f"'apple' 是否在集合中: {r.sismember('myset', 'apple')}")

# 获取所有元素
print(f"所有元素: {r.smembers('myset')}")

# 随机获取元素
print(f"随机元素: {r.srandmember('myset')}")

# 随机弹出元素
print(f"弹出元素: {r.spop('myset')}")

# 删除元素
r.srem('myset', 'banana')

# 集合运算
r.sadd('set1', 'a', 'b', 'c')
r.sadd('set2', 'b', 'c', 'd')

# 交集
print(f"交集: {r.sinter('set1', 'set2')}")

# 并集
print(f"并集: {r.sunion('set1', 'set2')}")

# 差集
print(f"差集: {r.sdiff('set1', 'set2')}")

有序集合操作

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 添加元素(带分数)
r.zadd('leaderboard', {'张三': 100, '李四': 90, '王五': 95})

# 获取排名
print(f"排行榜: {r.zrange('leaderboard', 0, -1, withscores=True)}")

# 获取倒序排名
print(f"倒序排行榜: {r.zrevrange('leaderboard', 0, -1, withscores=True)}")

# 获取排名
print(f"张三的排名: {r.zrank('leaderboard', '张三')}")
print(f"张三的倒序排名: {r.zrevrank('leaderboard', '张三')}")

# 获取分数
print(f"张三的分数: {r.zscore('leaderboard', '张三')}")

# 增加分数
r.zincrby('leaderboard', 5, '张三')

# 获取指定范围的元素
print(f"前2名: {r.zrange('leaderboard', 0, 1, withscores=True)}")

# 获取指定分数范围的元素
print(f"分数90-100: {r.zrangebyscore('leaderboard', 90, 100, withscores=True)}")

# 获取集合长度
print(f"排行榜长度: {r.zcard('leaderboard')}")

# 删除元素
r.zrem('leaderboard', '王五')

发布订阅

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 发布消息
r.publish('news_channel', 'Python 3.14 发布了!')
r.publish('news_channel', 'FastAPI 更新到新版本!')

# 订阅频道
pubsub = r.pubsub()
pubsub.subscribe('news_channel')

# 监听消息
for message in pubsub.listen():
    if message['type'] == 'message':
        print(f"收到消息: {message['data']}")

事务

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 开启事务
pipe = r.pipeline()

# 添加命令
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
pipe.get('key2')

# 执行事务
results = pipe.execute()
print(f"事务结果: {results}")

Redis 常用场景:

  • 🚀 缓存:提高应用性能
  • 📊 排行榜:使用有序集合
  • 💬 消息队列:使用列表
  • 🔐 会话存储:存储用户会话
  • 📈 计数器:使用原子递增
  • 🎯 分布式锁:防止并发冲突
💡 提示:Redis 的数据都在内存中,访问速度极快,但需要注意内存使用情况。