Mushroom Notes Mushroom Notes
🍄首页
  • JavaSE

    • 基础篇
    • 数据结构
    • IO流
    • Stream流
    • 函数式接口
    • JUC
    • 反射
    • 网络编程
    • 设计模式
  • JavaEE

    • Servlet
    • JDBC
    • 会话技术
    • 过滤器监听器
    • 三层架构
  • JDK

    • 总览
  • JVM

    • 总览
  • 常用mate
  • CSS
  • JavaScript
  • rds 数据库

    • MySQL
    • MySQL 进阶
    • MySQL 库表规范
  • nosql 数据库

    • Redis
    • Redis 进阶
    • Redis 底层
    • MongoDB
  • Spring生态

    • Spring
    • Spring MVC
    • Spring boot
    • Spring Validation
  • Spring Cloud生态

    • Spring Cloud
    • 服务治理
    • 远程调用
    • 网关路由
    • 服务保护
    • 分布式事务
    • 消息中间件
  • 数据库

    • Mybatis
    • Mybatis Plus
    • Elasticsearch
    • Redisson
  • 通信

    • Netty
📚技术
  • 方案专题
  • 算法专题
  • BUG专题
  • 安装专题
  • 网安专题
  • 面试专题
  • 常用网站
  • 后端常用
  • 前端常用
  • 分类
  • 标签
  • 归档

kinoko

一位兴趣使然的热心码农
🍄首页
  • JavaSE

    • 基础篇
    • 数据结构
    • IO流
    • Stream流
    • 函数式接口
    • JUC
    • 反射
    • 网络编程
    • 设计模式
  • JavaEE

    • Servlet
    • JDBC
    • 会话技术
    • 过滤器监听器
    • 三层架构
  • JDK

    • 总览
  • JVM

    • 总览
  • 常用mate
  • CSS
  • JavaScript
  • rds 数据库

    • MySQL
    • MySQL 进阶
    • MySQL 库表规范
  • nosql 数据库

    • Redis
    • Redis 进阶
    • Redis 底层
    • MongoDB
  • Spring生态

    • Spring
    • Spring MVC
    • Spring boot
    • Spring Validation
  • Spring Cloud生态

    • Spring Cloud
    • 服务治理
    • 远程调用
    • 网关路由
    • 服务保护
    • 分布式事务
    • 消息中间件
  • 数据库

    • Mybatis
    • Mybatis Plus
    • Elasticsearch
    • Redisson
  • 通信

    • Netty
📚技术
  • 方案专题
  • 算法专题
  • BUG专题
  • 安装专题
  • 网安专题
  • 面试专题
  • 常用网站
  • 后端常用
  • 前端常用
  • 分类
  • 标签
  • 归档
  • JavaSE

    • 基础篇
    • 数据结构
    • IO流
    • Stream流
    • 函数式接口
    • JUC
      • 什么是JUC?
      • 什么是线程?
        • 并发并行
        • Java线程的几种状态
        • ※ JAVA开启多线程的方式
      • ※ 锁
        • synchronized
        • Lock
        • Lock和synchronized的区别
        • 几种锁的概念
      • JMM
        • volatile
        • 原子类Atomic
        • synchronized与CAS比较
      • ※ 线程通信
        • wait和sleep
        • Lock.Condition
        • CountdownLatch
        • CyclicBarrier
        • Semaphore
      • ※ 线程池
        • 七大参数
        • 七大阻塞队列
        • 四种拒绝策略
      • ForkJoin
        • 分治法
        • 工作窃取算法
        • ForkJoin框架
    • 反射
    • 网络编程
    • 设计模式
  • JavaEE

  • JDK版本特性

  • JVM

  • Java
  • JavaSE
kinoko
2023-12-17
目录

JUC

# 什么是JUC?

JUC指的是java.util.concurrent包下的内容,包含三个并发编程包:

  • java.util.concurrent
  • java.util.concurrent.atomic
  • java.util.concurrent.locks

# 什么是线程?

进程(资源分配的最小单元):指在系统中正在运行的一个应用程序;程序一旦运行就是进程。

线程(程序执行的最小单元):系统分配处理器时间资源的基本单元,进程内有多个线程。

※ Java进程默认有两个线程:主线程和GC线程

管程:

Monitor 监视器(就是平常说的锁)

是一种同步机制,保证同一时间内,只有一个线程访问被保护的数据或者代码

jvm同步基于进入和退出,使用管程对象实现的

用户线程和守护线程

用户线程:自定义线程(new Thread())

守护线程:后台中一种特殊的线程,守护线程的运行依赖于用户线程。当所有的用户线程都终止时,如果还存在守护线程,则守护线程会自动终止。比如垃圾回收。

public static void main(String[] args) {
    Thread thread = new Thread(Main::println);
    println();
    thread.start();
}

private static void println() {
    System.out.println(Thread.currentThread().getName() +
            " is daemon: " + Thread.currentThread().isDaemon());
}

// 打印
main is daemon: false
Thread-0 is daemon: false
// 存在用户线程,JVM存活
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
    Thread thread = new Thread(Main::println);
    // 设置为守护线程
    thread.setDaemon(true);
    println();
    thread.start();
}

private static void println() {
    System.out.println(Thread.currentThread().getName() +
            " is daemon: " + Thread.currentThread().isDaemon());
}

// 打印
main is daemon: false
Thread-0 is daemon: true
// 无守护线程,JVM结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 并发并行

并发:多线程同时访问一个资源(资源共享)

秒杀、抢票

并行:多线程并行执行再汇总

异步获取信息组装视图

# Java线程的几种状态

Thread.State

 public enum State {
        NEW,//新建     
        RUNNABLE,//准备就绪       
        BLOCKED, //阻塞        
        WAITING,//一直等待
        TIMED_WAITING,//超时等待,过时不候
        TERMINATED;//终止
    }
1
2
3
4
5
6
7
8

# ※ JAVA开启多线程的方式

误区:Java本身是不能开启线程的,底层调用的是start0()方法,是native方法,由C++实现

public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);

        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
}
// native方法
private native void start0();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

java开启线程的几种方式

  1. 继承Thread类

    class MyThread extends Thread {
                @Override
                public void run() {
                    System.out.println("MyThread run");
                }
            }
    
    1
    2
    3
    4
    5
    6
  2. 实现Runnable接口

    class MyRunnable implements Runnable {
                @Override
                public void run() {
                    System.out.println("MyRunnable run");
                }
            }
    
    1
    2
    3
    4
    5
    6
  3. 实现Callable接口

    class MyCallable implements Callable<String> {
                @Override
                public String call() throws Exception {
                    System.out.println("MyCallable call");
                    return "MyCallable call";
                }
            }
    
    1
    2
    3
    4
    5
    6
    7

    Runnable接口和Callable接口区别:

    1. 是否有返回值:Runnable无返回值,Callable有返回值
    2. 是否抛出异常:Runnable不可以抛出异常,Callable可以抛出异常
    3. 实现方法名称不同,Runnable接口是run方法,Callable接口是call方法
  4. 直接调用start()方法

    new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Runnable run");
                }
            }).start();
    
    1
    2
    3
    4
    5
    6
  5. 线程池

    ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    1, // 核心线程数
                    1,// 最大线程数
                    0L,// 线程空闲时间
                    TimeUnit.MILLISECONDS, // 时间单位
                    new LinkedBlockingQueue<>(), // 任务队列
                    new ThreadPoolExecutor.DiscardPolicy() // 线程池拒绝策略
            );
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Runnable run");
                }
            });
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14

