05-Java多线程编程
多线程是Java的核心特性,JVM级别支持,API丰富。合理使用多线程可充分利用多核CPU。
线程创建
方式1:继承Thread类
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread running: " + Thread.currentThread().getName());
}
}
// 使用
MyThread t = new MyThread();
t.start(); // 启动线程(不是t.run()!)
方式2:实现Runnable接口(推荐)
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable running");
}
}
// 使用
Thread t = new Thread(new MyRunnable());
t.start();
// Lambda(Java 8+)
new Thread(() -> System.out.println("Lambda thread")).start();
方式3:实现Callable接口
支持返回值和抛出异常。
import java.util.concurrent.*;
public class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 42;
}
}
// 使用
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(new MyCallable());
Integer result = future.get(); // 阻塞等待结果
executor.shutdown();
线程状态
NEW(新建)
↓ start()
RUNNABLE(可运行)
↓ 获取锁/IO完成
RUNNING(运行)
↓ sleep/wait/阻塞IO
BLOCKED/WAITING/TIMED_WAITING(阻塞/等待)
↓ run()结束
TERMINATED(终止)
线程控制
Thread t = new Thread(() -> {
try {
Thread.sleep(1000); // 休眠1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t.start(); // 启动
t.join(); // 等待线程结束
t.interrupt(); // 中断线程
boolean alive = t.isAlive(); // 是否存活
线程同步
synchronized关键字
Java最基本的同步机制,保证互斥访问。
public class Counter {
private int count = 0;
// 同步方法
public synchronized void increment() {
count++;
}
// 同步块
public void decrement() {
synchronized (this) {
count--;
}
}
// 静态同步方法(锁类对象)
public static synchronized void staticMethod() {
// ...
}
}
锁对象:
实例方法:锁this
静态方法:锁Class对象
同步块:锁指定对象
Lock接口
比synchronized更灵活,可中断、可超时、可尝试获取。
import java.util.concurrent.locks.*;
public class Counter {
private int count = 0;
private Lock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock(); // 必须在finally中释放
}
}
// 尝试获取锁
public boolean tryIncrement() {
if (lock.tryLock()) {
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false;
}
// 超时获取
public boolean incrementWithTimeout() throws InterruptedException {
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false;
}
}
ReadWriteLock
读写分离锁,读多写少场景性能优于普通锁。
import java.util.concurrent.locks.*;
public class Cache {
private Map<String, Object> cache = new HashMap<>();
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
private Lock readLock = rwLock.readLock();
private Lock writeLock = rwLock.writeLock();
// 读操作(多个线程可同时读)
public Object get(String key) {
readLock.lock();
try {
return cache.get(key);
} finally {
readLock.unlock();
}
}
// 写操作(独占)
public void put(String key, Object value) {
writeLock.lock();
try {
cache.put(key, value);
} finally {
writeLock.unlock();
}
}
}
线程通信
wait/notify
Object类的方法,必须在synchronized块中使用。
public class ProducerConsumer {
private Queue<Integer> queue = new LinkedList<>();
private int capacity = 10;
public synchronized void produce(int item) throws InterruptedException {
while (queue.size() == capacity) {
wait(); // 队列满,等待
}
queue.add(item);
System.out.println("Produced: " + item);
notifyAll(); // 通知消费者
}
public synchronized int consume() throws InterruptedException {
while (queue.isEmpty()) {
wait(); // 队列空,等待
}
int item = queue.poll();
System.out.println("Consumed: " + item);
notifyAll(); // 通知生产者
return item;
}
}
Condition
Lock配套的等待/通知机制,比wait/notify更灵活。
import java.util.concurrent.locks.*;
public class BoundedBuffer {
private Queue<Integer> queue = new LinkedList<>();
private int capacity = 10;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public void put(int item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 等待不满
}
queue.add(item);
notEmpty.signal(); // 通知不空
} finally {
lock.unlock();
}
}
public int take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待不空
}
int item = queue.poll();
notFull.signal(); // 通知不满
return item;
} finally {
lock.unlock();
}
}
}
线程池
线程创建销毁开销大,线程池复用线程,提高性能。
Executors工厂方法
import java.util.concurrent.*;
// 固定线程数
ExecutorService executor = Executors.newFixedThreadPool(5);
// 单线程
ExecutorService single = Executors.newSingleThreadExecutor();
// 缓存线程池(自动扩缩容)
ExecutorService cached = Executors.newCachedThreadPool();
// 定时任务
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
// 提交任务
executor.submit(() -> System.out.println("Task"));
Future<Integer> future = executor.submit(() -> 42);
// 关闭
executor.shutdown(); // 不接受新任务,等待已提交任务完成
executor.shutdownNow(); // 尝试中断所有任务
ThreadPoolExecutor
手动配置线程池参数,更灵活。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 工作队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 拒绝策略:
// - AbortPolicy:抛异常(默认)
// - CallerRunsPolicy:调用线程执行
// - DiscardPolicy:丢弃
// - DiscardOldestPolicy:丢弃最老任务
CompletableFuture(Java 8+)
异步编程增强,支持链式调用和组合。
import java.util.concurrent.CompletableFuture;
// 异步执行
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Async task");
});
// 异步计算
CompletableFuture<Integer> compute = CompletableFuture.supplyAsync(() -> {
return 42;
});
// 链式调用
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World") // 转换
.thenApply(String::toUpperCase) // 再转换
.exceptionally(ex -> "Error"); // 异常处理
// 组合多个Future
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Integer> combined = f1.thenCombine(f2, (a, b) -> a + b);
Integer sum = combined.get(); // 30
// 等待所有完成
CompletableFuture.allOf(f1, f2).join();
// 等待任一完成
CompletableFuture.anyOf(f1, f2).join();
并发集合
ConcurrentHashMap
线程安全的HashMap,分段锁实现,性能优于Hashtable。
import java.util.concurrent.ConcurrentHashMap;
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 原子操作
map.putIfAbsent("key", 1);
map.computeIfPresent("key", (k, v) -> v + 1);
map.merge("key", 1, Integer::sum); // 原子计数
// 不需要额外同步
CopyOnWriteArrayList
写时复制,适合读多写少。
import java.util.concurrent.CopyOnWriteArrayList;
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
// 写操作:复制整个数组
list.add("item"); // 加锁,复制数组,添加元素
// 读操作:无锁,快照读取
for (String item : list) { // 不会ConcurrentModificationException
// ...
}
BlockingQueue
阻塞队列,线程安全,支持阻塞操作。
import java.util.concurrent.*;
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// 生产者
new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
queue.put(i); // 队列满时阻塞
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 消费者
new Thread(() -> {
try {
while (true) {
Integer item = queue.take(); // 队列空时阻塞
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
原子类
AtomicInteger
无锁线程安全的整数,基于CAS(Compare-And-Swap)。
import java.util.atomic.AtomicInteger;
AtomicInteger count = new AtomicInteger(0);
// 原子操作
count.incrementAndGet(); // ++count
count.decrementAndGet(); // --count
count.addAndGet(5); // count += 5
count.compareAndSet(5, 10); // CAS
// 性能:无锁 > synchronized
其他原子类
AtomicLong // 长整型
AtomicBoolean // 布尔型
AtomicReference<T> // 引用类型
AtomicIntegerArray // 整数数组
AtomicReferenceArray<T> // 引用数组
LongAdder // 高并发计数(比AtomicLong更快)
常见并发问题
死锁
// ❌ 死锁示例
Object lock1 = new Object();
Object lock2 = new Object();
new Thread(() -> {
synchronized (lock1) {
Thread.sleep(100);
synchronized (lock2) { // 等待lock2
// ...
}
}
}).start();
new Thread(() -> {
synchronized (lock2) {
Thread.sleep(100);
synchronized (lock1) { // 等待lock1(死锁!)
// ...
}
}
}).start();
// ✅ 解决:固定加锁顺序
synchronized (lock1) {
synchronized (lock2) {
// 两个线程都按相同顺序加锁
}
}
线程安全问题
// ❌ 非线程安全
public class UnsafeCounter {
private int count = 0;
public void increment() {
count++; // 非原子:读-改-写
}
}
// ✅ 同步方法
public synchronized void increment() {
count++;
}
// ✅ 原子类
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
// ✅ volatile(仅可见性,不保证原子性)
private volatile boolean flag = false;
public void setFlag() {
flag = true; // 简单赋值可用volatile
}
volatile关键字
保证可见性和有序性,但不保证原子性。
public class VolatileExample {
private volatile boolean running = true;
public void run() {
while (running) { // 其他线程修改running立即可见
// ...
}
}
public void stop() {
running = false; // 修改对所有线程可见
}
}
使用场景:
状态标志
双重检查锁的单例
读多写少的变量
注意: i++ 等复合操作即使用volatile也不是线程安全的。
ThreadLocal
线程局部变量,每个线程独立副本。
public class ThreadLocalExample {
private static ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
public static void main(String[] args) {
new Thread(() -> {
threadLocal.set(10);
System.out.println("Thread 1: " + threadLocal.get()); // 10
}).start();
new Thread(() -> {
threadLocal.set(20);
System.out.println("Thread 2: " + threadLocal.get()); // 20
}).start();
}
}
使用场景:
数据库连接
用户Session
SimpleDateFormat(非线程安全,用ThreadLocal包装)
注意: 线程池环境必须手动清理,否则内存泄漏。
try {
threadLocal.set(value);
// 使用
} finally {
threadLocal.remove(); // 清理
}
最佳实践
线程池配置
// CPU密集型:线程数 = CPU核心数 + 1
int cpuCount = Runtime.getRuntime().availableProcessors();
ExecutorService cpuBound = Executors.newFixedThreadPool(cpuCount + 1);
// IO密集型:线程数 = 2 * CPU核心数
ExecutorService ioBound = Executors.newFixedThreadPool(cpuCount * 2);
// 自定义线程池(推荐)
ThreadPoolExecutor custom = new ThreadPoolExecutor(
10, 20, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactory() {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("Worker-" + count.incrementAndGet());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
并发编程原则
优先使用并发集合:ConcurrentHashMap > Hashtable
优先使用线程池:复用线程,避免频繁创建
最小化锁范围:只锁必要的代码
避免嵌套锁:易死锁
用final减少可见性问题:不变对象天然线程安全
优先无锁方案:原子类 > synchronized
读写分离:ReadWriteLock适合读多写少
CompletableFuture异步:替代手动线程管理
ThreadLocal注意清理:线程池环境避免泄漏
充分测试:并发Bug难复现
核心: 能不用多线程就不用;必须用就用高层API;手写就加倍小心。
并发工具类
CountDownLatch
等待多个线程完成。
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
// 执行任务
System.out.println("Task done");
latch.countDown(); // 计数减1
}).start();
}
latch.await(); // 等待计数归0
System.out.println("All tasks completed");
CyclicBarrier
多线程同步点,所有线程到达后一起继续。
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("All threads reached barrier");
});
for (int i = 0; i < 3; i++) {
new Thread(() -> {
System.out.println("Thread waiting at barrier");
barrier.await(); // 等待其他线程
System.out.println("Thread continues");
}).start();
}
Semaphore
信号量,控制同时访问的线程数。
Semaphore semaphore = new Semaphore(3); // 最多3个线程
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
// 访问资源
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
}
}).start();
应用: 限流、连接池、资源访问控制。