FunTester Java 并发编程基础(下)

FunTester · 2024年02月20日 · 2169 次阅读

书接上回,继续分享 Java 并发编程基础内容。

Deadlock、Livelock 和 Thread Starvation

Deadlock

Deadlock 是两个或多个线程无法继续执行的情况,因为它们都在等待其他线程释放资源或锁。这会导致任何线程都无法取得进展的停滞状态。死锁通常是由于不正确的同步或针对导致阻塞的资源的资源分配而引起的。请看一个涉及两个线程和两个锁的死锁场景的示例:


private static final Object lock1 = new Object()  

private static final Object lock2 = new Object()  

static void main(String[] args) {  
    Thread thread1 = new Thread(() -> {  
        synchronized (lock1) {  
            println("线程1持有锁1")  
            try {  
                Thread.sleep(100)  
            } catch (InterruptedException e) {  
            }  
            println("线程1等待获取锁2")  
            synchronized (lock2) {  
                println("线程池持有锁2")  
            }  
        }  
    })  

    Thread thread2 = new Thread(() -> {  
        synchronized (lock2) {  
            println("线程2持有锁2")  
            try {  
                Thread.sleep(100)  
            } catch (InterruptedException e) {  
            }  
            println("线程2等待获取锁1")  
            synchronized (lock1) {  
                println("线程2持有锁1")  
            }  
        }  
    })  
    thread1.start()  
    thread2.start()  
}

在这个例子中:

  • thread1获取lock1然后等待lock2
  • thread2获取lock2然后等待lock1

两个线程现在都在等待对方持有的资源,从而导致死锁。该程序将无限期挂起。如果我们此时打印现成转储则可以看到两个线程的状态:

"Thread-1" #28 [25091] prio=5 os_prio=31 cpu=13.87ms elapsed=24.40s tid=0x00007fceea86d000 nid=25091 waiting for monitor entry  [0x0000700010268000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at com.funtest.temp.ImmutablePerson$_main_closure2.doCall(ImmutablePerson.groovy:35)
    - waiting to lock <0x000000070e355c90> (a java.lang.Object)
    - locked <0x000000070e32b478> (a java.lang.Object)

克服僵局

可以通过多种技术来避免或解决死锁:

  1. 使用超时:设置获取锁的超时时间。如果线程无法在指定时间内获取锁,它可以释放其持有的任何锁并重试或中止。ReentrantLock使用该包可以轻松实现此功能java.util.concurrent.locks
  2. 锁顺序:建立跨所有线程获取锁的一致顺序,以防止循环等待,如下例所示。
  3. 资源分配图:使用资源分配图等算法来检测死锁并从死锁中恢复。
  4. 避免死锁的设计:设计多线程代码以最大限度地减少死锁的可能性,例如使用类等更高级别的抽象java.util.concurrent

private static final Lock lock1 = new ReentrantLock()
private static final Lock lock2 = new ReentrantLock()

static void main(String[] args) {
    Runnable acquireLocks = () -> {
        lock1.lock()
        try {
            println(Thread.currentThread().getName() + ": 持有锁1")
            try {
                Thread.sleep(100)
            } catch (InterruptedException e) {
            }
            println(Thread.currentThread().getName() + ": 等待获取锁2")

            boolean acquiredLock2 = lock2.tryLock(500, TimeUnit.MILLISECONDS)
            if (acquiredLock2) {
                try {
                    println(Thread.currentThread().getName() + ": 获取锁2")
                } finally {
                    lock2.unlock()
                }
            } else {
                println(Thread.currentThread().getName() + ": 获取锁2超时")
            }
        } finally {
            lock1.unlock()
        }
    }
    Thread thread1 = new Thread(acquireLocks)
    Thread thread2 = new Thread(acquireLocks)
    thread1.start()
    thread2.start()
}

Livelock

Livelock 是多线程编程中一种特殊形式的死锁,其中线程之间通过不断地响应彼此而无法继续执行的情况。与传统死锁不同,Livelock 中的线程并不被阻塞,而是不断尝试执行,但总是在彼此之间交替失败。