# ※ 锁

Java中的锁:Lock锁、synchronized同步代码块

**经典案例:**窗口卖票

超卖示例

public class TicketSaleExample {

    public static void main(String[] args) {
        Ticket ticket = new Ticket();
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                ticket.sale();
            }
        }, "售票员1").start();
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                ticket.sale();
            }
        }, "售票员2").start();
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                ticket.sale();
            }
        }, "售票员3").start();
    }

    public static class Ticket {

        private int count = 100;

        public void sale() {
            if (count > 0) {
                try {
                    // 睡10ms提高超卖的概率
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                count--;
                System.out.println(Thread.currentThread().getName() + "卖出一张票,还剩" + count + "张");
            }
        }

    }
}

// 打印
...
售票员2卖出一张票,还剩5张
售票员1卖出一张票,还剩4张
售票员3卖出一张票,还剩3张
售票员2卖出一张票,还剩2张
售票员1卖出一张票,还剩1张
售票员3卖出一张票,还剩0张
售票员2卖出一张票,还剩-1张
售票员1卖出一张票,还剩-2张
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

# synchronized

synchronized关键字解决的是多个线程之间访问资源的同步性,synchronized 翻译为中文的意思是同步,也称之为同步锁。 synchronized的作用是保证在同一时刻, 被修饰的代码块或方法只会有一个线程执行,以达到保证并发安全的效果。

特性:

  • 原子性:synchronized保证语句块内操作是原子的

    同步方法: ACC_SYNCHRONIZED 这是一个同步标识,对应的 16 进制值是 0x0020 这 10 个线程进入这个方法时,都会判断是否有此标识,然后开始竞争 Monitor 对象。

    同步代码:  monitorenter,在判断拥有同步标识 ACC_SYNCHRONIZED 抢先进入此方法的线程会优先拥有 Monitor 的 owner ,此时计数器 +1。  monitorexit,当执行完退出后,计数器 -1,归 0 后被其他进入的线程获得。

  • 可见性:synchronized保证可见性(通过“在执行unlock之前,必须先把此变量同步回主内存”实现)

    那么为什么添加 synchronized 也能保证变量的可见性呢?

    线程解锁前,必须把共享变量的最新值刷新到主内存中。 线程加锁前,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值。 volatile 的可见性都是通过内存屏障(Memnory Barrier)来实现的。 synchronized 靠操作系统内核的Mutex Lock(互斥锁)实现,相当于 JMM 中的 lock、unlock。退出代码块时刷新变量到主内存。

  • 有序性:synchronized保证有序性(通过“一个变量在同一时刻只允许一条线程对其进行lock操作”)

    as-if-serial,保证不管编译器和处理器为了性能优化会如何进行指令重排序,都需要保证单线程下的运行结果的正确性。也就是常说的:如果在本线程内观察,所有的操作都是有序的;

    如果在一个线程观察另一个线程,所有的操作都是无序的。

    为什么,synchronized 也有可见性的特点,还需要 volatile 关键字? 因为synchronized 的有序性,不是 volatile 的防止指令重排序。那如果不加 volatile 关键字可能导致的结果,就是第一个线程在初始化初始化对象,设置 instance 指向内存地址时。第二个线程进入时,有指令重排。在判断 if (instance == null) 时就会有出错的可能,因为这会可能 instance 可能还没有初始化成功。

  • 重入性:synchronized 是可重入锁,也就是说,允许一个线程二次请求自己持有对象锁

synchronized解决超卖问题

public static class Ticket {

        private int count = 30;
		
