异步编程 →

网络编程

Python 提供了丰富的网络编程支持,包括 socket 编程、HTTP 服务器等。

Socket 编程基础

import socket

# 创建 TCP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 获取主机名
hostname = socket.gethostname()
print(f"主机名: {hostname}")

# 获取 IP 地址
ip_address = socket.gethostbyname(hostname)
print(f"IP 地址: {ip_address}")

# 关闭 socket
sock.close()

TCP 服务器

import socket

# 创建 socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 绑定地址和端口
server_socket.bind(('localhost', 8888))

# 开始监听
server_socket.listen(5)
print("服务器启动,等待连接...")

while True:
    # 接受连接
    client_socket, address = server_socket.accept()
    print(f"连接来自: {address}")

    # 接收数据
    data = client_socket.recv(1024).decode('utf-8')
    print(f"收到消息: {data}")

    # 发送响应
    response = f"服务器回复: {data}"
    client_socket.send(response.encode('utf-8'))

    # 关闭连接
    client_socket.close()

TCP 客户端

import socket

# 创建 socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 连接服务器
client_socket.connect(('localhost', 8888))

# 发送数据
message = "Hello, Server!"
client_socket.send(message.encode('utf-8'))

# 接收响应
response = client_socket.recv(1024).decode('utf-8')
print(f"收到响应: {response}")

# 关闭连接
client_socket.close()

UDP 服务器

import socket

# 创建 UDP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 绑定地址和端口
server_socket.bind(('localhost', 9999))
print("UDP 服务器启动...")

while True:
    # 接收数据
    data, address = server_socket.recvfrom(1024)
    print(f"来自 {address} 的消息: {data.decode('utf-8')}")

    # 发送响应
    response = f"UDP 回复: {data.decode('utf-8')}"
    server_socket.sendto(response.encode('utf-8'), address)

UDP 客户端

import socket

# 创建 UDP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 发送数据
message = "Hello, UDP Server!"
client_socket.sendto(message.encode('utf-8'), ('localhost', 9999))

# 接收响应
response, address = client_socket.recvfrom(1024)
print(f"收到响应: {response.decode('utf-8')}")

# 关闭连接
client_socket.close()

HTTP 服务器

from http.server import HTTPServer, BaseHTTPRequestHandler

class MyHTTPRequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # 发送响应头
        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()

        # 发送响应体
        response = """
        
        
            

Hello, HTTP Server!

这是一个简单的 HTTP 服务器

""" self.wfile.write(response.encode('utf-8')) def do_POST(self): # 获取 POST 数据 content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) print(f"收到 POST 数据: {post_data.decode('utf-8')}") # 发送响应 self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(b'POST 请求已接收') # 创建服务器 server = HTTPServer(('localhost', 8000), MyHTTPRequestHandler) print("HTTP 服务器运行在 http://localhost:8000") server.serve_forever()

简单的 HTTP 客户端

import urllib.request
import json

# GET 请求
url = 'https://api.github.com'
response = urllib.request.urlopen(url)
data = response.read().decode('utf-8')
print(data)

# POST 请求
url = 'https://httpbin.org/post'
data = json.dumps({'name': '张三', 'age': 25}).encode('utf-8')
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
response = urllib.request.urlopen(req)
print(response.read().decode('utf-8'))

URL 解析

from urllib.parse import urlparse, parse_qs, urljoin

# 解析 URL
url = 'https://www.example.com/path?name=张三&age=25#section'
parsed = urlparse(url)
print(f"协议: {parsed.scheme}")
print(f"域名: {parsed.netloc}")
print(f"路径: {parsed.path}")
print(f"查询参数: {parsed.query}")
print(f"片段: {parsed.fragment}")

# 解析查询参数
query = parse_qs(parsed.query)
print(f"查询参数: {query}")

# 拼接 URL
base_url = 'https://www.example.com/'
relative_url = 'path/to/resource'
full_url = urljoin(base_url, relative_url)
print(f"完整 URL: {full_url}")

JSON Web Token (JWT)

import jwt
import datetime

# 生成 JWT
secret = 'your-secret-key'
payload = {
    'user_id': 123,
    'username': '张三',
    'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}
token = jwt.encode(payload, secret, algorithm='HS256')
print(f"JWT: {token}")

# 验证 JWT
try:
    decoded = jwt.decode(token, secret, algorithms=['HS256'])
    print(f"解码后的数据: {decoded}")
except jwt.ExpiredSignatureError:
    print("Token 已过期")
except jwt.InvalidTokenError:
    print("无效的 Token")
💡 提示:Socket 编程是网络编程的基础,但在实际开发中,建议使用更高级的框架如 FastAPI、Flask 等。
异步编程 →