典型的 Livelock 场景包括两个或多个线程试图通过调整自己的状态来避免冲突,但却在不断地互相干扰中无法取得进展。例如,两个人试图在狭窄的通道中让对方通过,但却不断地向相同的一侧让路,导致两人都无法通过。在软件系统中,Livelock 可能会发生在处理并发请求时,其中多个线程试图避免争用资源,但由于彼此之间的反应而无法顺利进行。

解决 Livelock 的方法通常涉及引入随机性或者超时机制,以打破线程之间的循环竞争。例如,通过引入随机的等待时间或者限制重试次数来破坏线程之间的死循环,使它们最终能够恢复正常执行。预防 Livelock 的关键在于设计良好的并发控制机制,避免线程在竞争资源时陷入无法解决的循环状态。

public class LivelockExample {  
    // 勺子类  
    static class Spoon {  
        private Diner owner;  

        public Spoon(Diner owner) {  
            this.owner = owner;  
        }  

        // 使用勺子  
        public synchronized void use() {  
            System.out.println(owner.getName() + "使用了勺子");  
        }  

        // 设置勺子的所有者  
        public synchronized void setOwner(Diner owner) {  
            this.owner = owner;  
        }  
    }  

    // 就餐者类  
    static class Diner {  
        private String name;  
        private boolean isHungry;  

        public Diner(String name) {  
            this.name = name;  
            this.isHungry = true;  
        }  

        public String getName() {  
            return name;  
        }  

        public boolean isHungry() {  
            return isHungry;  
        }  

        // 与配偶一起进餐  
        public void eatWith(Spoon spoon, Diner spouse) {  
            while (isHungry) {  
                // 如果勺子不属于当前就餐者,继续等待  
                if (spoon.owner != this) {  
                    continue;  
                }  

                // 如果配偶也在等待就餐,让配偶先吃  
                if (spouse.isHungry()) {  
                    System.out.println(getName() + ": 亲爱的 " + spouse.getName() + ",你先吃吧。");  
                    spoon.setOwner(spouse);  
                    continue;  
                }  

                // 否则就餐者使用勺子,标记自己不再饥饿,并将勺子所有权交给配偶  
                spoon.use();  
                isHungry = false;  
                System.out.println(getName() + ": 我吃完了,你可以吃了 " + spouse.getName() + ".");  
                spoon.setOwner(spouse);  
            }  
        }  
    }  

    public static void main(String[] args) {  
        final Diner husband = new Diner("丈夫");  
        final Diner wife = new Diner("妻子");  

        final Spoon sharedSpoon = new Spoon(husband);  

        // 创建丈夫线程,与妻子一起进餐  
        Thread husbandThread = new Thread(() -> husband.eatWith(sharedSpoon, wife));  
        husbandThread.start();  

        // 创建妻子线程,与丈夫一起进餐  
        Thread wifeThread = new Thread(() -> wife.eatWith(sharedSpoon, husband));  
        wifeThread.start();  
    }  
}

Thread Starvation

线程饥饿(Thread Starvation)是指某些线程由于无法获得所需的资源而被持续地阻塞的情况。这种情况通常发生在多线程系统中,其中一些线程可能会因为长时间等待资源而无法执行,而其他线程则可以持续地访问这些资源。线程饥饿可能导致性能下降和系统响应时间延迟,甚至可能导致系统崩溃。常见的线程饥饿场景包括资源竞争激烈、优先级反转等。解决线程饥饿问题的方法包括使用公平的资源分配机制、优化资源的使用方式以及合理设置线程的优先级等。

java.util.concurrent 包

java.util.concurrent包提供了大量支持并发和多线程编程的类和接口。这些类提供了管理线程、同步和并发数据结构的高级抽象,使编写高效且线程安全的代码变得更加容易。以下是一些最流行的类和接口的概述。

Executor 和 ExecutorService

Executor是一个接口,表示能够异步执行任务的对象。它将任务提交与任务执行解耦。ExecutorService是它的子接口Executor,它通过提供管理执行器生命周期和控制任务执行的方法来扩展功能。换句话说,ExecutorService是线程池的核心接口。