    	// 加上synchronized同步锁
        public synchronized void sale() {
            if (count > 0) {
                try {
                    // 睡10ms提高超卖的可能性
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                count--;
                System.out.println(Thread.currentThread().getName() + "卖出一张票,还剩" + count + "张");
            }
        }

    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# Lock

Lock 实现提供了比使用 synchronized 方法和语句可以获得的更广泛的锁定操作。它们允许更灵活的结构,可能具有完全不同的属性,并且可能支持多个关联的 Condition (opens new window)对象。

Lock接口 说明
ReentrantLock (opens new window) 可重入锁(常用)
ReentrantReadWriteLock.ReadLock (opens new window) 读锁
ReentrantReadWriteLock.WriteLock (opens new window) 写锁

ReentrantLock

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    // 可选公平锁、非公平锁
    sync = fair ? new FairSync() : new NonfairSync();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

公平锁:十分公平,先来后到(阳光普照,效率相对低一些)

非公平锁:十分不公平,可以插队(默认,可能会造成线程饿死,但是效率高)

解决超卖问题示例

public static class Ticket {

        private int count = 30;

        private Lock lock = new ReentrantLock();

        public void sale() {
            lock.lock();
            try {
                if (count > 0) {
                    try {
                        // 睡10ms提高超卖的可能性
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    count--;
                    System.out.println(Thread.currentThread().getName() + "卖出一张票,还剩" + count + "张");
                }
            } finally {
                lock.unlock();
            }
        }

    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

扩展读写锁 ReentrantReadWriteLock

ReadLock写锁:独占锁

ReadLock读锁:共享锁

一个资源可以被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程,读写互斥,读读是共享的

  • 读读 √
  • 读写 x
  • 写写 x

未加锁:

public class ReadWriteLockExample {

    public static void main(String[] args) throws InterruptedException {
        Cache cache = new Cache();
        for (int i = 0; i < 3; i++) {
            int finalI = i;
            new Thread(() -> {
                // 放入元素
                cache.put("key" + finalI, "value" + finalI);
            }).start();
        }
        Thread.sleep(50); // 防止读线程走在写线程前面
        for (int i = 0; i < 3; i++) {
            int finalI = i;
            new Thread(() -> {
                // 获取元素
                cache.get("key" + finalI);
            }).start();
        }
    }

    static class Cache {
        private final Map<String, Object> cache = new HashMap<>();

        public void get(String key) {
            println("get " + key + " " + cache.get(key));
        }

        public void put(String key, Object value) {
            try {
                println("write... " + key);
                // 睡1s模拟写入时间
                Thread.sleep(1000);
                cache.put(key, value);
                println("put " + key + " " + value);
            } catch (Exception ignore) {} 
        }

        public void println(String msg) {
            System.out.println(Thread.currentThread().getName() + " " + msg);
        }
    }

}

// 打印
Thread-2 write... key2
Thread-1 write... key1
Thread-0 write... key0
Thread-3 get key0 null
Thread-4 get key1 null
Thread-5 get key2 null
Thread-1 put key1 value1
Thread-2 put key2 value2
Thread-0 put key0 value0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

未加锁,写线程还未写入数据时,读写线程读取数据 = null

加锁后:

public class ReadWriteLockExample {

    public static void main(String[] args) throws InterruptedException {
        Cache cache = new Cache();
        for (int i = 0; i < 3; i++) {
            int finalI = i;
            new Thread(() -> {
                // 放入元素
                cache.put("key" + finalI, "value" + finalI);
            }).start();
        }
        Thread.sleep(50); // 防止读线程走在写线程前面
        for (int i = 0; i < 3; i++) {
            int finalI = i;
            new Thread(() -> {
                // 获取元素
                cache.get("key" + finalI);
            }).start();
        }
    }

    static class Cache {
        private final Map<String, Object> cache = new HashMap<>();
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        public void get(String key) {
            lock.readLock().lock();
            try {
                println("get " + key + " " + cache.get(key));
            } finally {
                lock.readLock().unlock();
            }
        }

        public void put(String key, Object value) {
            lock.writeLock().lock();
            try {
                println("write... " + key);
                Thread.sleep(1000);
                cache.put(key, value);
                println("put " + key + " " + value);
            } catch (Exception ignore) {} finally {
                lock.writeLock().unlock();
            }
        }

        public void println(String msg) {
            System.out.println(Thread.currentThread().getName() + " " + msg);
        }
    }

}

// 打印
Thread-0 write... key0
Thread-0 put key0 value0
Thread-2 write... key2
Thread-2 put key2 value2
Thread-1 write... key1
Thread-1 put key1 value1
Thread-4 get key1 value1
Thread-3 get key0 value0
Thread-5 get key2 value2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

写线程先执行获取写锁,其余写线程被阻塞,读线程获取到读锁,读写锁互斥也被阻塞,直至所有写锁释放,读线程才执行。

# Lock和synchronized的区别

  1. synchronized 是内置的Java关键字,Lock是Java的一个接口

  2. synchronized 无法判断获取锁的状态,Lock可以判断是否获取到了锁

  3. synchronized 会自动释放锁,Lock必须手动释放锁!如果不释放锁,会造成死锁

  4. synchronized 线程一在获得锁的情况下阻塞了,第二个线程就只能傻傻的等着;Lock就不一定会等待下去

  5. synchronized 可重入锁,不可以中断,非公平;Lock,可重入锁,可以判断锁,非公平/公平(可以自己设置,默认非公平锁)

  6. synchronized 适合锁少量的同步代码;Lock适合锁大量同步代码!

  7. Lock可以提高多个线程进行读操作的效率。

如何取舍?

JDK5的时候,Lock刚推出,性能是比synchronized更加强大的,所以JDK5的时候是推荐使用lock锁。

JDK6的时候JVM的开发者以及他的团队发现Java中很多源码使用的还是synchronized,无法替换,所以在JDK6的时候又对synchronized进行了升级,当需求对锁性能要求不高的时候,会使用synchronized的一些轻量级锁,当性能开销太大的时候则会进行锁升级,使用一些重量级锁。

JDK8后,已经是可以考虑根据使用习惯和特性来选择这两个锁了,性能上差异并不明显。

# 几种锁的概念

  • **公平锁:**非常公平,不能插队,必须先来后到,效率低。
  • **非公平锁:**非常不公平,可以插队,效率高。
  • **可重入锁:**持锁线程可重新获取自己持有的锁,拿到外边的锁后,会自动拿到里面的锁,synchronized和Lock都是可重入锁。
  • 自旋锁:CAS,修改时判断是否为目标期望值,否则一直重试
  • 死锁:资源竞争,不同线程各自持有其他线程所请求的锁

产生死锁的4个必要条件:

  1. 互斥条件:线程要求对所分配的资源进行排他性控制,即在一段时间内某资源仅为一个线程所占用。
  2. 请求和保持条件:当线程因请求资源而阻塞时,对已获得的资源保持不放。
  3. 不剥夺条件:线程已获得的资源在未使用完之前,不能剥夺,只能在使用完时由自己释放。
  4. 环路等待条件:在发生死锁时,必然存在一个线程--资源的环形链。

Java中产生死锁的3个条件:

  1. 多个线程
  2. 多把锁
  3. 多个同步代码块嵌套

# JMM


Java Memory Model: 简称JMM, Java内存模型

JMM是一套规范,描述了Java程序中线程共享变量的访问规则。

主内存:

  • 主内存是所有线程都共享的,都能访问的。所有的共享变量都存储于主内存。
  • 这里所说的变量指的是成员变量和静态成员变量。

工作内存:

  • 每一个线程有自己的工作内存,线程的工作内存只存储该线程对共享变量的副本。
  • 线程对变量的所有的操作(读,取)都必须在工作内存中完成,
  • 而不能直接读写主内存中的变量

关于JMM的一些同步的约定:

线程中分为 工作内存、主内存

  1. 线程解锁前,必须把共享变量立刻刷回主存;
  2. 线程加锁前,必须读取主存中的最新值到工作内存中;
  3. 加锁和解锁是同一把锁

image.png 不同线程对共享变量的读写流程:

  1. 获取共享变量副本
  2. 在工作内存中修改副本
  3. 返回主内存

# volatile

线程安全要考虑三个方面

  • 可见性:一个线程对共享变量的修改,另一个线程能够看到最新的结果
  • 有序性:一个线程内代码(CPU指令)按编写顺序执行
  • 原子性:一个线程内多行代码以一个整体运行,期间不能有其他线程的代码插队

volatile 能够保证共享变量的可见性与有序性,但并不能保证原子性

有序性深入

有序性指的是CPU指令的有序性,也就是防止CPU指令的重排。

volatile有一个变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作;以及以下这个案例:

/*
	未使用volatile保护的DLC懒汉式单例
*/
public class Singleton4 {
    private Singleton4() {}
    private static Singleton4 instance;
    public static Singleton4 getInstance() { 
        if (instance == null) {
            synchronized(Singleton4.class){
                if(instance == null){
                	instance = new VideoPlayer();
                }
            }
        }
        return instance;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

首先需要知道instance = new Singleton4();这样一句代码在CPU中其实分为几个步骤:

  1. 创建对象(分配内存空间)
  2. 调用构造方法(初始化成员变量)
  3. 给静态变量赋值

然后CPU会对我们的代码执行顺序进行一个优化,对于没有因果先后关系的代码,可能会改变执行顺序,比如在这段代码中,调用构造方法和静态变量赋值就没有因果关系,这两个的执行顺序就有可能被颠倒。因此在多线程的环境下就有可能会出现以下的执行顺序:(注:橙蓝代表不同线程)

img

导致多线程的情况下出现实例化线程触发创建的对象还未初始化,这个对象就被返回了的问题,最终拿到的对象属性不完整,从而引发更多奇奇怪怪的错误。

而被volatile修饰的变量就会在该变量的赋值语句之后产生一个内存屏障,规定在这条语句之前的赋值语句不能越过屏障执行,从而保证了线程的有序性,防止CPU在进行创建对象分配内存空间及初始化对象,也就是调用构造方法时出现乱序。

可见性深入

public class Demo4_1 {
    // 加上volatile关键字保证可见性
    public static volatile boolean stop = true;
    public static void main(String[] args) throws InterruptedException {
        // 创建一个线程开启循环
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (stop){
                }
                System.out.println("循环结束...");
            }
        }).start();
		// 主线程尝试结束循环
        Thread.sleep(1000);
		stop = false;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

这种情况下当创建一个新的线程去改变stop的值企图停止循环失败时,是因为JIT视这个循环是**热点(当一段代码重复执行超过一定次数时就会被虚拟机视为热点)于是被JIT(即时编译器,将字节码文件解释成计算机语言,存在缓存中,用于优化代码在JVM中的运行)**进行了优化,导致源码已经被修改保存在了当前线程的工作内存(缓存)里,下次直接读取的是工作内存中优化后的代码,不再读取主内存的源码了,所以就算改写了主内存中的stop值这个循环也读取不到了。

而volatile关键字修饰的变量被修改了,会令其他工作内存的副本失效,下次使用时重新从主内存中读取。

补充:执行synchronized也可以让当前工作空间的副本失效

# 原子类Atomic


介绍

JDK1.5开始提供了java.util.concurrent.atomic包(简称Atomic包),这个包中的原子操作类提供了一种用法简单,性能高效,线程安全地更新一个变量的方式。注:Atomic包中有所有基本数据类型的原子操作类,这里只举例一个Integer

AtomicInteger原子类,它里面有一个成员变量保存一个int,并且提供一些方法对int值进行原子操作。

public class AtomicInteger {
    private volatile int value;
}
1
2
3

常用构造

构造器 详解
public AtomicInteger() 初始化一个默认值为0的原子型Integer
public AtomicInteger(int initialValue) 初始化一个指定值的原子型Integer

常用方法

构造器 详解
int get() 获取值
int getAndIncrement() 以原子方式将当前值加1,先取后加。
int incrementAndGet() 以原子方式将当前值加1,先加后取。
int addAndGet(int data) 以原子方式将当前值增量加,先加后取。
int getAndSet(int value) 以原子方式将当前值增量加,先取后加。

使用原子类解决原子性问题

原子性深入

public class Testcode {

    public static void main(String[] args) {
        new MyThread().start();
        new MyThread().start();
    }

}

class MyThread extends Thread{
    private static int count = 0;

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            count++;
        }
        System.out.println(count);
    }
}

结果:
11799
15598
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

发现结果无法达到20000,原因是在一个时刻,两个线程同时读取到共享内存中的变量,假设同时读到100的副本,在线程A中进行了自加,然后返回主内存,此时主内存是101,而线程B是和线程A同时读到的100,所以在线程B的工作内存进行的也是100+1=101,然后返回101到主内存,此时两个线程的返回值是一致的,出现了数据覆盖/操作丢失的问题。

public class Testcode {

    public static void main(String[] args) {
        new MyThread().start();
        new MyThread().start();
    }

}

class MyThread extends Thread{
    private static AtomicInteger count = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            count.getAndIncrement();
        }
        System.out.println(count);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

AtomicInteger使用CAS保证了原子性,在获取共享变量时会保存一个旧值,将修改的数据返回主内存前会再获取一次共享变量的值,若发现此时共享变量的值与旧值不相等,则认为共享变量被其他线程修改了,然后重新进行以上操作,直到新值与旧值相等,才会进行一个主内存的返回。

**补充:**synchronized也能保证原子性

CAS


介绍

CAS全称是Compare And Swap(比较并交换),AtomicInteger使用了CAS保证原子性,也是乐观锁的代表。

  • CAS有3个值,旧的预估值,最新值,要修改的值,旧的预估值和新的值相同,就改成要修改的值
  • 其核心思想是【无需加锁,每次只有一个线程能成功修改共享变量,其它失败的线程不需要停止,不断重试直至成功】
  • 由于线程一直运行,不需要阻塞,因此不涉及线程上下文切换
  • 需要多核cpu支持,且线程数不应超过cpu核数

原子性

**概念:**一个线程内多行代码以一个整体运行,期间不能有其他线程的代码插队

# synchronized与CAS比较


共同点

  • 在多线程情况下,都可以保证修改共享数据的原子性。

不同点

  • synchronized总是从最坏的角度出发,每次获取数据的时候,都认为别人有可能修改数据。所以在每次操作共享数据之前,都会上锁。(悲观锁)
  • CAS是从乐观的角度出发,假设每次获取数据别人都不会修改,所以不会上锁。只不过在修改共享数据的时候,会检查一下,别人有没有修改过这个数据。如果别人没有修改过,那么直接修改共享数据的值。如果别人修改过,那么再循环一次。 (乐观锁)

# ※ 线程通信

# wait和sleep

Object方法

修饰符 方法 描述
final void notifyAll() 唤醒在此对象的监视器上等待的所有线程。
final void notify() 唤醒在此对象的监视器上等待的单个线程。
final void wait() 导致当前线程等待直到被唤醒,通常是 notified 或 interrupted 。
final void wait(long timeoutMillis) 导致当前线程等待直到它被唤醒,通常是被 notified 或 interrupted 唤醒,或者直到经过一定的实时时间。
final void wait(long timeoutMillis, int nanos) 导致当前线程等待直到它被唤醒,通常是被 notified 或 interrupted 唤醒,或者直到经过一定的实时时间。

简单示例:a/b轮流打印

public class WaitExample {

    /**
     * a/b轮流打印
     */
    public static void main(String[] args) {
        // 锁
        Object lock = new Object();
        // a线程
        new Thread(() -> loop(lock, "a"), "aThread").start();
        // b线程
        new Thread(() -> loop(lock, "b"), "bThread").start();
    }

    private static void loop(Object lock, String name) {
        while (true) {
            synchronized (lock) {
                try {
                    System.out.println(Thread.currentThread().getName() + " -- " +name);
                    Thread.sleep(1000);
                    // 唤醒另一个被阻塞的线程
                    lock.notify();
                    // 当前线程进入阻塞
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

区别

  1. 来自不同的类

    wait => Object,任何对象实例都能调用

    sleep => Thread,Thread的静态方法

  2. 关于锁的释放

    wait会释放锁;

    sleep不会释放锁,它也不需要占用锁

  3. 使用范围不同

    wait:必须在同步代码块中使用

    sleep:可以在任何地方使用

生产者消费者案例(wait版本)

public class PCExample {

    public static void main(String[] args) {
        Source source = new Source();
        // 生产者1
        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(500);
                    source.add();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }, "生产者1").start();
        // 消费者1
        new Thread(() -> {
            while (true) {
                try {
                    source.sub();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }, "消费者1").start();
    }

    public static class Source {

        private final int maxPool = 10;
        private final AtomicInteger source = new AtomicInteger(0);

        private synchronized void add() throws InterruptedException {
            while (source.get() == maxPool) {
                println(":source is full");
                // 等待消费
                wait();
            }
            int count = source.incrementAndGet();
            println(":source +1 now pool = " + count);
            // 唤醒消费者
            notifyAll();
        }

        private synchronized void sub() throws InterruptedException {
            while (source.get() <= 0) {
                println(":source is empty");
                // 等待生产
                wait();
            }
            int count = source.decrementAndGet();
            println(":source -1 now pool = " + count);
            // 唤醒生产者
            notifyAll();
        }

        private void println(String msg) {
            System.out.println(Thread.currentThread().getName() + msg);
        }

    }

}

// 打印
消费者1:source is empty
生产者1:source +1 now pool = 1
消费者1:source -1 now pool = 0
消费者1:source is empty
生产者1:source +1 now pool = 1
消费者1:source -1 now pool = 0
消费者1:source is empty
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

为防止虚假唤醒问题,在 Source.add() 和 Source.sub() 方法中加入while方法块,如果用if代码块判断,在多任务下,有可能会出现source 大于10 或小于 1的情况,甚至出现死锁

# Lock.Condition

**经典案例:**线程顺序执行

public class SequenceThreadExample {

    private static int num = 1;

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition aCondition = lock.newCondition();
        Condition bCondition = lock.newCondition();
        Condition cCondition = lock.newCondition();
        new Thread(() -> {
            while (true) {
                println(1, 2, "A", aCondition, bCondition, lock);
            }
        }, "A").start();
        new Thread(() -> {
            while (true) {
                println(2, 3, "B", bCondition, cCondition, lock);
            }
        }, "B").start();
        new Thread(() -> {
            while (true) {
                println(3, 1, "C", cCondition, aCondition, lock);
            }
        }, "C").start();
    }

    public static void println(int sort, int next, String msg, Condition nowCondition, Condition nextCondition, ReentrantLock lock) {
        lock.lock();
        try {
            while (num != sort) {
                nowCondition.await();
            }
            num = next;
            System.out.println(Thread.currentThread().getName() + "-->" + msg);
            // 唤醒下一个线程
            nextCondition.signal();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

}

// 打印
A-->A
B-->B
C-->C
A-->A
B-->B
C-->C
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# CountdownLatch

java.util.concurrent.CountDownLatch 减量计数器

一种同步辅助工具,允许一个或多个线程等待,直到其他线程中执行的一组操作完成。

CountDownLatch 使用给定的 count 进行初始化。 await (opens new window)方法会阻塞,直到当前计数由于调用 countDown() (opens new window)方法而变为零,之后所有等待的线程都会被释放,并且任何后续的 await (opens new window)调用都会立即返回。这是一种一次性现象——无法重置计数。

构造方法 描述
CountDownLatch(int count) 构造一个用给定计数初始化的 CountDownLatch。
修饰符和类型 方法 描述
void await (opens new window)() 导致当前线程等待,直到锁存器倒计时到零,除非线程是 interrupted (opens new window) 。
boolean await (opens new window)(long timeout, TimeUnit (opens new window) unit) 导致当前线程等待,直到锁存器倒计时为零,除非线程为 interrupted (opens new window) 或指定的等待时间已过。
void countDown (opens new window)() 减少闩锁的计数,如果计数达到零,则释放所有等待的线程。
long getCount (opens new window)() 返回当前计数。
String toString (opens new window)() 返回标识此锁存器及其状态的字符串。
public class CountDownLatchExample {

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(3);
        try {
            for (int i = 0; i < 4; i++) {
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getName() + " is finish");
                    latch.countDown();// -1
                }).start();
            }
            // 阻塞,直至计数归零
            latch.await();
            System.out.println(Thread.currentThread().getName() + " is finish");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

// 打印
Thread-1 is finish
Thread-0 is finish
Thread-3 is finish
main is finish
Thread-2 is finish
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# CyclicBarrier

java.util.concurrent.CyclicBarrier 增量计数器

允许一组线程全部等待彼此到达公共屏障点的同步辅助工具。

构造方法 描述
CyclicBarrier (opens new window)(int parties) 创建一个新的 CyclicBarrier,它将在给定数量的参与方(线程)等待时触发,并且在障碍触发时不执行预定义的操作。
CyclicBarrier (opens new window)(int parties, Runnable (opens new window) barrierAction) 创建一个新的 CyclicBarrier,当给定数量的参与方(线程)等待它时,它将触发,并在触发障碍时执行给定的障碍操作,由进入障碍的最后一个线程执行。
修饰符和类型 方法 描述
int await (opens new window)() 等待所有 parties (opens new window) 在此屏障上调用 await。
int await (opens new window)(long timeout, TimeUnit (opens new window) unit) 等待直到所有 parties (opens new window) 在此屏障上调用 await,或者指定的等待时间过去。
int getNumberWaiting (opens new window)() 返回当前在屏障处等待的参与方数量。
int getParties (opens new window)() 返回触发此障碍所需的参与方数量。
boolean isBroken (opens new window)() 查询此屏障是否处于损坏状态。
void reset (opens new window)() 将屏障重置为其初始状态。
public class CyclicBarrierExample {

    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(7, () -> System.out.println("召唤神龙!"));
        for (int i = 0; i < 7; i++) {
            int finalI = i;
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 找到了第" + (finalI + 1) + "颗龙珠");
                    barrier.await();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }).start();
        }
    }

}
// 打印
Thread-5 找到了第6颗龙珠
Thread-0 找到了第1颗龙珠
Thread-1 找到了第2颗龙珠
Thread-6 找到了第7颗龙珠
Thread-3 找到了第4颗龙珠
Thread-4 找到了第5颗龙珠
Thread-2 找到了第3颗龙珠
召唤神龙!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# Semaphore

java.util.concurrent.Semaphore 信号量

计数信号量。从概念上讲,信号量维护一组许可。如果有必要,每个 acquire() (opens new window)都会阻塞,直到获得许可,然后再获取许可。每个 release() (opens new window)添加一个许可,可能会释放一个阻塞的获取者。但是,没有使用实际的许可对象; Semaphore 只是保持可用数量的计数并相应地采取行动。

构造方法 描述
Semaphore (opens new window)(int permits) 创建具有给定数量的许可和非公平公平设置的 Semaphore。
Semaphore (opens new window)(int permits, boolean fair) 使用给定的许可数量和给定的公平设置创建一个 Semaphore。
修饰符和类型 方法 描述
void acquire() 从此信号量获取许可,阻塞直到一个可用,或者线程为 interrupted (opens new window) 。
void acquire(int permits) 从此信号量获取给定数量的许可,阻塞直到所有许可都可用,或者线程为 interrupted (opens new window) 。
boolean tryAcquire() 从该信号量获取许可,前提是在调用时许可可用。
boolean tryAcquire(int permits) 仅当调用时所有许可都可用时,才从此信号量获取给定数量的许可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit) 如果在给定的等待时间内所有许可都可用并且当前线程尚未 interrupted (opens new window) ,则从该信号量获取给定数量的许可。
boolean tryAcquire(long timeout, TimeUnit unit) 如果一个在给定的等待时间内变得可用并且当前线程尚未 interrupted (opens new window) ,则从该信号量获取许可。
void release() 释放许可,将其返回给信号量。
void release(int permits) 释放给定数量的许可,将它们返回给信号量。

找车位案例

public class SemaphoreExample {

    public static void main(String[] args) {
        // 三个车位
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    println("进入停车场...waiting");
                    // 找车位
                    semaphore.acquire();// 获取信号量
                    println("停车...");
                    Thread.sleep(new Random().nextInt(1000, 2000));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    println("离开停车场...");
                    semaphore.release();// 释放信号量
                }
            }).start();
        }
    }

