虚拟线程
1. 概念
1.1. 平台线程
我们常用的Java线程与系统内核线程是一一对应的,系统内核的线程调度程序负责调度 Java线程。为了增加应用程序的性能,我们会增加越来越多的Java线程,而由于多种因素,平台线程的数量受到了很大的约束。
- 资源有限导致系统线程总量有限,进而导致与系统线程一一对应的平台线程有限。
- 平台线程的调度依赖于系统的线程调度程序,当平台线程创建过多,会消耗大量资源用于处理线程上下文切换。
- 每个平台线程都会开辟一块私有的栈空间,大量平台线程会占据大量内存,每个平台线程需要占用约1MB左右的内存。
这些限制导致开发者不能极大量地创建平台线程,为了满足性能需要,需要引入池化技术、添加任务队列构建消费者-生产者模式等方案去让平台线程适配多变的现实场景。
1.2. 虚拟线程
虚拟线程(Virtual Thread)是JDK而不是OS实现的轻量级线程(Lightweight Process,LWP),由JVM调度。许多虚拟线程共享同一个操作系统线程,虚拟线程的数量可以远大于操作系统线程的数量。
- 可以大量创建,例如十万级别、百万级别,而不会占据大量内存
- 由
JVM进行调度和状态切换,并且与系统线程"松绑" - 用法与原来平台线程差不多,或者说尽量兼容平台线程现存的
API
JVM 调度程序通过平台线程(载体线程)来管理虚拟线程,一个平台线程可以在不同的时间执行不同的虚拟线程(多个虚拟线程挂载在一个平台线程上),当虚拟线程被阻塞或等待时,平台线程可以切换到执行另一个虚拟线程。
相比较于平台线程来说,虚拟线程是廉价且轻量级的,使用完后立即被销毁,因此它们不需要被重用或池化,每个任务可以有自己专属的虚拟线程来运行。虚拟线程暂停和恢复来实现线程之间的切换,避免了上下文切换的额外耗费,兼顾了多线程的优点,简化了高并发程序的复杂,可以有效减少编写、维护和观察高吞吐量并发应用程序的工作量。
2. 虚拟线程调度
基于操作系统线程实现的平台线程,JDK依赖于操作系统中的线程调度程序来进行调度。而对于虚拟线程,JDK有自己的调度器。JDK的调度器没有直接将虚拟线程分配给系统线程,而是将虚拟线程分配给平台线程(类似于go语言中G-M-P模型,虚拟线程和平台线程是M:N调度)。JDK 的虚拟线程调度器是一个在FIFO模式下运行的类似ForkJoinPool的线程池。调度器的并行数量取决于调度器虚拟线程的平台线程数量。默认情况下是CPU可用核心数量,但可以使用系统属性jdk.virtualThreadScheduler.parallelism进行调整。
JDK 的虚拟线程调度器是一个在 FIFO 模式下运行的类似ForkJoinPool的线程池。调度器的并行数量取决于调度器虚拟线程的平台线程数量。默认情况下是 CPU 可用核心数量,但可以使用系统属性jdk.virtualThreadScheduler.parallelism进行调整。注意,这里的ForkJoinPool与ForkJoinPool.commonPool()不同,ForkJoinPool.commonPool()用于实现并行流,并在 LIFO 模式下运行。
ForkJoinPool和ExecutorService的工作方式不同,ExecutorService有一个等待队列来存储它的任务,其中的线程将接收并处理这些任务。而ForkJoinPool的每一个线程都有一个等待队列,当一个由线程运行的任务生成另一个任务时,该任务被添加到该线程的等待队列中,当我们运行Parallel Stream,一个大任务划分成两个小任务时就会发生这种情况。
为了防止线程饥饿问题,当一个线程的等待队列中没有更多的任务时,ForkJoinPool还实现了另一种模式,称为任务窃取, 也就是说:饥饿线程可以从另一个线程的等待队列中窃取一些任务。这和 Go G-M-P 模型中 work stealing 机制有异曲同工之妙。
3. 虚拟线程执行
当虚拟线程执行I/O或JDK中的其他阻止操作(如BlockingQueue.take()时,虚拟线程会从平台线程上卸载。当阻塞操作准备完成时(例如,网络 IO 已收到字节数据),调度程序将虚拟线程挂载到平台线程上以恢复执行。
JDK中的绝大多数阻塞操作会将虚拟线程从平台线程上卸载,使平台线程能够执行其他工作任务。但是,JDK中的少数阻塞操作不会卸载虚拟线程,因此会阻塞平台线程。因为操作系统级别(例如许多文件系统操作)或 JDK级别(例如Object.wait())的限制。这些阻塞操作阻塞平台线程时,将通过暂时增加平台线程的数量来补偿其他平台线程阻塞的损失。因此,调度器的ForkJoinPool中的平台线程数量可能会暂时超过CPU可用核心数量。调度器可用的平台线程的最大数量可以使用系统属性jdk.virtualThreadScheduler.maxPoolSize进行调整。这个阻塞补偿机制与Go G-M-P模型中hand off机制有异曲同工之妙。
在以下两种情况下,虚拟线程会被固定到运行它的平台线程,在阻塞操作期间无法卸载虚拟线程:
- 当在
synchronized块或方法中执行代码时。 - 当执行
native方法或foreign function时。
虚拟线程被固定不会影响程序运行的正确性,但它可能会影响系统的并发度和吞吐量。如果虚拟线程在被固定时执行I/O或BlockingQueue.take()等阻塞操作,则负责运行它的平台线程在操作期间会被阻塞。如果虚拟线程没有被固定,那会执行I/O等阻塞操作时会从平台线程上卸载。
4. 优缺点
4.1. 优点
- 非常轻量级:可以在单个线程中创建成百上千个虚拟线程而不会导致过多的线程创建和上下文切换。
- 简化异步编程: 虚拟线程可以简化异步编程,使代码更易于理解和维护。它可以将异步代码编写得更像同步代码,避免了回调地狱(Callback Hell)。
- 减少资源开销: 由于虚拟线程是由 JVM 实现的,它能够更高效地利用底层资源,例如 CPU 和内存。虚拟线程的上下文切换比平台线程更轻量,因此能够更好地支持高并发场景。
4.2. 缺点
- 不适用于计算密集型任务: 虚拟线程适用于I/O密集型任务,但不适用于计算密集型任务,因为密集型计算始终需要CPU资源作为支持。
- 与某些第三方库不兼容: 虽然虚拟线程设计时考虑了与现有代码的兼容性,但某些依赖平台线程特性的第三方库可能不完全兼容虚拟线程。
5. 创建虚拟线程
Thread thread1 = Thread.ofVirtual().start(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
System.out.println("Virtual Thread 1 is running");
});
Thread thread2 = Thread.startVirtualThread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException ignored) {
}
System.out.println("Virtual Thread 2 is running");
});
try (var virtualThreadPerTaskExecutor = Executors.newVirtualThreadPerTaskExecutor()) {
virtualThreadPerTaskExecutor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException ignored) {
}
System.out.println("Virtual Thread 3 is running");
});
}
ThreadFactory factory = Thread.ofVirtual().factory();
Thread thread4 = factory.newThread(() -> {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException ignored) {
}
System.out.println("Virtual Thread 4 is running");
});
thread4.start();
thread1.join();
thread2.join();
thread4.join();
6. 虚拟线程卸载实验
6.1. sleep卸载
var threads = IntStream.range(0, 5).mapToObj(index -> Thread.ofVirtual().unstarted(() -> {
System.out.println(Thread.currentThread());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread());
})).toList();
threads.forEach(Thread::start);
for (Thread thread : threads) {
thread.join();
}
可以看到虚拟线程在sleep前后都切换了实际工作的平台线程
VirtualThread[<span class="tag">#32</span>]/runnable@ForkJoinPool-1-worker-1
VirtualThread[<span class="tag">#34</span>]/runnable@ForkJoinPool-1-worker-3
VirtualThread[<span class="tag">#33</span>]/runnable@ForkJoinPool-1-worker-2
VirtualThread[<span class="tag">#35</span>]/runnable@ForkJoinPool-1-worker-4
VirtualThread[<span class="tag">#36</span>]/runnable@ForkJoinPool-1-worker-5
VirtualThread[<span class="tag">#34</span>]/runnable@ForkJoinPool-1-worker-4
VirtualThread[<span class="tag">#35</span>]/runnable@ForkJoinPool-1-worker-8
VirtualThread[<span class="tag">#32</span>]/runnable@ForkJoinPool-1-worker-1
VirtualThread[<span class="tag">#36</span>]/runnable@ForkJoinPool-1-worker-7
VirtualThread[<span class="tag">#33</span>]/runnable@ForkJoinPool-1-worker-7
虚拟线程 sleep 时真正调用的方法是 Continuation.yield,会将当前虚拟线程的堆栈由平台线程的堆栈转移到Java堆内存,然后将其他就绪虚拟线程的堆栈由Java堆中拷贝到当前平台线程的堆栈中继续执行。
[!note]
执行 IO 或BlockingQueue.take()等阻塞操作时会跟 sleep 一样导致虚拟线程切换。虚拟线程的切换也是一个相对耗时的操作,但是与平台线程的上下文切换相比,还是轻量很多的
在synchronized代码块中运行的虚拟线程不会发生卸载
var threads = IntStream.range(0, 5).mapToObj(index -> Thread.ofVirtual().unstarted(() -> {
synchronized (JDK21FeatureTest.class) {
System.out.println(Thread.currentThread());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread());
}
})).toList();
threads.forEach(Thread::start);
for (Thread thread : threads) {
thread.join();
}
执行结果
VirtualThread[<span class="tag">#35</span>]/runnable@ForkJoinPool-1-worker-3
VirtualThread[<span class="tag">#35</span>]/runnable@ForkJoinPool-1-worker-3
VirtualThread[<span class="tag">#37</span>]/runnable@ForkJoinPool-1-worker-5
VirtualThread[<span class="tag">#37</span>]/runnable@ForkJoinPool-1-worker-5
VirtualThread[<span class="tag">#36</span>]/runnable@ForkJoinPool-1-worker-4
VirtualThread[<span class="tag">#36</span>]/runnable@ForkJoinPool-1-worker-4
VirtualThread[<span class="tag">#34</span>]/runnable@ForkJoinPool-1-worker-2
VirtualThread[<span class="tag">#34</span>]/runnable@ForkJoinPool-1-worker-2
VirtualThread[<span class="tag">#33</span>]/runnable@ForkJoinPool-1-worker-1
VirtualThread[<span class="tag">#33</span>]/runnable@ForkJoinPool-1-worker-1
synchronized代码块借助内置锁(监视器锁)来保证线程安全。当一个线程进入synchronized代码块时,它会获取对应的锁,要是锁被其他线程占用,该线程就会被阻塞。这种阻塞是由操作系统内核来处理的,会使线程进入等待状态,直到锁被释放。可以<font color="#ff0000">使用ReentrantLock替代。
6.2. IO卸载
var threads = IntStream.range(0, 5).mapToObj(index -> Thread.ofVirtual().unstarted(() -> {
System.out.println(Thread.currentThread());
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://localhost:5000/account"))
.GET()
.build();
MTLSHttpClientTest.getHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
} catch (Exception ignored) {
}
System.out.println(Thread.currentThread());
})).toList();
threads.forEach(Thread::start);
for (Thread thread : threads) {
thread.join();
}
执行结果
VirtualThread[<span class="tag">#35</span>]/runnable@ForkJoinPool-1-worker-2
VirtualThread[<span class="tag">#34</span>]/runnable@ForkJoinPool-1-worker-1
VirtualThread[<span class="tag">#36</span>]/runnable@ForkJoinPool-1-worker-3
VirtualThread[<span class="tag">#37</span>]/runnable@ForkJoinPool-1-worker-4
VirtualThread[<span class="tag">#38</span>]/runnable@ForkJoinPool-1-worker-4
VirtualThread[<span class="tag">#37</span>]/runnable@ForkJoinPool-1-worker-2
VirtualThread[<span class="tag">#34</span>]/runnable@ForkJoinPool-1-worker-3
VirtualThread[<span class="tag">#38</span>]/runnable@ForkJoinPool-1-worker-2
VirtualThread[<span class="tag">#36</span>]/runnable@ForkJoinPool-1-worker-10
VirtualThread[<span class="tag">#35</span>]/runnable@ForkJoinPool-1-worker-11
7. 性能测试
7.1. IO密集型
应用程序符合下面两点特征,使用虚拟线程可以显著提高程序吞吐量:
- 程序并发任务数量很高。
- IO密集型、工作负载不受 CPU 约束。
虚拟线程有助于提高服务端应用程序的吞吐量,因为此类应用程序大部分都是CRUD业务,且有大量并发,而且这些任务通常会有大量的IO等待。
@Test
public void testPerformance() {
AtomicInteger threadNum = new AtomicInteger(0);
// 开启线程 统计平台线程数
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfo = threadBean.dumpAllThreads(false, false);
if (threadInfo.length > threadNum.get()) {
threadNum.set(threadInfo.length);
}
}, 10, 10, TimeUnit.MILLISECONDS);
long start = System.currentTimeMillis();
// 限制虚拟线程使用的平台线程数量
// System.setProperty("jdk.virtualThreadScheduler.parallelism", "4");
// 虚拟线程
// ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
// 使用平台线程
ExecutorService executor = Executors.newFixedThreadPool(1000);
for (int i = 0; i < 10000; i++) {
executor.submit(() -> {
try {
// 线程睡眠 0.5 s,模拟业务处理
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ignored) {
}
});
}
executor.close();
System.out.printf("totalMillis:%dms\tmax platform thread/os thread num: %d\n", System.currentTimeMillis() - start, threadNum.get());
}
虚拟线程测试结果
- 不同平台线程数量对结果影响不大
# 限制虚拟线程使用的平台线程数量
# System.setProperty("jdk.virtualThreadScheduler.parallelism", "4");
# ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
totalMillis:850ms max platform thread/os thread num: 19
totalMillis:980ms max platform thread/os thread num: 27
平台线程测试结果
totalMillis:25362ms max platform thread/os thread num: 214
totalMillis:5247ms max platform thread/os thread num: 1014
totalMillis:2895ms max platform thread/os thread num: 2014
totalMillis:8038ms max platform thread/os thread num: 9895
7.2. 计算密集型
将上述代码替换成cpu密集型的指数运算,可以发现计算密集型任务使用和CPU核心数2倍左右的平台线程数会有更好的效果。如果我们限制虚拟线程使用的平台线程数量,会看到任务处理时间大大增加,说明计算密集型任务还是依赖底层平台线程。
如果平台线程数增加,可能会加剧线程上下文切换,反而适得其反。
for (int i = 0; i < 10000; i++) {
executor.submit(() -> {
Random random = new Random();
for (int j = 0; j < 100000; j++) {
// 模拟一些工作
Math.pow(random.nextInt(100), random.nextInt(10));
}
});
}
虚拟线程测试结果
- 不同平台线程数量对测试结果影响很大
# 限制虚拟线程使用的平台线程数量
# System.setProperty("jdk.virtualThreadScheduler.parallelism", "4");
# ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
totalMillis:23099ms max platform thread/os thread num: 19
totalMillis:3657ms max platform thread/os thread num: 26
平台线程测试结果
# ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
totalMillis:3703ms max platform thread/os thread num: 26
# ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
totalMillis:3147ms max platform thread/os thread num: 38
totalMillis:3138ms max platform thread/os thread num: 1014
totalMillis:9340ms max platform thread/os thread num: 9404
8. ref
https://zhuanlan.zhihu.com/p/669327999
https://www.zhihu.com/question/536743167
https://javaguide.cn/java/new-features/java19.html