ExecutorService实现类提供了多种同时管理和执行任务的方法,每种方法都有自己的优点和用例。您可以在下表中找到最常用的。根据您的具体要求选择适当的实现,但请记住,在调整线程池大小时,根据运行代码的计算机具有的逻辑核心数量来确定线程池的大小通常很有用。您可以通过调用获得该值Runtime.getRuntime().availableProcessors()

执行器服务实现 描述
ThreadPoolExecutor 一种多功能且可定制的执行器服务,允许您创建具有指定核心和最大线程数、自定义线程工厂等的线程池。
ScheduledThreadPoolExecutor 扩展ThreadPoolExecutor以提供在特定时间或间隔执行任务的调度功能。
ForkJoinPool 专门ExecutorService为并行执行而设计,特别适合使用 Fork-Join 框架的递归任务和算法。
WorkStealingPool 其实现ForkJoinPool使用工作窃取算法在工作线程之间有效分配任务。
SingleThreadExecutor 创建具有单个工作线程的执行程序服务,适合一次顺序执行一个任务。
FixedThreadPool 固定大小的线程池执行器,管理预定数量的工作线程,非常适合固定工作负载。
CachedThreadPool 线程池执行器,可以根据任务需求自适应调整线程数量,适合短生命周期和突发性任务。
SingleThreadScheduledExecutor 创建一个单线程调度执行器,它允许调度任务在特定时间或以固定速率间隔执行。
FixedScheduledThreadPool 具有调度能力的固定大小线程池,将固定大小线程池的特性与任务调度相结合。

此外,java.util.concurrent还提供了Executors包含静态工厂方法的类,用于轻松创建上述线程池类型等。

可用的任务类型如下表所示。

Runnable Tasks 可运行任务是简单的、无返回的任务,它们实现Runnable接口并执行操作而不产生结果。
Runnable Tasks 可调用任务与可运行任务类似,但可以返回结果或引发异常。他们实现了Callable<V>接口。
Runnable Tasks 异步任务通常由Future<V>接口表示,并且可以独立于调用线程运行。FutureTask是它的具体实现Future,允许您包装CallableorRunnable并将其与执行器一起使用。

ExecutorService#submit使用、ExecutorService#invokeAll或将任务提交给执行器服务ExecutorService#invokeAny

ExecutorService返回实例的大多数方法Future<V>Future是一个表示异步计算结果的接口。它公开了检查计算是否完成或阻塞直到结果可用的方法。下面是一个例子。

static void main(String[] args) {  
    // 创建一个固定大小为2的线程池  
    ExecutorService executorService = Executors.newFixedThreadPool(2)  
    // Runnable接口的run()方法没有返回值,所以使用Runnable接口的任务不会返回结果  
    Runnable runnableTask = () -> {  
        String threadName = Thread.currentThread().getName()  
        println("执行线程 " + threadName)  
    }  
    // Callable接口的call()方法返回一个结果,所以使用Callable接口的任务会返回结果  
    List<Callable<String>> callableTasks = List.of(  
            () -> {  
                String threadName = Thread.currentThread().getName()  
                return "任务执行线程:   " + threadName  
            },  
            () -> {  
                String threadName = Thread.currentThread().getName()  
                return "任务执行线程:   " + threadName  
            }  
    )  
    // 提交Runnable任务给线程池执行  
    executorService.submit(runnableTask)  
    try {  
        // 提交Callable任务给线程池执行,invokeAll()方法等待所有任务完成  
        List<Future<String>> futures = executorService.invokeAll(callableTasks)  
        // 获取所有任务的结果  
        for (Future<String> future : futures) {  
            println(future.get())  
        }  
        // 使用invokeAny提交一组Callable任务并等待第一个完成的任务  
        String firstResult = executorService.invokeAny(callableTasks)  
        println("第一个任务完成结果: " + firstResult)  
    } catch (Exception e) {  
        e.printStackTrace()  
    }  
    // 关闭线程池  
    executorService.shutdown()  
}

控制台输出:

执行线程 pool-1-thread-1
任务执行线程:   pool-1-thread-2
任务执行线程:   pool-1-thread-2
第一个任务完成结果: 任务执行线程:   pool-1-thread-2

Semaphore