    public static void println(String msg) {
        System.out.println(Thread.currentThread().getName() + " " + msg);
    }

}

// 打印
Thread-1 进入停车场...waiting
Thread-0 进入停车场...waiting
Thread-0 停车...
Thread-1 停车...
Thread-2 进入停车场...waiting
Thread-2 停车...
Thread-3 进入停车场...waiting
Thread-4 进入停车场...waiting
Thread-1 离开停车场...
Thread-3 停车...
Thread-0 离开停车场...
Thread-4 停车...
Thread-2 离开停车场...
Thread-3 离开停车场...
Thread-4 离开停车场...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

# ※ 线程池

程序运行,本质:占用系统资源!优化资源的使用!=> 池化技术

线程池、连接池、内存池、对象池…

创建、销毁,十分浪费资源。

池化技术:事先准备好一些资源,如果有人要用,就来我这里拿,用完之后还给我,以此来提高效率。

线程池的好处

  1. 降低资源的消耗;
  2. 提高响应的速度;
  3. 方便线程管理。

**线程池:**七大参数、七大阻塞队列、四种拒绝策略

# 七大参数

public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
                          int maximumPoolSize, //最大核心线程池大小
                          long keepAliveTime, //超时了没有人调用就会释放
                          TimeUnit unit, //超时单位
                          BlockingQueue<Runnable> workQueue, //阻塞队列
                          ThreadFactory threadFactory, //线程工厂:创建线程,一般不用动
                          RejectedExecutionHandler handler) { //拒绝策略
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 七大阻塞队列

接口 BlockingQueue<E>

实现类:

  1. 数组阻塞队列(ArrayBlockingQueue) :底层基于数组的有界阻塞队列,初始化时需要指定队列大小;
  2. 优先级无界阻塞队列(PriorityBlockingQueue):底层基于数组的无界队列,支持队列内部按照指定元素排序;
  3. 延迟无界阻塞队列(DelayQueue):底层是基于数组的无界延迟队列,它是在PriorityQueue基础上实现的,先按延迟优先级排序,延迟时间短的排在队列前面;
  4. 链表阻塞队列(LinkedBlockingQueue) :以链表来存储元素,理论上只要存储空间够大,就是无界的;
  5. 链表阻塞双端队列(LinkedBlockingDeque):底层基于链表的有界双端阻塞队列;
  6. 链表阻塞队列与同步阻塞队列结合(LinkedTransferQueue):基于链表的无界阻塞队列;
  7. 同步阻塞队列(SynchronousQueue):队列中不存储元素,队列中放入元素后,只有该元素被消费完成,才能重修放入元素;

BlockingQueue<E> 方法总结

操作类型 抛出异常 阻塞 有返回值 超时返回
插入 add(e) (opens new window) put(e) (opens new window) offer(e) (opens new window) offer(e, time, unit) (opens new window)
消除 remove() (opens new window) take() (opens new window) poll() (opens new window) poll(time, unit) (opens new window)
检查 element() (opens new window) 不适用 peek() (opens new window) 不适用

分别阐述一下上述三组api

  • 抛出异常:指的是当存入元素时队列已满、取出元素时队列中为空时,均会抛出异常;
  • 阻塞线程:指的是当存入元素时队列已满、取出元素时队列中为空时,线程会一直等待,直至获取到数据为止;
  • 有返回值:指的是当存入元素或取出元素时,会返回特定的值(offer成功时会返回true,失败会返回false;poll成功时会返回队列中元素,失败时会返回null);
  • 超时返回:指的是在指定时间内未能存入或取出元素时,会返回一个指定值(offer时会返回false,poll时会返回null);

# 四种拒绝策略

**官方说明:**在方法 execute(Runnable) (opens new window)中提交的新任务将是 rejected,并且当 Executor 对最大线程和工作队列容量使用有限边界并且饱和时。在任何一种情况下,execute 方法都会调用其 RejectedExecutionHandler (opens new window)的 RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) (opens new window)方法。

