SQLAlchemy
SQLAlchemy 是 Python 中最流行的 SQL 工具包和 ORM(对象关系映射)库,提供了强大的数据库操作功能。
安装
pip install sqlalchemy
Core 模式(SQL 表达式语言)
from sqlalchemy import create_engine, text
# 创建数据库引擎
engine = create_engine('sqlite:///example.db')
# 执行原生 SQL
with engine.connect() as conn:
result = conn.execute(text("SELECT 'Hello, SQLAlchemy!'"))
print(result.fetchone())
# 创建表
with engine.connect() as conn:
conn.execute(text("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
age INTEGER
)
"""))
conn.commit()
# 插入数据
with engine.connect() as conn:
conn.execute(text("INSERT INTO users (name, age) VALUES ('张三', 25)"))
conn.execute(text("INSERT INTO users (name, age) VALUES ('李四', 30)"))
conn.commit()
# 查询数据
with engine.connect() as conn:
result = conn.execute(text("SELECT * FROM users"))
for row in result:
print(row)
ORM 模式(对象关系映射)
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker
# 创建基类
Base = declarative_base()
# 定义模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50), nullable=False)
age = Column(Integer)
def __repr__(self):
return f""
# 创建引擎和会话
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# 创建(插入)
user1 = User(name='王五', age=28)
user2 = User(name='赵六', age=32)
session.add(user1)
session.add(user2)
session.commit()
# 读取(查询)
all_users = session.query(User).all()
for user in all_users:
print(user)
# 条件查询
user = session.query(User).filter_by(name='王五').first()
print(f"找到用户: {user}")
# 更新
user = session.query(User).filter_by(name='王五').first()
user.age = 29
session.commit()
# 删除
user = session.query(User).filter_by(name='赵六').first()
session.delete(user)
session.commit()
# 关闭会话
session.close()
关系映射
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
Base = declarative_base()
class Author(Base):
__tablename__ = 'authors'
id = Column(Integer, primary_key=True)
name = Column(String(50))
# 一对多关系
books = relationship("Book", back_populates="author")
class Book(Base):
__tablename__ = 'books'
id = Column(Integer, primary_key=True)
title = Column(String(100))
author_id = Column(Integer, ForeignKey('authors.id'))
# 多对一关系
author = relationship("Author", back_populates="books")
# 创建表
engine = create_engine('sqlite:///library.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# 创建作者和书籍
author = Author(name='鲁迅')
book1 = Book(title='狂人日记', author=author)
book2 = Book(title='阿Q正传', author=author)
session.add(author)
session.commit()
# 查询作者及其书籍
author = session.query(Author).filter_by(name='鲁迅').first()
print(f"作者: {author.name}")
print(f"书籍: {[book.title for book in author.books]}")
session.close()
连接 MySQL 数据库
from sqlalchemy import create_engine
# MySQL 连接字符串
engine = create_engine('mysql+pymysql://username:password@localhost:3306/database_name')
# 或者使用 mysql-connector-python
engine = create_engine('mysql+mysqlconnector://username:password@localhost:3306/database_name')
💡 提示:ORM 模式更适合复杂应用,Core 模式更适合简单查询或需要精细控制的场景。
MySQL
MySQL 是最流行的开源关系型数据库管理系统之一。Python 提供了多种方式连接和操作 MySQL。
安装驱动
# 方法1:使用 PyMySQL
pip install pymysql
# 方法2:使用 mysql-connector-python(官方驱动)
pip install mysql-connector-python
# 方法3:使用 MySQLdb
pip install mysqlclient
使用 PyMySQL
import pymysql
# 创建连接
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='test_db',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
try:
with connection.cursor() as cursor:
# 创建表
sql = """
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50) NOT NULL,
email VARCHAR(100) UNIQUE,
age INT
)
"""
cursor.execute(sql)
connection.commit()
# 插入数据
sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
cursor.execute(sql, ('张三', 'zhangsan@example.com', 25))
cursor.execute(sql, ('李四', 'lisi@example.com', 30))
connection.commit()
# 查询数据
sql = "SELECT * FROM users"
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
print(row)
# 更新数据
sql = "UPDATE users SET age = %s WHERE name = %s"
cursor.execute(sql, (26, '张三'))
connection.commit()
# 删除数据
sql = "DELETE FROM users WHERE name = %s"
cursor.execute(sql, ('李四',))
connection.commit()
finally:
connection.close()
使用 mysql-connector-python
import mysql.connector
# 创建连接
connection = mysql.connector.connect(
host='localhost',
user='root',
password='your_password',
database='test_db'
)
cursor = connection.cursor()
# 执行 SQL 查询
cursor.execute("SELECT VERSION()")
version = cursor.fetchone()
print(f"MySQL 版本: {version[0]}")
# 创建表
cursor.execute("""
CREATE TABLE IF NOT EXISTS products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100),
price DECIMAL(10, 2)
)
""")
# 插入数据
sql = "INSERT INTO products (name, price) VALUES (%s, %s)"
val = ("Python 书籍", 59.99)
cursor.execute(sql, val)
connection.commit()
# 批量插入
sql = "INSERT INTO products (name, price) VALUES (%s, %s)"
vals = [
("Java 书籍", 49.99),
("Go 书籍", 45.99),
("Rust 书籍", 55.99)
]
cursor.executemany(sql, vals)
connection.commit()
# 查询数据
cursor.execute("SELECT * FROM products")
results = cursor.fetchall()
for row in results:
print(row)
connection.close()
事务处理
import pymysql
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='test_db'
)
try:
with connection.cursor() as cursor:
# 开始事务
connection.begin()
# 多个操作
cursor.execute("UPDATE account SET balance = balance - 100 WHERE id = 1")
cursor.execute("UPDATE account SET balance = balance + 100 WHERE id = 2")
# 提交事务
connection.commit()
print("事务成功")
except Exception as e:
# 回滚事务
connection.rollback()
print(f"事务失败: {e}")
finally:
connection.close()
连接池
from dbutils.pooled_db import PooledDB
import pymysql
# 创建连接池
pool = PooledDB(
creator=pymysql,
maxconnections=6,
mincached=2,
maxcached=4,
host='localhost',
user='root',
password='your_password',
database='test_db'
)
# 从连接池获取连接
connection = pool.connection()
try:
with connection.cursor() as cursor:
cursor.execute("SELECT * FROM users")
results = cursor.fetchall()
print(results)
finally:
connection.close() # 连接会返回到连接池
💡 提示:在生产环境中,建议使用连接池来提高性能和资源利用率。
MongoDB
MongoDB 是一个流行的 NoSQL 文档数据库,PyMongo 是 Python 的 MongoDB 驱动程序。
安装
pip install pymongo
连接数据库
from pymongo import MongoClient
# 连接到 MongoDB
client = MongoClient('mongodb://localhost:27017/')
# 或使用连接字符串
# client = MongoClient('mongodb://username:password@localhost:27017/')
# 获取数据库
db = client['test_db']
# 或使用属性访问
# db = client.test_db
print(f"数据库列表: {client.list_database_names()}")
创建集合和插入文档
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']
# 获取集合(表)
collection = db['users']
# 插入单个文档
user = {
'name': '张三',
'age': 25,
'email': 'zhangsan@example.com',
'hobbies': ['阅读', '编程', '旅行']
}
result = collection.insert_one(user)
print(f"插入的文档 ID: {result.inserted_id}")
# 插入多个文档
users = [
{'name': '李四', 'age': 30, 'email': 'lisi@example.com'},
{'name': '王五', 'age': 28, 'email': 'wangwu@example.com'},
{'name': '赵六', 'age': 32, 'email': 'zhaoliu@example.com'}
]
result = collection.insert_many(users)
print(f"插入的文档 IDs: {result.inserted_ids}")
查询文档
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']
collection = db['users']
# 查询所有文档
for user in collection.find():
print(user)
# 查询单个文档
user = collection.find_one({'name': '张三'})
print(f"找到用户: {user}")
# 条件查询
for user in collection.find({'age': {'$gt': 25}}):
print(f"年龄大于25的用户: {user}")
# 投影(只返回特定字段)
for user in collection.find({}, {'name': 1, 'age': 1}):
print(user)
# 排序
for user in collection.find().sort('age', -1): # -1 降序,1 升序
print(user)
# 限制数量
for user in collection.find().limit(2):
print(user)
更新文档
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']
collection = db['users']
# 更新单个文档
result = collection.update_one(
{'name': '张三'},
{'$set': {'age': 26}}
)
print(f"匹配到 {result.matched_count} 个文档,修改了 {result.modified_count} 个")
# 更新多个文档
result = collection.update_many(
{'age': {'$lt': 30}},
{'$inc': {'age': 1}} # 年龄加1
)
print(f"匹配到 {result.matched_count} 个文档,修改了 {result.modified_count} 个")
# 添加字段
result = collection.update_many(
{},
{'$set': {'status': 'active'}}
)
删除文档
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']
collection = db['users']
# 删除单个文档
result = collection.delete_one({'name': '赵六'})
print(f"删除了 {result.deleted_count} 个文档")
# 删除多个文档
result = collection.delete_many({'age': {'$gt': 30}})
print(f"删除了 {result.deleted_count} 个文档")
# 删除所有文档
result = collection.delete_many({})
print(f"删除了 {result.deleted_count} 个文档")
聚合操作
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']
collection = db['users']
# 统计
pipeline = [
{'$match': {'age': {'$gte': 20}}},
{'$group': {'_id': '$status', 'count': {'$sum': 1}}},
{'$sort': {'count': -1}}
]
for result in collection.aggregate(pipeline):
print(result)
# 计算平均年龄
pipeline = [
{'$group': {'_id': None, 'avg_age': {'$avg': '$age'}}}
]
for result in collection.aggregate(pipeline):
print(f"平均年龄: {result['avg_age']:.2f}")
索引
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['test_db']
collection = db['users']
# 创建索引
collection.create_index('name')
collection.create_index([('name', 1), ('age', -1)]) # 复合索引
# 创建唯一索引
collection.create_index('email', unique=True)
# 查看索引
for index in collection.list_indexes():
print(index)
# 删除索引
collection.drop_index('name_1')
💡 提示:MongoDB 适合存储非结构化数据,如日志、文档、社交网络数据等。
Redis
Redis 是一个高性能的键值对数据库,常用于缓存、消息队列、排行榜等场景。
安装
pip install redis
连接 Redis
import redis
# 连接到 Redis
r = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True # 自动解码为字符串
)
# 测试连接
print(f"Redis 连接状态: {r.ping()}")
字符串操作
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 设置键值
r.set('name', '张三')
r.set('age', 25)
# 获取值
print(f"姓名: {r.get('name')}")
print(f"年龄: {r.get('age')}")
# 设置过期时间(秒)
r.set('session:123', 'user_data', ex=3600) # 1小时后过期
# 批量设置
r.mset({'key1': 'value1', 'key2': 'value2', 'key3': 'value3'})
# 批量获取
values = r.mget(['key1', 'key2', 'key3'])
print(f"批量获取: {values}")
# 自增
r.set('counter', 0)
r.incr('counter')
r.incr('counter', 5)
print(f"计数器: {r.get('counter')}")
# 自减
r.decr('counter')
print(f"计数器: {r.get('counter')}")
# 追加
r.append('name', ' (Python开发者)')
print(f"追加后: {r.get('name')}")
# 检查键是否存在
print(f"'name' 键是否存在: {r.exists('name')}")
# 删除键
r.delete('key1')
print(f"'key1' 键是否存在: {r.exists('key1')}")
列表操作
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 从左边推入
r.lpush('mylist', 'item3', 'item2', 'item1')
print(f"列表: {r.lrange('mylist', 0, -1)}")
# 从右边推入
r.rpush('mylist', 'item4', 'item5')
print(f"列表: {r.lrange('mylist', 0, -1)}")
# 获取列表长度
print(f"列表长度: {r.llen('mylist')}")
# 获取指定范围的元素
print(f"前3个元素: {r.lrange('mylist', 0, 2)}")
# 弹出元素
left_item = r.lpop('mylist')
right_item = r.rpop('mylist')
print(f"左边弹出: {left_item}, 右边弹出: {right_item}")
# 根据索引获取元素
print(f"索引0的元素: {r.lindex('mylist', 0)}")
# 删除指定值的元素
r.lrem('mylist', 0, 'item2') # 从列表中删除所有 item2
# 设置索引位置的值
r.lset('mylist', 0, 'new_item')
# 保留指定范围的元素
r.ltrim('mylist', 0, 1) # 只保留前两个元素
哈希操作
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 设置哈希字段
r.hset('user:1', 'name', '张三')
r.hset('user:1', 'age', 25)
r.hset('user:1', 'email', 'zhangsan@example.com')
# 批量设置
r.hmset('user:2', {
'name': '李四',
'age': 30,
'email': 'lisi@example.com'
})
# 获取单个字段
print(f"姓名: {r.hget('user:1', 'name')}")
# 获取所有字段
user1 = r.hgetall('user:1')
print(f"用户1: {user1}")
# 获取所有字段名
print(f"字段名: {r.hkeys('user:1')}")
# 获取所有值
print(f"值: {r.hvals('user:1')}")
# 检查字段是否存在
print(f"'name' 字段是否存在: {r.hexists('user:1', 'name')}")
# 删除字段
r.hdel('user:1', 'email')
# 获取哈希长度
print(f"哈希长度: {r.hlen('user:1')}")
# 自增
r.hincrby('user:1', 'age', 1)
print(f"年龄: {r.hget('user:1', 'age')}")
集合操作
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 添加元素
r.sadd('myset', 'apple', 'banana', 'orange')
print(f"集合: {r.smembers('myset')}")
# 获取集合长度
print(f"集合长度: {r.scard('myset')}")
# 检查元素是否存在
print(f"'apple' 是否在集合中: {r.sismember('myset', 'apple')}")
# 获取所有元素
print(f"所有元素: {r.smembers('myset')}")
# 随机获取元素
print(f"随机元素: {r.srandmember('myset')}")
# 随机弹出元素
print(f"弹出元素: {r.spop('myset')}")
# 删除元素
r.srem('myset', 'banana')
# 集合运算
r.sadd('set1', 'a', 'b', 'c')
r.sadd('set2', 'b', 'c', 'd')
# 交集
print(f"交集: {r.sinter('set1', 'set2')}")
# 并集
print(f"并集: {r.sunion('set1', 'set2')}")
# 差集
print(f"差集: {r.sdiff('set1', 'set2')}")
有序集合操作
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 添加元素(带分数)
r.zadd('leaderboard', {'张三': 100, '李四': 90, '王五': 95})
# 获取排名
print(f"排行榜: {r.zrange('leaderboard', 0, -1, withscores=True)}")
# 获取倒序排名
print(f"倒序排行榜: {r.zrevrange('leaderboard', 0, -1, withscores=True)}")
# 获取排名
print(f"张三的排名: {r.zrank('leaderboard', '张三')}")
print(f"张三的倒序排名: {r.zrevrank('leaderboard', '张三')}")
# 获取分数
print(f"张三的分数: {r.zscore('leaderboard', '张三')}")
# 增加分数
r.zincrby('leaderboard', 5, '张三')
# 获取指定范围的元素
print(f"前2名: {r.zrange('leaderboard', 0, 1, withscores=True)}")
# 获取指定分数范围的元素
print(f"分数90-100: {r.zrangebyscore('leaderboard', 90, 100, withscores=True)}")
# 获取集合长度
print(f"排行榜长度: {r.zcard('leaderboard')}")
# 删除元素
r.zrem('leaderboard', '王五')
发布订阅
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 发布消息
r.publish('news_channel', 'Python 3.14 发布了!')
r.publish('news_channel', 'FastAPI 更新到新版本!')
# 订阅频道
pubsub = r.pubsub()
pubsub.subscribe('news_channel')
# 监听消息
for message in pubsub.listen():
if message['type'] == 'message':
print(f"收到消息: {message['data']}")
事务
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 开启事务
pipe = r.pipeline()
# 添加命令
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
pipe.get('key2')
# 执行事务
results = pipe.execute()
print(f"事务结果: {results}")
Redis 常用场景:
- 🚀 缓存:提高应用性能
- 📊 排行榜:使用有序集合
- 💬 消息队列:使用列表
- 🔐 会话存储:存储用户会话
- 📈 计数器:使用原子递增
- 🎯 分布式锁:防止并发冲突
💡 提示:Redis 的数据都在内存中,访问速度极快,但需要注意内存使用情况。