Semaphore(信号量)是一种用于管理并发访问资源的同步机制,常用于多线程编程中。Semaphore 维护了一个内部的计数器,该计数器表示可用的许可证数量。线程在访问共享资源之前必须先获取许可证,Semaphore 控制着许可证的发放和归还。

Semaphore 通常用于限制同时访问某个资源的线程数量,或者在资源有限的情况下控制并发访问的数量。通过在初始化 Semaphore 时指定初始的许可证数量,可以限制并发访问的数量。当线程需要访问资源时,首先尝试从 Semaphore 获取许可证,如果许可证可用,则允许线程访问资源,并将许可证数量减少;如果许可证不可用,则线程将被阻塞,直到有其他线程释放了许可证。

Semaphore 提供了两个主要的方法:acquire()release()acquire()方法用于获取许可证,如果许可证不可用则线程将被阻塞,直到许可证可用;release()方法用于释放许可证,使得其他等待许可证的线程可以获取许可证并访问资源。

Semaphore 的应用场景包括但不限于:限制数据库连接池中的并发连接数、控制线程池中同时执行的任务数量、限制文件资源的并发访问数量等。在这些场景下,Semaphore 可以帮助管理并发访问,避免资源竞争和过度使用资源,从而提高系统的性能和稳定性。

Semaphore 是一种强大的同步工具,能够有效地管理并发访问资源,控制线程的数量,并提供了灵活的并发编程解决方案。

CountDownLatch

CountDownLatch(倒计时门栓)是一种多线程同步工具,用于控制一个或多个线程等待其他线程完成操作后再执行。CountDownLatch 维护了一个计数器,该计数器在初始化时被设置为一个正整数,表示需要等待完成的操作数量。每当一个线程完成了一个操作,它会调用 CountDownLatch 的countDown()方法来将计数器减一。其他线程可以通过调用await()方法来等待计数器的值变为零,一旦计数器的值为零,所有等待的线程将被释放,并可以继续执行。

CountDownLatch 的主要应用场景包括但不限于:在主线程等待所有子线程完成任务后再继续执行、实现并发测试中的线程同步、等待多个服务初始化完成后再启动应用程序等。例如,在并发测试中,可以使用 CountDownLatch 来确保所有测试线程都完成了测试任务后再进行结果汇总和分析。

CountDownLatch 提供了灵活的并发编程解决方案,能够帮助开发人员处理复杂的线程同步问题。通过合理设置计数器的初始值和调用countDown()方法的时机,可以实现精确控制线程的等待和执行顺序。然而,需要注意的是,一旦计数器的值被设置为零,就无法重置,因此 CountDownLatch 只能被使用一次。

CountDownLatch 是一种简单而强大的多线程同步工具,能够帮助开发人员实现线程之间的协调和同步,提高系统的并发性能和可靠性。

CyclicBarrier

CyclicBarrier(循环栅栏)是一种多线程同步工具,用于在多个线程之间创建一个屏障,只有当所有参与线程都到达了屏障位置时,才允许它们继续执行。与 CountDownLatch 不同的是,CyclicBarrier 的计数器可以被重置并循环使用。

CyclicBarrier 维护了一个计数器和一个屏障动作,计数器初始化时设置为要等待的线程数量。每个线程在到达屏障位置时,会调用 CyclicBarrier 的await()方法来等待其他线程。当所有线程都到达了屏障位置时,CyclicBarrier 会触发屏障动作,所有线程同时开始执行下一阶段的任务。一旦所有线程都离开了屏障,计数器会被重置,可以继续使用。

CyclicBarrier 通常用于将任务分解为多个子任务,各个线程独立执行子任务,最终等待所有子任务完成后再汇总结果。它也常用于并行计算中的分阶段计算和任务流水线等场景。例如,在并行排序算法中,可以将数据分成多个区块,每个线程对一个区块进行排序,然后使用 CyclicBarrier 等待所有线程完成排序后再进行合并排序。

CyclicBarrier 提供了一种简单而强大的线程同步机制,能够帮助开发人员实现复杂的并发任务协调和同步。通过合理设置计数器的初始值和定义屏障动作,可以实现灵活的多线程协作方案。然而,需要注意的是,CyclicBarrier 的计数器只能被重置一次,因此在重复使用时需要格外小心。