提供了四种预定义的处理程序策略:

  1. 在默认的 ThreadPoolExecutor.AbortPolicy (opens new window)中,处理程序在拒绝时抛出运行时 RejectedExecutionException (opens new window)。
  2. 在 ThreadPoolExecutor.CallerRunsPolicy (opens new window)中,调用 execute 的线程本身运行任务。这提供了一种简单的反馈控制机制,可以减慢提交新任务的速度。
  3. 在 ThreadPoolExecutor.DiscardPolicy (opens new window)中,无法执行的任务会被简单地丢弃。此策略仅适用于从不依赖任务完成的极少数情况。
  4. 在 ThreadPoolExecutor.DiscardOldestPolicy (opens new window)中,如果执行器未关闭,工作队列头部的任务将被丢弃,然后重试执行(可能会再次失败,导致重复。)这种策略很少被接受。在几乎所有情况下,您还应该取消任务以在任何等待其完成的组件中导致异常,和/或记录失败,如 ThreadPoolExecutor.DiscardOldestPolicy (opens new window)文档中所示。

人话:

策略 说明
new ThreadPoolExecutor.AbortPolicy() 丢弃任务,抛出异常【默认策略】
new ThreadPoolExecutor.CallerRunsPolicy() 还给提交任务的线程去执行(推荐)
new ThreadPoolExecutor.DiscardPolicy() 丢弃任务,不抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy() 丢弃排最前的任务,尝试处理新任务,不抛异常

