Vanson's Eternal Blog

Java之并发场景

Java basic.png
Published on
/8 mins read/---

Concurrency

解决方案典型场景关键优势注意事项
synchronized单 JVM 简单同步使用简单,JVM 内置支持锁粒度控制不当导致性能问题
ReentrantLock需要锁高级特性(可中断/公平/超时)可中断、超时、公平锁必须手动释放锁
读写锁读多写少的共享数据提升读并发性能写锁饥饿问题
原子类计数器 / 状态标志无锁高性能不适用于复杂操作
阻塞队列生产者-消费者模型解耦生产消费速度队列大小设置需谨慎
CountDownLatch主线程等待子任务完成灵活的任务协调一次性使用
CompletableFuture异步任务编排强大的链式调用回调地狱风险
不可变对象配置信息等只读数据天然线程安全需要创建新对象
ThreadLocal线程封闭(如日期格式化)避免同步开销内存泄漏风险
Redis 分布式锁跨 JVM 资源控制(如库存扣减)解决分布式协调问题需处理锁续期、故障转移
并行流CPU 密集型数据处理自动利用多核不适合 IO 密集型任务
  1. 优先考虑无锁方案(原子类、不可变对象)。
  2. 读写分离场景使用读写锁。
  3. 任务协调使用 CountDownLatch / CyclicBarrier
  4. 分布式环境必须用外部协调服务(Redis / ZooKeeper)。
  5. 复杂异步流程使用 CompletableFuture
  6. 避免过早优化,先保证正确性再提升性能。

基础同步机制

synchronized

基于 JVM 内置锁(监视器锁),保证同一时刻只有一个线程访问临界区。

• 并发不高、临界区短小、逻辑简单;不想额外引入 Lock 对象。 原理 • JVM 对象头 Mark Word + 监视器(Monitor);JDK6 之后有偏向/轻量/重量三级优化。 代码

public class Counter {
    private int count = 0;
 
    // 同步方法
    public synchronized void increment() {
        count++;
    }
 
    // 同步代码块(更细粒度控制)
    public void decrement() {
        synchronized(this) {
            count--;
        }
    }
}
 

volatile

确保变量修改的可见性(禁止指令重排序),不保证原子性。

public class TaskRunner {
    private volatile boolean isRunning = true; // 保证可见性
 
    public void stop() {
        isRunning = false; // 一个线程修改
    }
 
    public void run() {
        while(isRunning) { // 多个线程读取
            // 执行任务
        }
    }
}
 

JUC工具包

原子类

无锁的计数器/状态更新

• 高频、竞争不激烈的计数器 / 累加器(接口调用次数、日志序号等)。 原理 • 基于 CAS(Compare-And-Swap)实现无锁线程安全,一条 CPU 指令完成“读-改-写”。 代码

public class AtomicCounter {
    private final AtomicInteger count = new AtomicInteger(0);
 
    // 线程安全的自增
    public void increment() {
        count.incrementAndGet(); // CAS操作
    }
 
    // 高并发场景更优的计数器(Java 8+)
    public void highConcurrencyIncrement() {
        LongAdder adder = new LongAdder();
        adder.increment(); // 分段CAS,减少竞争
    }
}

可重入锁

ReentrantLock:需要尝试获取锁、超时机制或公平锁的场景

• 需要“可中断、可超时、公平性、多条件队列”等高级功能; • 替代 synchronized 在竞争激烈时获得更好吞吐量。

public class Account {
    private final Lock lock = new ReentrantLock();
    private int balance = 1000;
 
    public boolean transfer(int amount) {
        if (lock.tryLock(1, TimeUnit.SECONDS)) { // 尝试获取锁
            try {
                if (balance >= amount) {
                    balance -= amount;
                    return true;
                }
            } finally {
                lock.unlock(); // 必须手动释放
            }
        }
        return false;
    }
}
 

读写锁

ReentrantReadWriteLock 读多写少的共享数据(如缓存系统)

public class DataCache {
    private final Map<String, String> cache = new HashMap<>();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
    public String get(String key) {
        rwLock.readLock().lock(); // 获取读锁(可并发)
        try {
            return cache.get(key);
        } finally {
            rwLock.readLock().unlock();
        }
    }
 
    public void put(String key, String value) {
        rwLock.writeLock().lock(); // 获取写锁(独占)
        try {
            cache.put(key, value);
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}
 

阻塞队列

BlockingQueue

生产者-消费者模型

public class LogProcessor {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
 
    // 生产者
    public void produceLog(String log) throws InterruptedException {
        queue.put(log); // 队列满时阻塞
    }
 
    // 消费者
    public void consumeLogs() {
        new Thread(() -> {
            while (true) {
                String log = queue.take(); // 队列空时阻塞
                processLog(log);
            }
        }).start();
    }
 
    private void processLog(String log) { /* 处理日志 */ }
}
 

CountDownLatch

场景:主线程等待多个子任务全部完成后再汇总。

public class StartupManager {
    public void startServices() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
 
        // 启动三个服务
        new Thread(() -> { startDB(); latch.countDown(); }).start();
        new Thread(() -> { startCache(); latch.countDown(); }).start();
        new Thread(() -> { startAPI(); latch.countDown(); }).start();
 
        latch.await(); // 等待所有服务启动
        System.out.println("所有服务已就绪!");
    }
}

CompletableFuture

异步任务编排

public class OrderService {
    public CompletableFuture<Order> processOrderAsync(Order order) {
        return CompletableFuture.supplyAsync(() -> validate(order))
                .thenApplyAsync(this::calculatePrice)
                .thenApplyAsync(this::applyDiscount)
                .thenComposeAsync(this::saveToDatabase);
    }
 
    // 示例:组合多个异步任务
    public CompletableFuture<Void> executeParallelTasks() {
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Task1");
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "Task2");
 
        return CompletableFuture.allOf(task1, task2)
                .thenRun(() -> System.out.println("所有任务完成"));
    }
}
 

无锁编程与特殊场景

不可变对象

场景:共享数据只读不写(如配置信息)

// 所有字段final + 无setter
public final class ImmutableConfig {
    private final String serverUrl;
    private final int timeout;
 
    public ImmutableConfig(String url, int timeout) {
        this.serverUrl = url;
        this.timeout = timeout;
    }
 
    // 只有getter方法
    public String getServerUrl() { return serverUrl; }
    public int getTimeout() { return timeout; }
}
 

ThreadLocal

场景:线程封闭(如 SimpleDateFormat 安全使用)

public class DateFormatter {
    // 每个线程独立副本
    private static final ThreadLocal<SimpleDateFormat> formatter =
        ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
 
    public String format(Date date) {
        return formatter.get().format(date); // 线程安全
    }
}

分布式并发控制

Redis 分布式锁(Redisson 实现)

场景:跨 JVM 的共享资源控制(如库存扣减)

public class InventoryService {
    private final RedissonClient redisson;
 
    public boolean deductStock(String itemId, int quantity) {
        RLock lock = redisson.getLock("LOCK_STOCK:" + itemId);
        try {
            // 尝试加锁(等待10秒,锁有效期30秒)
            if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
                int stock = getStockFromDB(itemId);
                if (stock >= quantity) {
                    updateStock(itemId, stock - quantity);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock(); // 确保释放锁
        }
    }
}