CyclicBarrier 是一种重要的多线程同步工具,能够提高系统的并发性能和可靠性,为开发人员提供了便捷而高效的并发编程解决方案。

并发集合

这些并发集合类为各种用例提供​​线程安全的数据结构,允许多个线程并发访问和修改数据,同时确保数据一致性并最大限度地减少争用。选择使用哪个类取决于并发应用程序的特定需求。

并发集合类 描述
ConcurrentHashMap 接口的高度并发、线程安全实现Map,专为多线程环境中的高效读写操作而设计。
ConcurrentSkipListMap 基于跳跃列表数据结构的并发排序映射,提供并发访问和排序顺序。
BlockingQueue(LinkedBlockingQueueDelayQueuePriorityBlockingQueueSynchronousQueue) 阻塞队列是线程安全的、有界或无界的队列,支持生产者 - 消费者场景的阻塞操作。InDelayQueue元素根据其延迟被删除,inPriorityBlockingQueue基于 aComparator和 inSynchronousQueue元素仅在新元素到达时才被删除。
ConcurrentLinkedQueue 基于链式节点结构的线程安全、非阻塞、无界队列,适用于高并发的生产者消费者场景。
ConcurrentLinkedDeque 线程安全、非阻塞、双端队列,支持两端并发访问和修改。
CopyOnWriteArrayList 每当进行修改时都会创建其内部数组的新副本的列表,以确保读取繁重的工作负载的线程安全。
CopyOnWriteArraySet 由 a 支持的线程安全集CopyOnWriteArrayList,为读取繁重的集提供线程安全性。
ConcurrentSkipListSet 基于跳跃列表数据结构的并发排序集,提供并发访问和排序顺序。

原子学

java.util.concurrent.atomic包提供了支持对单个变量进行原子操作的类。这些类设计用于多线程应用程序,以确保对共享变量的操作以原子方式执行,而不需要显式同步。这有助于避免数据竞争并确保线程安全。

常见原子类:

  1. AtomicInteger:可以原子递增、递减或更新的整数值。
  2. AtomicLong:支持原子操作的长值。
  3. AtomicBoolean:具有用于设置和获取的原子操作的布尔值。
  4. AtomicReference:支持原子更新的通用引用类型。
  5. AtomicStampedReference:它的一个变体AtomicReference包括用于检测更改的版本标记。
  6. AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray:原子值数组。

它们适用于需要对变量执行增量、比较和设置和更新等操作,而又不会因并发访问而导致数据损坏的情况。

synchronize 块相比,锁提供了更灵活、更先进的锁定机制,包括可重入、公平性和读写锁定等功能。该java.util.concurrent.locks包包含两个接口,Lock以及ReadWriteLock它们的实现类ReentrantLockReentrantReadWriteLock分别。

ReentrantLock是一种可重入互斥锁,其基本行为与synchronized块相同,但具有附加功能。它可用于控制对共享资源的访问,并提供更多的灵活性和对锁定的控制,例如获取有关锁定状态的信息、非阻塞tryLock()和可中断锁定。在此示例中,我们使用 aReentrantLock来保护代码的关键部分。

private static ReentrantLock lock = new ReentrantLock()  

static void main(String[] args) {  
    Runnable task = () -> {  
        lock.lock()// 获取锁  
        try {  
            System.out.println("Thread " + Thread.currentThread().getName() + " 获取到了锁")  
            // 模拟业务处理  
            Thread.sleep(1000)  
        } catch (InterruptedException e) {  
            Thread.currentThread().interrupt()  
        } finally {  
            lock.unlock() // 释放锁  
            System.out.println("Thread " + Thread.currentThread().getName() + " 释放了锁")  
        }  
    }  
    // 创建多个线程来访问临界区  
    for (i in 0..<2) {  
        new Thread(task).start()  
    }  
}

ReentrantReadWriteLock为读和写提供单独的锁。它用于允许多个线程同时读取共享资源,同时确保一次只有一个线程可以写入该资源。

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册