调优

什么是CPU密集型和IO密集型服务器? CPU密集型服务器是指需要大量的CPU处理能力来完成任务的服务器。这些服务器通常会运行计算密集型应用程序,例如数学计算、编码解码、3D建模等。这些应用程序需要大量的CPU计算能力来完成任务,因此CPU利用率较高。

CPU密集型最大线程数计算公式:核心数 + 1

​ IO密集型服务器是指需要大量的磁盘I/O或网络I/O操作来完成任务的服务器。这些服务器通常会运行文件服务器、数据库服务器、Web服务器等应用程序。这些应用程序需要大量的磁盘I/O和网络I/O操作,而不需要太多的CPU计算能力。

IO密集型最大线程数计算公式:核心数 * 2

如何区分? 1.CPU密集型服务器通常具有高CPU利用率,而IO密集型服务器通常具有低CPU利用率。如果服务器的CPU利用率非常高(例如,超过80%),则它可能是CPU密集型的。相反,如果CPU利用率较低,但磁盘I/O利用率较高,则服务器可能是IO密集型的。

2.如果服务器处理大量的数据库查询、文件读写、网络通信等,它可能是IO密集型的。这些操作需要大量的磁盘I/O和网络I/O,而不需要太多的CPU计算能力。相反,如果服务器运行大量的CPU密集型应用程序,例如视频编码、图像处理、数学计算等,它可能是CPU密集型的。

