异步编程 →

网络编程

Python 提供了丰富的网络编程支持,包括 socket 编程、HTTP 服务器等。

🌐 TCP 连接流程图

sequenceDiagram participant Client as 客户端 participant Server as 服务器 Server->>Server: socket() 创建socket Server->>Server: bind() 绑定地址端口 Server->>Server: listen() 开始监听 Note over Server: 等待连接... Client->>Client: socket() 创建socket Client->>Server: connect() 请求连接 Server->>Client: accept() 接受连接 Note over Client,Server: TCP连接建立 Client->>Server: send() 发送数据 Server->>Client: recv() 接收数据 Server->>Client: send() 发送响应 Client->>Server: recv() 接收响应 Client->>Server: close() 关闭连接 Server->>Server: close() 关闭连接

📡 HTTP 请求流程

flowchart TD A[客户端] --> B[构建HTTP请求] B --> C[DNS解析] C --> D[建立TCP连接] D --> E[发送HTTP请求] E --> F{服务器处理} F -->|200 OK| G[返回响应] F -->|404 Not Found| H[返回错误] F -->|500 Server Error| I[返回错误] G --> J[解析响应] H --> J I --> J J --> K[关闭连接] style F fill:#f093fb,color:#fff

🔌 网络协议层次

flowchart TB A[应用层] --> A1[HTTP/FTP/SMTP] B[传输层] --> B1[TCP/UDP] C[网络层] --> C1[IP/ICMP] D[链路层] --> D1[以太网/WiFi] E[物理层] --> E1[电信号/光信号] A --> B B --> C C --> D D --> E style A fill:#11998e,color:#fff style B fill:#667eea,color:#fff style C fill:#764ba2,color:#fff style D fill:#f093fb,color:#fff style E fill:#4facfe,color:#fff

Socket 编程基础

import socket

# 创建 TCP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 获取主机名
hostname = socket.gethostname()
print(f"主机名: {hostname}")

# 获取 IP 地址
ip_address = socket.gethostbyname(hostname)
print(f"IP 地址: {ip_address}")

# 关闭 socket
sock.close()

TCP 服务器

import socket

# 创建 socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 绑定地址和端口
server_socket.bind(('localhost', 8888))

# 开始监听
server_socket.listen(5)
print("服务器启动,等待连接...")

while True:
    # 接受连接
    client_socket, address = server_socket.accept()
    print(f"连接来自: {address}")

    # 接收数据
    data = client_socket.recv(1024).decode('utf-8')
    print(f"收到消息: {data}")

    # 发送响应
    response = f"服务器回复: {data}"
    client_socket.send(response.encode('utf-8'))

    # 关闭连接
    client_socket.close()

TCP 客户端

import socket

# 创建 socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 连接服务器
client_socket.connect(('localhost', 8888))

# 发送数据
message = "Hello, Server!"
client_socket.send(message.encode('utf-8'))

# 接收响应
response = client_socket.recv(1024).decode('utf-8')
print(f"收到响应: {response}")

# 关闭连接
client_socket.close()

UDP 服务器

import socket

# 创建 UDP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 绑定地址和端口
server_socket.bind(('localhost', 9999))
print("UDP 服务器启动...")

while True:
    # 接收数据
    data, address = server_socket.recvfrom(1024)
    print(f"来自 {address} 的消息: {data.decode('utf-8')}")

    # 发送响应
    response = f"UDP 回复: {data.decode('utf-8')}"
    server_socket.sendto(response.encode('utf-8'), address)

UDP 客户端

import socket

# 创建 UDP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 发送数据
message = "Hello, UDP Server!"
client_socket.sendto(message.encode('utf-8'), ('localhost', 9999))

# 接收响应
response, address = client_socket.recvfrom(1024)
print(f"收到响应: {response.decode('utf-8')}")

# 关闭连接
client_socket.close()

HTTP 服务器

from http.server import HTTPServer, BaseHTTPRequestHandler

class MyHTTPRequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # 发送响应头
        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()

        # 发送响应体
        response = """
        
        
            

Hello, HTTP Server!

这是一个简单的 HTTP 服务器

""" self.wfile.write(response.encode('utf-8')) def do_POST(self): # 获取 POST 数据 content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) print(f"收到 POST 数据: {post_data.decode('utf-8')}") # 发送响应 self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(b'POST 请求已接收') # 创建服务器 server = HTTPServer(('localhost', 8000), MyHTTPRequestHandler) print("HTTP 服务器运行在 http://localhost:8000") server.serve_forever()

简单的 HTTP 客户端

import urllib.request
import json

# GET 请求
url = 'https://api.github.com'
response = urllib.request.urlopen(url)
data = response.read().decode('utf-8')
print(data)

# POST 请求
url = 'https://httpbin.org/post'
data = json.dumps({'name': '张三', 'age': 25}).encode('utf-8')
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
response = urllib.request.urlopen(req)
print(response.read().decode('utf-8'))

URL 解析

from urllib.parse import urlparse, parse_qs, urljoin

# 解析 URL
url = 'https://www.example.com/path?name=张三&age=25#section'
parsed = urlparse(url)
print(f"协议: {parsed.scheme}")
print(f"域名: {parsed.netloc}")
print(f"路径: {parsed.path}")
print(f"查询参数: {parsed.query}")
print(f"片段: {parsed.fragment}")

# 解析查询参数
query = parse_qs(parsed.query)
print(f"查询参数: {query}")

# 拼接 URL
base_url = 'https://www.example.com/'
relative_url = 'path/to/resource'
full_url = urljoin(base_url, relative_url)
print(f"完整 URL: {full_url}")

JSON Web Token (JWT)

import jwt
import datetime

# 生成 JWT
secret = 'your-secret-key'
payload = {
    'user_id': 123,
    'username': '张三',
    'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}
token = jwt.encode(payload, secret, algorithm='HS256')
print(f"JWT: {token}")

# 验证 JWT
try:
    decoded = jwt.decode(token, secret, algorithms=['HS256'])
    print(f"解码后的数据: {decoded}")
except jwt.ExpiredSignatureError:
    print("Token 已过期")
except jwt.InvalidTokenError:
    print("无效的 Token")
💡 提示:Socket 编程是网络编程的基础,但在实际开发中,建议使用更高级的框架如 FastAPI、Flask 等。
异步编程 →