# 05 - Files and Data Processing
Python is strong at file handling and data processing; the standard library combined with third-party packages makes the work highly productive.
## Advanced File Operations
### File Encodings
```python
# Specify the encoding explicitly (avoids mojibake)
with open('file.txt', 'r', encoding='utf-8') as f:
    content = f.read()

# Handling undecodable bytes
with open('file.txt', 'r', encoding='utf-8', errors='ignore') as f:
    content = f.read()

# errors options:
# 'strict': the default, raises UnicodeDecodeError
# 'ignore': drops undecodable bytes
# 'replace': substitutes a replacement marker (U+FFFD when decoding, ? when encoding)
# 'backslashreplace': substitutes \xNN escape sequences
```
### Large Files
```python
# Read line by line (memory-friendly)
with open('large_file.txt', 'r') as f:
    for line in f:  # the file object is an iterator; the file is never loaded wholesale
        process(line.strip())

# Read in fixed-size chunks
def read_in_chunks(file_path, chunk_size=1024):
    with open(file_path, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

for chunk in read_in_chunks('large_file.bin'):
    process(chunk)
```
### CSV Files
```python
import csv
# Read a CSV file (the csv docs recommend newline='' when opening)
with open('data.csv', 'r', newline='') as f:
    reader = csv.reader(f)
    headers = next(reader)  # consume the header row
    for row in reader:
        print(row)  # each row is a list of strings

# DictReader (rows as dictionaries keyed by the header)
with open('data.csv', 'r', newline='') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row['name'], row['age'])

# Write a CSV file
with open('output.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['name', 'age'])
    writer.writerows([['Alice', 25], ['Bob', 30]])

# DictWriter
with open('output.csv', 'w', newline='') as f:
    fieldnames = ['name', 'age']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerow({'name': 'Alice', 'age': 25})
```
### JSON
```python
import json
# Python ↔ JSON type mapping
# dict → object
# list/tuple → array
# str → string
# int/float → number
# True/False → true/false
# None → null
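
# A quick round trip illustrating the mapping (a minimal sketch):
payload = {'name': 'Alice', 'tags': ('a', 'b'), 'active': True, 'score': None}
text = json.dumps(payload)   # the tuple is encoded as a JSON array
restored = json.loads(text)  # ...and decodes back as a Python list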
# Custom encoder
class Student:
    def __init__(self, name, age):
        self.name = name
        self.age = age

class StudentEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Student):
            return {'name': obj.name, 'age': obj.age}
        return super().default(obj)

student = Student('Alice', 25)
json.dumps(student, cls=StudentEncoder)

# Or use the default parameter
json.dumps(student, default=lambda o: o.__dict__)
```
### pickle Serialization
```python
import pickle
# Serialize a Python object
data = {'key': 'value', 'number': 42}
with open('data.pkl', 'wb') as f:
    pickle.dump(data, f)

# Deserialize
# Security note: only unpickle data you trust; pickle.load can execute arbitrary code
with open('data.pkl', 'rb') as f:
    data = pickle.load(f)

# Serialize several objects into one file
with open('data.pkl', 'wb') as f:
    pickle.dump(obj1, f)
    pickle.dump(obj2, f)

# Read them back in the same order
with open('data.pkl', 'rb') as f:
    obj1 = pickle.load(f)
    obj2 = pickle.load(f)
```
## Data Processing
### List Techniques
```python
# Deduplicate while preserving order
def unique(lst):
    seen = set()
    # set.add returns None, so the or-clause records x without ever filtering
    return [x for x in lst if not (x in seen or seen.add(x))]

# Flatten a nested list
nested = [[1, 2], [3, 4], [5, 6]]
flat = [item for sublist in nested for item in sublist]  # [1, 2, 3, 4, 5, 6]

# Chunking (Python 3.12+ also offers itertools.batched)
def chunk(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i+n]

list(chunk([1, 2, 3, 4, 5, 6, 7], 3))  # [[1, 2, 3], [4, 5, 6], [7]]

# Sliding window
from collections import deque

def sliding_window(lst, n):
    window = deque(maxlen=n)
    for item in lst:
        window.append(item)
        if len(window) == n:
            yield list(window)

list(sliding_window([1, 2, 3, 4, 5], 3))  # [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
```
### Dictionary Techniques
```python
# Merge dictionaries
d1 = {'a': 1, 'b': 2}
d2 = {'b': 3, 'c': 4}

# Python 3.9+
merged = d1 | d2  # {'a': 1, 'b': 3, 'c': 4}
# Python 3.5+
merged = {**d1, **d2}
# Classic approach
merged = d1.copy()
merged.update(d2)

# Filter with a dict comprehension
d = {'a': 1, 'b': 2, 'c': 3}
filtered = {k: v for k, v in d.items() if v > 1}  # {'b': 2, 'c': 3}

# Invert a dictionary (values must be unique and hashable)
reversed_d = {v: k for k, v in d.items()}

# Grouping (groupby requires input already sorted by the same key)
from itertools import groupby
data = [('a', 1), ('a', 2), ('b', 3), ('b', 4)]
grouped = {k: list(v) for k, v in groupby(sorted(data), key=lambda x: x[0])}
```
## Advanced Regular Expressions
### Common Patterns
```python
import re
# Chinese mobile phone number
phone = r'1[3-9]\d{9}'
# Email address
email = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
# URL
url = r'https?://[\w\-]+(\.[\w\-]+)+([\w\-\.,@?^=%&:/~\+#]*[\w\-\@?^=%&/~\+#])?'
# IPv4 address
ipv4 = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
# Date (YYYY-MM-DD)
date = r'(\d{4})-(\d{2})-(\d{2})'
# Chinese ID card number
id_card = r'\d{17}[\dXx]'
```
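As a quick check that a pattern behaves as intended, a minimal sketch (the sample string is illustrative) applying the date pattern and its capture groups:
```python
import re

date = r'(\d{4})-(\d{2})-(\d{2})'
m = re.search(date, 'Released on 2024-03-15.')
if m:
    year, month, day = m.groups()  # ('2024', '03', '15')
```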
### Greedy vs. Non-Greedy
```python
text = '<div>content</div>'

# Greedy matching (the default): as long as possible
re.findall(r'<.*>', text)   # ['<div>content</div>']

# Non-greedy matching: as short as possible
re.findall(r'<.*?>', text)  # ['<div>', '</div>']
```
### Precompile for Performance
```python
# Compile once when a pattern is reused
pattern = re.compile(r'\d+')
for text in large_list:
    pattern.findall(text)  # faster than calling re.findall each time
```
## Command-Line Tools
### subprocess
```python
import subprocess
# Run a command
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print(result.stdout)
print(result.returncode)

# Pipelines
p1 = subprocess.Popen(['ls'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', 'txt'], stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()  # allow p1 to receive SIGPIPE if p2 exits first
output, _ = p2.communicate()

# Timeout control
try:
    result = subprocess.run(['sleep', '10'], timeout=5)
except subprocess.TimeoutExpired:
    print("Timeout!")

# Check the return code
subprocess.run(['false'], check=True)  # raises CalledProcessError on non-zero exit
```
### shutil (High-Level File Operations)
```python
import shutil
# Copy
shutil.copy('src.txt', 'dst.txt')      # copy a file
shutil.copytree('src_dir', 'dst_dir')  # copy a directory tree

# Move
shutil.move('src', 'dst')

# Delete
shutil.rmtree('dir')  # remove a directory tree

# Compress
shutil.make_archive('archive', 'zip', 'folder')

# Extract
shutil.unpack_archive('archive.zip', 'extract_dir')

# Disk usage
usage = shutil.disk_usage('/')
print(f"Total: {usage.total / 1e9:.2f} GB")
print(f"Used: {usage.used / 1e9:.2f} GB")
print(f"Free: {usage.free / 1e9:.2f} GB")
```
## Serialization Format Comparison
| Format | Readability | Size | Speed | Cross-language | Use cases |
|------|-------|------|------|--------|------|
| JSON | High | Medium | Medium | Yes | Config, APIs |
| pickle | Low | Small | Fast | No (Python only) | Caching, temporary storage |
| CSV | High | Large | Slow | Yes | Tabular data |
| XML | High | Large | Slow | Yes | Config, documents |
| YAML | High | Medium | Slow | Yes | Config files |
| Protocol Buffers | Low | Small | Fast | Yes | RPC, big data |
**Choosing a format:**
- Config files: JSON or YAML
- API payloads: JSON
- Python-internal data: pickle
- Tabular data: CSV or Pandas
- High performance: Protocol Buffers or MessagePack
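To sanity-check the size column, a minimal sketch (the record here is illustrative) comparing JSON and pickle encodings of the same object:
```python
import json
import pickle

record = {'name': 'Alice', 'scores': list(range(100))}
json_bytes = json.dumps(record).encode('utf-8')
pickle_bytes = pickle.dumps(record, protocol=pickle.HIGHEST_PROTOCOL)
print(len(json_bytes), len(pickle_bytes))  # pickle is typically more compact here
```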
## glob Pattern Matching
```python
import glob
# Match files
glob.glob('*.txt')                    # all .txt files in the current directory
glob.glob('**/*.py', recursive=True)  # recursive .py search
glob.glob('[0-9]*.txt')               # .txt files starting with a digit

# pathlib offers a more modern interface
from pathlib import Path
list(Path('.').glob('*.txt'))
list(Path('.').rglob('*.py'))  # recursive
```
## Performance Comparisons
### Choosing a Data Structure
```python
import timeit
# Rough, machine-dependent figures:
# Lookup (10,000 elements)
#   list.index: ~500 μs
#   x in set:   ~0.1 μs (~5000x faster)
# Append (10,000 operations)
#   list.append:  ~500 μs
#   deque.append: ~300 μs
#   set.add:      ~600 μs
# Iteration (100,000 elements)
#   for x in list:      ~5 ms
#   for x in tuple:     ~5 ms (essentially identical)
#   for x in generator: ~3 ms (and memory-light)
```
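The figures above vary by machine and Python version; a minimal sketch with timeit to reproduce the membership comparison yourself:
```python
import timeit

setup = 'data = list(range(10_000)); s = set(data)'
print(timeit.timeit('9_999 in data', setup=setup, number=1_000))  # linear scan
print(timeit.timeit('9_999 in s', setup=setup, number=1_000))     # hash lookup
```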
### String Concatenation
```python
# Slow (O(n²) overall)
s = ""
for i in range(10000):
    s += str(i)  # builds a new string on every iteration

# Fast (O(n))
s = "".join(str(i) for i in range(10000))

# f-strings are also efficient for a fixed number of pieces
items = ['a', 'b', 'c']
result = f"{items[0]}, {items[1]}, {items[2]}"
```
## Practical Script Templates
### Batch File Processing
```python
#!/usr/bin/env python3
from pathlib import Path
def process_files(directory, pattern='*.txt'):
    """Process files in a directory tree in bulk."""
    for file_path in Path(directory).rglob(pattern):
        process_file(file_path)

def process_file(file_path):
    """Process a single file."""
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    # Transform the content
    result = content.upper()
    # Write to a new file alongside the original
    output_path = file_path.with_suffix('.processed.txt')
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(result)

if __name__ == '__main__':
    process_files('/path/to/files')
```
### Data-Cleaning Script
```python
import csv
import re
def clean_data(input_file, output_file):
    """Clean a CSV file row by row."""
    with open(input_file, 'r', newline='') as fin, \
         open(output_file, 'w', newline='') as fout:
        reader = csv.DictReader(fin)
        writer = csv.DictWriter(fout, fieldnames=reader.fieldnames)
        writer.writeheader()
        for row in reader:
            # Normalize fields
            row['name'] = row['name'].strip().title()
            row['email'] = row['email'].lower()
            row['age'] = int(row['age']) if row['age'].isdigit() else 0
            # Keep only valid rows
            if validate_row(row):
                writer.writerow(row)

def validate_row(row):
    """Validate a cleaned row."""
    email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
    return (row['name'] and
            row['age'] > 0 and
            re.match(email_pattern, row['email']))
```
### Log-Analysis Script
```python
import re
from collections import Counter
def analyze_log(log_file):
    """Analyze an access log."""
    ip_counter = Counter()
    status_counter = Counter()
    # Log format: IP - - [timestamp] "request" status size
    pattern = r'(\d+\.\d+\.\d+\.\d+).*\[(.+?)\].*" (\d{3})'
    with open(log_file, 'r') as f:
        for line in f:
            match = re.search(pattern, line)
            if match:
                ip, timestamp, status = match.groups()
                ip_counter[ip] += 1
                status_counter[status] += 1
    # Report
    print("Top 10 IPs:")
    for ip, count in ip_counter.most_common(10):
        print(f"  {ip}: {count}")
    print("\nStatus codes:")
    for status, count in status_counter.most_common():
        print(f"  {status}: {count}")

analyze_log('access.log')
```
## Data Analysis Basics
### NumPy Basics
```python
import numpy as np
# Create arrays
arr = np.array([1, 2, 3, 4, 5])
zeros = np.zeros((3, 4))
ones = np.ones((2, 3))
rand = np.random.rand(3, 3)

# Vectorized operations
arr + 10
arr * 2
arr ** 2
arr > 3  # elementwise comparison → boolean array

# Statistics
arr.sum()
arr.mean()
arr.std()
arr.min()
arr.max()

# Indexing and slicing
arr[0]
arr[1:4]
arr[[0, 2, 4]]  # fancy indexing
arr[arr > 3]    # boolean indexing

# Shape operations
arr.reshape(5, 1)
arr.flatten()
arr.T  # transpose
```
### Pandas Basics
```python
import pandas as pd
# Create a DataFrame
df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie'],
    'age': [25, 30, 35],
    'city': ['Beijing', 'Shanghai', 'Guangzhou']
})

# Read from files
df = pd.read_csv('data.csv')
df = pd.read_excel('data.xlsx')
df = pd.read_json('data.json')

# Inspect
df.head()      # first 5 rows
df.tail()      # last 5 rows
df.info()      # column types and non-null counts
df.describe()  # summary statistics

# Select
df['name']           # one column
df[['name', 'age']]  # several columns
df.loc[0]            # row by label
df.iloc[0]           # row by position
df.loc[0, 'name']    # single cell

# Filter
df[df['age'] > 25]
df[(df['age'] > 25) & (df['city'] == 'Beijing')]

# Sort
df.sort_values('age')
df.sort_values(['city', 'age'], ascending=[True, False])

# Group and aggregate
df.groupby('city')['age'].mean()
df.groupby('city').agg({'age': ['mean', 'min', 'max']})

# Add columns
df['adult'] = df['age'] >= 18
df['age_group'] = pd.cut(df['age'], bins=[0, 18, 60, 100],
                         labels=['child', 'adult', 'senior'])

# Save
df.to_csv('output.csv', index=False)
df.to_excel('output.xlsx', index=False)
```
## Web Scraping
### requests
```python
import requests
# GET request
response = requests.get('https://api.example.com/data')
print(response.status_code)  # 200
print(response.text)         # response body as text
data = response.json()       # parse a JSON body

# POST request
data = {'key': 'value'}
response = requests.post('https://api.example.com', json=data)

# Custom headers
headers = {'User-Agent': 'MyApp/1.0'}
response = requests.get(url, headers=headers)

# Timeouts
response = requests.get(url, timeout=5)

# Session (connection reuse)
session = requests.Session()
session.get(url1)
session.get(url2)  # reuses the underlying connection
```
### BeautifulSoup (HTML Parsing)
```python
from bs4 import BeautifulSoup
html = '''
<h1>Title</h1>
<div class="content">
  <p>Paragraph 1</p>
  <p>Paragraph 2</p>
  <a href="https://example.com">Link</a>
</div>
'''
soup = BeautifulSoup(html, 'html.parser')

# Find elements
soup.find('h1').text               # 'Title'
soup.find('div', class_='content')
soup.find_all('p')                 # all <p> tags
soup.select('.content p')          # CSS selector

# Iterate
for p in soup.find_all('p'):
    print(p.text)

# Extract attributes
link = soup.find('a')
link['href']  # the link target
```
## Database Operations
### SQLite
```python
import sqlite3
# Connect
conn = sqlite3.connect('database.db')
cursor = conn.cursor()

# Create a table
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    age INTEGER
)
''')

# Insert
cursor.execute('INSERT INTO users (name, age) VALUES (?, ?)', ('Alice', 25))
conn.commit()

# Bulk insert
users = [('Bob', 30), ('Charlie', 35)]
cursor.executemany('INSERT INTO users (name, age) VALUES (?, ?)', users)
conn.commit()

# Query
cursor.execute('SELECT * FROM users WHERE age > ?', (20,))
rows = cursor.fetchall()  # all results
row = cursor.fetchone()   # next result

# Close
cursor.close()
conn.close()

# with manages the transaction (commits on success, rolls back on error);
# note that it does NOT close the connection
with sqlite3.connect('database.db') as conn:
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM users')
conn.close()
```
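Rows come back as plain tuples by default; a small sketch (assuming the users table above) using the standard sqlite3.Row row factory to access columns by name:
```python
import sqlite3

conn = sqlite3.connect('database.db')
conn.row_factory = sqlite3.Row  # rows now support access by column name
cursor = conn.execute('SELECT name, age FROM users')
for row in cursor:
    print(row['name'], row['age'])
conn.close()
```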
## Configuration Files
### ConfigParser (INI Format)
```python
import configparser
# Read
config = configparser.ConfigParser()
config.read('config.ini')
value = config['section']['key']
value = config.get('section', 'key', fallback='default')

# Write
config['NEW_SECTION'] = {'key': 'value'}
with open('config.ini', 'w') as f:
    config.write(f)
```
### YAML (requires PyYAML)
```python
import yaml
# Read
with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

# Write
data = {'name': 'Alice', 'scores': [90, 85, 92]}
with open('output.yaml', 'w') as f:
    yaml.dump(data, f, default_flow_style=False)
```
## Best Practices
1. **Manage files with with**: closes them automatically
2. **Use pathlib for paths**: clearer than os.path
3. **Use json for API data**: the universal interchange format
4. **Use csv for tabular data**: simple and efficient
5. **Precompile regexes**: when a pattern is reused
6. **Process large files with generators**: read line by line
7. **Use pandas for structured data**: powerful and expressive
8. **Run commands with subprocess**: capture their output
9. **Log with logging, not print**: see the sketch below
10. **Add type hints**: essential in large projects
**Bottom line:** pick the right tool for each kind of data, preferring the standard library.
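For item 9, a minimal logging setup sketch (the level and format are illustrative choices):
```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(name)s: %(message)s',
)
logger = logging.getLogger(__name__)
logger.info('processing started')  # goes to stderr by default, unlike print
```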