public class ThreadPoolExample {

    public static void main(String[] args) {
        // 什么是CPU密集型和IO密集型服务器?
        // CPU密集型服务器是指需要大量的CPU处理能力来完成任务的服务器。这些服务器通常会运行计算密集型应用程序,例如数学计算、编码解码、3D建模等。这些应用程序需要大量的CPU计算能力来完成任务,因此CPU利用率较高。
        // IO密集型服务器是指需要大量的磁盘I/O或网络I/O操作来完成任务的服务器。这些服务器通常会运行文件服务器、数据库服务器、Web服务器等应用程序。这些应用程序需要大量的磁盘I/O和网络I/O操作,而不需要太多的CPU计算
        // 如何区分?
        // 1.CPU密集型服务器通常具有高CPU利用率,而IO密集型服务器通常具有低CPU利用率。如果服务器的CPU利用率非常高(例如,超过80%),则它可能是CPU密集型的。相反,如果CPU利用率较低,但磁盘I/O利用率较高,则服务器可能是IO密集型的。
        // 2.如果服务器处理大量的数据库查询、文件读写、网络通信等,它可能是IO密集型的。这些操作需要大量的磁盘I/O和网络I/O,而不需要太多的CPU计算能力。相反,如果服务器运行大量的CPU密集型应用程序,例如视频编码、图像处理、数学计算等,它可能是CPU密集型的。
        // 最大核心数
        // CPU密集型 = CPU核心数 + 1
        // IO密集型 = CPU核心数 * 2
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1,
                Runtime.getRuntime().availableProcessors() + 1,
                1L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            for (int i = 0; i < 100; i++) {
                int finalI = i;
                pool.execute(() -> System.out.println(Thread.currentThread().getName() + " " + finalI));
            }
        } finally {
            // 执行shutdown,将会拒绝新任务提交到线程池;
            // 待执行的任务不会取消,正在执行的任务也不会取消,将会继续执行直到结束
            pool.shutdown();
            try {
                // 超时等待线程池完毕
                if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                    pool.shutdownNow();
                    if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                        System.out.println("线程池未能正常关闭,等待GC回收");
                    }
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                System.out.println("关闭线程池捕获异常");
            }
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# ForkJoin

ForkJoin是由JDK1.7之后提供的多线程并发处理框架。ForkJoin框架的基本思想是分而治之,就是所谓的分治法,就是将一个复杂的计算,按照设定的阈值分解成多个计算,然后将各个计算结果进行汇总。相应的,ForkJoin将复杂的计算当做一个任务,而分解的多个计算则是当做一个个子任务来并行执行。

Java并发史:

  • Java 1 支持thread,synchronized。
  • Java 5 引入了 thread pools, blocking queues, concurrent collections,locks, condition queues。
  • Java 7 加入了fork-join库。
  • Java 8 加入了 parallel streams。

# 分治法

步骤:

  1. 分割原问题;
  2. 求解子问题;
  3. 合并子问题的解为原问题的解。

逻辑示例

if(任务很小){
    直接计算得到结果
}else{
    分拆成N个子任务
    调用子任务的fork()进行计算
    调用子任务的join()合并计算结果
}
1
2
3
4
5
6
7

在分治法中,子问题一般是相互独立的,因此,经常通过递归调用算法来求解子问题。

典型应用

  • 二分搜索
  • 大整数乘法
  • Strassen矩阵乘法
  • 棋盘覆盖
  • 合并排序
  • 快速排序
  • 线性时间选择
  • 汉诺塔

# 工作窃取算法

假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点: 充分利用线程进行并行计算,并减少了线程间的竞争。

工作窃取算法的缺点: 在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源,比如创建多个线程和多个双端队列。

# ForkJoin框架

Java 1.7 引入了一种新的并发框架—— Fork/Join Framework,主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数。

ForkJoin框架的本质是一个用于并行执行任务的框架, 能够把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务的计算结果。在Java中,ForkJoin框架与ThreadPool共存,并不是要替换ThreadPool。

ThreadPoolExecutor和ForkJoinPool的使用场景差异

使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,很显然不合适。

所以在处理少量、或复杂任务时,建议使用ThreadPoolExecutor处理,而面对量大简单的任务,推荐使用ForkJoinPool。

1.ForkJoinPool类

实现了ForkJoin框架中的线程池。ForkJoinPool中提供了如下提交任务的方法:

public void execute(ForkJoinTask<?> task)
public void execute(Runnable task)
public <T> T invoke(ForkJoinTask<T> task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Runnable task, T result)
public ForkJoinTask<?> submit(Runnable task)
1
2
3
4
5
6
7
8

可以使用Executors.newWorkStealPool()方法创建ForkJoinPool

2.ForkJoinWorkerThread类

实现ForkJoin框架中的线程。

3.ForkJoinTask<V>类

ForkJoinTask封装了数据及其相应的计算,并且支持细粒度的数据并行。ForkJoinTask比线程要轻量,ForkJoinPool中少量工作线程能够运行大量的ForkJoinTask。

ForkJoinTask类中主要包括两个方法fork()和join(),分别实现任务的分拆与合并。

fork()方法类似于Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中。跟Thread.join()方法不同,ForkJoinTask的join()方法并不简单的阻塞线程,而是利用工作线程运行其他任务,当一个工作线程中调用join(),它将处理其他任务,直到注意到目标子任务已经完成。

ForkJoinTask有三个子类

  • RecursiveAction:无返回值的任务,实现Runnable。
  • RecursiveTask:有返回值的任务,实现Callable。
  • CountedCompleter:完成任务后将触发执行一个自定义的钩子函数。

数字累加示例

public class SumForkJoinTask extends RecursiveTask<Long> {

    // 子任务拆分阈值
    private final int threshold;
    private final long start;
    private final long end;

    public SumForkJoinTask(long start, long end, int threshold) {
        this.threshold = threshold;
        this.start = start;
        this.end = end;
    }


