[隐藏]
1.线程池
并发是伴随着多核处理器的诞生而产生的,为了充分 利用硬件资源,诞生了多线程技术。但是多线程又存在资源竞争的问题,引发了同步和互斥的问题,JDK 1.5推出的java.util.concurrent(并发工具包)来解决 这些问题。
1.1.new Thread的弊端
- new Thread()新建对象,性能差
- 线程缺乏统一管理,可能无限制的新建线程,相互竞争,严重时会占用过多系统资源导致死机或OOM
1.2.ThreadPool — 线程池
- 重用存在的线程,减少新建对象、消亡的开销
- 线程总数可控,提高资源的利用率
- 避免过多资源竞争,避免阻塞
- 提供额外功能,定时执行、定期执行、监控等
1.3.线程池的种类
在java.util.concurrent中,提供了工具类Executors(调度器)对象来创建线程池,可创建的线程池有四种:
- CachedThreadPool – 可缓存线程池
- FixedThreadPool – 定长线程池
- SingleThreadExecutor – 单线程池
- ScheduledThreadPool – 调度线程池
可缓存线程池示例:
public class ThreadPoolSample1 { public static void main(String[] args) { //调度器对象 //ExecutorService用于管理线程池 ExecutorService threadPool = Executors.newCachedThreadPool();//创建一个可缓存线程池 //可缓存线程池的特点是,无限大,如果线程池中没有可用的线程则创建,有空闲线程则利用起来 for(int i = 1 ; i <= 1000 ; i++) { final int index = i; threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + ":" + index); } }); } try { Thread.sleep(1000); //跟线程足够的运行时间 } catch (InterruptedException e) { e.printStackTrace(); } //shutdown() 代表关闭线程池(等待所有线程完成) //shutdownNow() 代表立即终止线程池的运行,不等待线程,不推荐使用 threadPool.shutdown(); } }
定长线程池示例:
public class ThreadPoolSample2 { public static void main(String[] args) { //调度器对象 //ExecutorService用于管理线程池 ExecutorService threadPool = Executors.newFixedThreadPool(10);//创建一个可创建一个定长线程池 //定长线程池的特点是固定线程总数,空间线程用于执行任务,如果线程都在使用后续任务则处于等待状态,在线程池中的线程 //如果任务处于等待的状态,备选的等待算法默认为FIFO(先进先出) LIFO(后进先出) //执行任务后再执行后续的任务。 for(int i = 1 ; i <= 1000 ; i++) { final int index = i; threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + ":" + index); } }); } try { Thread.sleep(1000); //跟线程足够的运行时间 } catch (InterruptedException e) { e.printStackTrace(); } //shutdown() 代表关闭线程池(等待所有线程完成) //shutdownNow() 代表立即终止线程池的运行,不等待线程,不推荐使用 threadPool.shutdown(); } }
单线程池示例:通常用来当守护线程,企业中一切线程往线程池上靠,是最简单无脑的选择
public class ThreadPoolSample3 { public static void main(String[] args) { //调度器对象 //ExecutorService用于管理线程池 ExecutorService threadPool = Executors.newSingleThreadExecutor();//单线程线程池 for(int i = 1 ; i <= 1000 ; i++) { final int index = i; threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + ":" + index); } }); } try { Thread.sleep(1000); //跟线程足够的运行时间 } catch (InterruptedException e) { e.printStackTrace(); } //shutdown() 代表关闭线程池(等待所有线程完成) //shutdownNow() 代表立即终止线程池的运行,不等待线程,不推荐使用 threadPool.shutdown(); } }
调度线程池示例:
public class ThreadPoolSample4 { public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);//可调度线程池 /*//延迟三秒执行一次Run方法 scheduledThreadPool.schedule(new Runnable() { @Override public void run() { System.out.println("延迟3秒执行"); } } , 3 , TimeUnit.SECONDS);*/ //Timer , 项目实际开发中scheduledThreadPool与Timer都不会用到,因为有成熟的调度框架Quartz,或者Spring自带调度, //程序的调度框架支持一种表达式叫做Cron表达式,有兴趣的童鞋可以了解一下。 scheduledThreadPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println(new Date() + "延迟1秒执行,每三秒执行一次"); } }, 1, 3, TimeUnit.SECONDS); } }
2.
- CountDownLatch倒计时锁特别适合”总-分任务”, 例如多线程计算后的数据汇总
- CountDownLatch类位于java.util.concurrent (J.U.C)包下,利用它可以实现类似计数器的功能。 比如有一个任务A,它要等待其他3个任务执行完毕之 后才能执行,此时就可以利用CountDownLatch来实 现这种功能了。
代码示例:
public class CountDownSample { private static int count = 0; //初始化累加的总数 public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(100); //定长线程池启动100个线程 CountDownLatch cdl = new CountDownLatch(10000); //CDL总数和操作数保持一致 for(int i = 1 ; i <= 10000 ; i++) { //执行一万次累加操作 final int index = i; threadPool.execute(new Runnable() { @Override public void run() { synchronized (CountDownSample.class) { try { count = count + index; //计数器减一 }catch(Exception e){ e.printStackTrace(); }finally { cdl.countDown(); } } } }); } /* try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }*/ try { cdl.await(); //堵塞当前线程,直到cdl=0的时候再继续往下走 //为了避免程序一直挂起,我们可以设置一个timeout时间 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(count); threadPool.shutdown(); } }
3.
代码示例:
public class SemaphoreSample1 { public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); //定义一个可缓存线程池 Semaphore semaphore = new Semaphore(5);//定义5个信号量,也就是说服务器只允许5个人在里面玩 for(int i = 1 ; i <= 20 ; i++) { final int index = i; threadPool.execute(new Runnable() { @Override public void run() { try { semaphore.acquire();//获取一个信号量,“占用一个跑到” play(); semaphore.release();//执行完成后释放这个信号量,“从跑道出去” } catch (InterruptedException e) { e.printStackTrace(); } } }); } threadPool.shutdown(); } public static void play(){ try { System.out.println(new Date() + " " + Thread.currentThread().getName() + ":获得紫禁之巅服务器进入资格"); Thread.sleep(2000); System.out.println(new Date() + " " + Thread.currentThread().getName() + ":退出服务器"); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }
4.CyclicBarrier循环屏障
CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同 的是该barrier在释放等待线程后可以重用,所以称它为循 环(Cyclic)的屏障(Barrier)
CyclicBarrier适用于多线程必须同时开始的场景,比如跑分和秒杀以及抢票
代码示例:
public class CyclicBarrierSample { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5); public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for(int i = 1 ; i<=20 ; i++) { final int index = i; try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } executorService.execute(new Runnable() { @Override public void run() { go(); } }); } executorService.shutdown(); } private static void go(){ System.out.println(Thread.currentThread().getName() + ":准备就绪" ); try { cyclicBarrier.await();//设置屏障点,当累计5个线程都准备好后,才运行后面的代码 System.out.println(Thread.currentThread().getName() + ":开始运行"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
5.
- 重入锁是指任意线程在获取到锁之后,再次获取该锁而不会被该锁 所阻塞
- ReentrantLock设计的目标是用来替代synchronized关键字
6.
Condition条件唤醒:
- 我们在并行程序中,避免不了某些线程要预先规定好的顺序执行, 例如:先新增再修改,先买后卖,先进后出……,对于这类场景,使 用JUC的Condition对象再合适不过了。
- JUC中提供了Condition对象,用于让指定线程等待与唤醒,按预期 顺序执行。它必须和ReentrantLock重入锁配合使用。
- Condition用于替代wait()/notify()方法
– notify只能随机唤醒等待的线程,而Condition可以唤醒指定的线程,这有利于更好 的控制并发程序。
- await() – 阻塞当前线程,直到singal唤醒
- signal() – 唤醒被await的线程,从中断处继续执行
- signalAll() – 唤醒所有被await()阻塞的线程
执行过程:
public class ConditionSample { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); //Condition对象必须配合Lock一起使用 Condition c1 = lock.newCondition(); //创建Condition Condition c2 = lock.newCondition(); Condition c3 = lock.newCondition(); new Thread(new Runnable() { @Override public void run() { lock.lock(); //加锁 try { c1.await(); //阻塞当前线程,直到有人调用c1.singal的时候线程激活继续执行 Thread.sleep(1000); System.out.println("粒粒皆辛苦"); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); //解锁 } } }).start(); new Thread(new Runnable() { @Override public void run() { lock.lock(); //加锁 try { c2.await(); Thread.sleep(1000); System.out.println("谁知盘中餐"); c1.signal(); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } }).start(); new Thread(new Runnable() { @Override public void run() { lock.lock(); try { c3.await(); Thread.sleep(1000); System.out.println("汗滴禾下土"); c2.signal(); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } }).start(); new Thread(new Runnable() { @Override public void run() { lock.lock(); try { Thread.sleep(1000); System.out.println("锄禾日当午"); c3.signal(); //T3线程继续执行 } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } }).start(); } }
7.Callable_Future
- Callable和Runnable一样代表着任务,区别在于 Callable有返回值并且可以抛出异常。
- Future 是一个接口。它用于表示异步计算的结果。提 供了检查计算是否完成的方法,以等待计算的完成, 并获取计算的结果。
代码示例:
public class FutureSample { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); for(int i = 2 ; i <= 10000 ; i++){ Computor c = new Computor(); c.setNum(i); //Future是对用于计算的线程进行监听,因为计算是在其他线程中执行的,所以这个返回结果的过程是异步的 Future<Boolean> result = executorService.submit(c);//将c对象提交给线程池,如有空闲线程立即执行里面的call方法 try { Boolean r = result.get(); //用于获取返回值,如果线程内部的call没有执行完成,则进入等待状态,直到计算完成 if(r == true){ System.out.println(c.getNum()); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } executorService.shutdown(); } } class Computor implements Callable<Boolean>{ private Integer num; public Integer getNum() { return num; } public void setNum(Integer num) { this.num = num; } @Override public Boolean call() throws Exception { boolean isprime = true; for(int i = 2 ; i < num ; i++) { if (num % i == 0) { isprime = false; break; } } return isprime; } }
8.
- ArrayList -> CopyOnWriteArrayList – 写复制列表
- HashSet -> CopyOnWriteArraySet – 写复制集合
- HashMap -> ConcurrentHashMap – 分段锁映射
8.1.
CopyOnWriteArrayList通过“副本”解决并发问题
代码示例:
public class CopyOnWriteArrayListSample { public static void main(String[] args) { //写复制列表 List<Integer> list = new CopyOnWriteArrayList<>(); for(int i = 0 ; i < 1000 ; i++){ list.add(i); } Iterator<Integer> itr = list.iterator(); while (itr.hasNext()) { Integer i = itr.next(); list.remove(i); } System.out.println(list); } }
8.2.ConcurrentHashMap
ConcurrentHashMap 采用”分段锁“的方式
代码示例:
public class ConcurrentHashMapSample { public static int users = 100; //同时模拟的并发访问用户数量 public static int downTotal = 50000; //用户下载的真实总数 public static ConcurrentHashMap count = new ConcurrentHashMap() ;//计数器 public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(users); for(int i = 0 ; i < downTotal ; i++){ final Integer index = i; executorService.execute(()->{ //通过多线程模拟N个用户并发访问并下载 try { semaphore.acquire(); count.put(index, index); semaphore.release(); } catch (Exception e) { e.printStackTrace(); } }); } try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } executorService.shutdown();//关闭调度服务 System.out.println("下载总数:" + count.size()); } }
转载请注明:西门飞冰的博客 » JAVA-线程池与JUC