    /**
     * The main computation performed by this task.
     *
     * @return sum
     */
    @Override
    protected Long compute() {
        if (end - start <= threshold) { // 如果任务足够小,直接计算结果
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else { // 否则,将任务拆分成更小的子任务
            long mid = (start + end) / 2;
            SumForkJoinTask leftTask = new SumForkJoinTask(start, mid, threshold);
            SumForkJoinTask rightTask = new SumForkJoinTask(mid + 1, end, threshold);

            // 并行执行子任务
            leftTask.fork();
            rightTask.fork();

            // 等待子任务完成,并获取结果
            long leftResult = leftTask.join();
            long rightResult = rightTask.join();

            // 合并子任务的结果
            return leftResult + rightResult;
        }
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 累加1-10亿
        long maxValue = 10_0000_0000L;

        // 普通for循环
        long start1 = System.currentTimeMillis();
        long sum1 = 0;
        for (int i = 0; i <= maxValue; i++) {
            sum1 += i;
        }
        System.out.println("普通for: " + (System.currentTimeMillis() - start1) + "ms");
        //System.out.println("普通for: " + sum1);

        // stream流
        long start3 = System.currentTimeMillis();
        //Stream并行流计算 []
        long sum3 = LongStream.range(1, maxValue + 1).parallel().reduce(0L, Long::sum);
        System.out.println("stream流: " + (System.currentTimeMillis() - start3) + "ms");
        //System.out.println("stream流: " + sum3);

        // ForkJoin
        long start2 = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        SumForkJoinTask task = new SumForkJoinTask(1, maxValue, 1_0000_0000);
        // 提交任务
        Long sum2 = forkJoinPool.invoke(task);
        System.out.println("ForkJoin: " + (System.currentTimeMillis() - start2) + "ms");
        //System.out.println("ForkJoin: " + sum2);
    }

}

// 打印
普通for: 239ms
stream流: 67ms
ForkJoin: 52ms
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82

※注意:子任务拆分阈值 threshold 的选值会影响执行效率,根据不同CPU配置需要进行调优尝试

乱序数组排序示例

public class QuickSortForkJoinTask extends RecursiveAction {
    private int threshold;
    private int[] array;
    private int start;
    private int end;

    public QuickSortForkJoinTask() {
    }

    public QuickSortForkJoinTask(int[] array, int start, int end, int threshold) {
        this.threshold = threshold;
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= threshold) {
            // 使用快速排序算法对小数组进行排序
            quickSort(array, start, end);
        } else {
            // 将大数组拆分为两个子任务
            int pivotIndex = partition(array, start, end);
            QuickSortForkJoinTask leftTask = new QuickSortForkJoinTask(array, start, pivotIndex - 1, threshold);
            QuickSortForkJoinTask rightTask = new QuickSortForkJoinTask(array, pivotIndex + 1, end, threshold);

            // 并行执行子任务
            invokeAll(leftTask, rightTask);
        }
    }

    /**
     * 交换数组中的两个元素
     * @param array 数组
     * @param i 索引 i
     * @param j 索引 j
     */
    private void swap(int[] array, int i, int j) {
        int temp = array[i];
        array[i] = array[j];
        array[j] = temp;
    }

    /**
     * 划分数组
     * 1.选取数组最左端元素作为基准数,初始化两个指针 i 和 j 分别指向数组的两端。
     * 2.设置一个循环,在每轮中使用 i(j)分别寻找第一个比基准数大(小)的元素,然后交换这两个元素。
     * 3.循环执行步骤 2. ,直到 i 和 j *相遇时*停止,最后将基准数交换至两个子数组的分界线。
     * @param nums 数组
     * @param left 左边界
     * @param right 右边界
     * @return 基准数的索引
     */
    int partition(int[] nums, int left, int right) {
        // 以 nums[left] 为基准数
        int i = left, j = right;
        while (i < j) {
            // 从右向左找首个小于基准数的元素
            while (i < j && nums[j] >= nums[left]) {
                j--;
            }
            // 从左向右找首个大于基准数的元素
            while (i < j && nums[i] <= nums[left]) {
                i++;
            }
            swap(nums, i, j); // 交换这两个元素
        }
        // 将基准数交换至两子数组的分界线
        swap(nums, i, left);
        // 返回基准数的索引
        return i;
    }

    /**
     * 快速排序算法
     * @param array 数组
     * @param start 数组起始索引
     * @param end 数组结束索引
     */
    private void quickSort(int[] array, int start, int end) {
        if (start >= end) {
            return;
        }
        int pivotIndex = partition(array, start, end);
        // 排序基准数左边的子数组
        quickSort(array, start, pivotIndex - 1);
        // 排序基准数右边的子数组
        quickSort(array, pivotIndex + 1, end);
    }

    public static void main(String[] args) {
        // 100w随机数组排序
        int[] array1 = generateRandomArray(1_00_0000);
        QuickSortForkJoinTask quickSort = new QuickSortForkJoinTask();
        long start1 = System.currentTimeMillis();
        quickSort.quickSort(array1, 0, array1.length - 1);
        System.out.println("QuickSort: " + (System.currentTimeMillis() - start1) + "ms");

        int[] array2 = generateRandomArray(1_00_0000);
        //System.out.println("Before sorting:");
        //printArray(array2);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        QuickSortForkJoinTask task = new QuickSortForkJoinTask(array2, 0, array2.length - 1, 12_7500);
        long start2 = System.currentTimeMillis();
        forkJoinPool.invoke(task);
        System.out.println("ForkJoin: " + (System.currentTimeMillis() - start2) + "ms");
        //System.out.println("After sorting:");
        //printArray(array2);

        int[] array3 = generateRandomArray(1_00_0000);
        long start3 = System.currentTimeMillis();
        int[] array4 = Arrays.stream(array3).parallel().sorted().toArray();
        System.out.println("Stream: " + (System.currentTimeMillis() - start3) + "ms");
    }

    private static void printArray(int[] array) {
        System.out.println(Arrays.toString(array));
    }

    /**
     * 生成一个随机数组
     * @param size 数组大小
     * @return 随机数组
     */
    private static int[] generateRandomArray(int size) {
        return ThreadLocalRandom.current().ints(1, size + 1).distinct().limit(size).toArray();
    }

}

// 打印
QuickSort: 62ms
ForkJoin: 16ms
Stream: 274ms
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135

快排就是最典型的分治策略应用,与forkjoin的适配度很高。

为什么这里stream的会比单线程快排慢?首先是因为stream内部使用的排序算法不确定,只能做个参考,以及他的子任务拆分阈值未知,我测试了一下当数组容量变为1kw的时候是要比原生快排更快的。

#java#并发#多线程
上次更新: 2023/12/29 11:32:56
函数式接口
反射

← 函数式接口 反射→

最近更新
01
JVM 底层
09-13
02
JVM 理论
09-13
03
JVM 应用
09-13
更多文章>
Theme by Vdoing | Copyright © 2022-2024 kinoko | MIT License | 粤ICP备2024165634号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式