Java多线程--并行模式与算法
Java多线程--并行模式与算法
单例模式
虽然单例模式和并行没有直接关系,但是我们经常会在多线程中使用到单例。单例的好处有:
- 对于频繁使用的对象可以省去new操作花费的时间;
- new操作的减少,随之带来的好处就是缩短了GC停顿时间,减轻了GC压力。
public class Singleton { private static Singleton ourInstance = new Singleton(); public static Singleton getInstance() { return ourInstance; } private Singleton() { } }
但是这样的单例模式还是有些不足,如果上面的实现中还有其他的静态属性,比如public static int FLAG = 1;
当我们调用Singleton.FLAG
时会导致类的加载,从而使得new Singleton()
被调用。
上面是单例模式的一种实现,对象不是按需创建的,俗称“饿汉式”。如果要求对象在需要的时候才被创建,可以使用“懒汉式”。
public class Singleton { private static Singleton ourInstance; public static Singleton getInstance() { if (ourInstance == null) { ourInstance = new Singleton(); } return ourInstance; } private Singleton() { } }
但这个做法又带来了新的问题,getInstance并不是线程安全的,如果有多个线程调用了这个方法,同时进行了ourInstance的判空,就有可能多次创建对象。
可行的做法是加锁,我们加锁是很费时的,这带来了额外的性能问题。
public class Singleton { private static Singleton ourInstance = new Singleton(); public static Singleton getInstance() { if (ourInstance == null) { synchronized (Singleton.class) { ourInstance = new Singleton(); } } return ourInstance; } private Singleton() { } }
比较好的做法是使用内部类。
public class Singleton { private static class SingletonHolder { private static Singleton ourInstance = new Singleton(); } public static Singleton getInstance() { return SingletonHolder.ourInstance; } private Singleton() { } }
这样即使有其他的静态字段先于getInstance方法调用也不会自动创建对象,只有在第一次调用getInstacne方法时,对象才会被创建。
不变模式
在多线程中如果数据是可变的,如普通的int型,被多个线程修改就会产生线程不安全,为了保证线程安全需要进行同步操作,而同步是很费时的。使用不变对象,可以确保在没有同步的多线程下,依然能保持数据内部状态的一致性和正确性。
不变模式的核心思想:一个对象一旦被创建,不能被任何线程修改,也不会自行发生改变。不变模式和只读还是有一定的区别的,只读的属性可能自行发生改变。
不变模式的注意以下几点:
- 去除setter方法以及所有能修改自身属性的方法;
- 将属性设置为private和final,一是确保属性不能被随意访问,二是确保属性被创建后不能被修改;
- 确保子类没有可以重载修改它的行为;
- 有一个能创建完整对象的构造函数。
Java中典型的不变对象就是String了,任何对String的修改都会产生一个新的对象,而原来的对象保持不变。除此之外,还有基本类型的包装类也是不变模式实现的。比如Integer、Long、Double等。
生产者-消费者模式
生产者-消费中是通过内存缓冲区进行通信的,这就避免了两者的直接通信,从而将两者进行解耦,生产者不需要知道消费者的存在,消费者也不需要知道生产者的存在。
JDK中的BlockingQueue可以充当内存缓冲区的作用,使用阻塞队列实现生产者-消费者模式的样例如下
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class ProducerConsumer { public static class Producer implements Runnable { private BlockingQueue<Integer> blockingQueue; private Random random = new Random(); public Producer(BlockingQueue<Integer> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while (true) { Integer a = makeProduct(); try { blockingQueue.put(a); System.out.println(Thread.currentThread().getName() + "生产了" + a + " 队列大小" + blockingQueue.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } public Integer makeProduct() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return random.nextInt(10); } } public static class Consumer implements Runnable { private BlockingQueue<Integer> blockingQueue; public Consumer(BlockingQueue<Integer> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while (true) { Integer a = useProduct(); System.out.println(Thread.currentThread().getName() + "消费产品" + a + " 队列大小" + blockingQueue.size()); } } public Integer useProduct() { Integer a = null; try { a = blockingQueue.take(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return a; } } public static void main(String[] args) { BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(5); ExecutorService exec = Executors.newCachedThreadPool(); int producerNum = 3; int consumerNum = 5; for (int i = 0; i < producerNum; i++) { exec.submit(new Producer(blockingQueue)); } for (int i = 0; i < consumerNum; i++) { exec.submit(new Consumer(blockingQueue)); } } }
运行上面的代码,生产-消费可以正常运行,且我们指定了队列的大小为5,所以输出中队列的size不会超过5.这一切都是阻塞队列的put()/take()
方法实现的,take()
方法会阻塞,直到队列不为空;put()
也会阻塞,直到队列不满。
Future模式
Future模式,核心思想是异步调用。和同步调用的区别在于:如果某个任务A非常耗时,异步调用下,被调者可以立即返回,然后着手处理其他任务,不用在这个任务A上等待。等到真正需要任务A的结果了再尝试去获取。
Future模式,有点类似淘宝购物,加入买一部手机,从付款到收到货可能需要三天,这三天不需要傻傻等待,因为付款后会有一个订单,而这个订单是我买了这件东西、可以凭这个单号取东西的一个凭证。所以你一旦得到了订单,完全可以放下这件事取忙别的,到时候去拿快递就行了。Future模式的异步调用中,立即返回的一个值就类似于这里说的订单了(而不是你买的手机),然后就可以着手处理其他任务了,这就充分利用了等待时间。等其他任务处理完了,再根据拿到的这个类似订单的值,取到真实的数据(手机)。
Data接口
package exercise; public interface Data { String getResult(); }
类似于订单的凭证,FutureData
package exercise; public class FutureData implements Data { private RealData realData; private boolean isReady = false; public synchronized void setRealData(RealData realData) { if (!isReady) { isReady = true; this.realData = realData; notifyAll(); } } @Override public synchronized String getResult() { while (!isReady) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return realData.getResult(); } }
存放真实数据的RealData
package exercise; public class RealData implements Data { private final String result; public RealData(String result) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < 10; i++) { sb.append(result); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } this.result = sb.toString(); } @Override public String getResult() { return result; } }
Client
package exercise; public class Client { public Data request(final String str) { final FutureData futureData = new FutureData(); // 耗时任务新开一个线程,只要5s,在这5s中并不是干等,也完成了任务B new Thread(() -> futureData.setRealData(new RealData(str))).start(); // 如果没有新开线程就要花10s // futureData.setRealData(new RealData(str)); // 直接返回futureData,无需等待 System.out.println("立即返回FutureData"); return futureData; } public static void main(String[] args) { Client client = new Client(); long start = System.currentTimeMillis(); Data data = client.request("ha"); System.out.println("请求完毕"); try { System.out.println("无需等待,开始处理任务B"); Thread.sleep(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("得到数据 "+ data.getResult()); long end = System.currentTimeMillis(); System.out.println((end-start)/ ); } }
运行程序将会打印
立即返回FutureData 请求完毕 无需等待,开始处理任务B 得到数据 hahahahahahahahahaha
上面是一个Future模式的简单例子,主要功能是对输入的字符串重复10次叠加后返回,每次耗时500ms,总共花费5s。Client的request方法,新开一个线程完成RealData的构造,注意开启新线程才能做到两个任务同时执行,所以最后只花5s,就可以完成这两个任务;如果不开启新线程,将会花费10s(两个任务都花费5s)。
Data data = client.request("ha");
这个返回的是FutureData,然后data.getResult()
,当我们需要取真实数据时可以调用该方法,如果数据准备好了就直接返回,否则会一直阻塞直到数据准备完毕。
JDK内置的Future模式
JDK内部已经实现好了一套Future模式,实现和上面FutureData相同的功能,稍微调整下代码,如下:
package exercise; import java.util.concurrent.Callable; public class RealDataDemo implements Callable<String> { private String str; public RealDataDemo(String str) { this.str = str; } @Override public String call() throws Exception { StringBuilder sb = new StringBuilder(); for (int i = 0; i < 10; i++) { sb.append(str); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } return sb.toString(); } }
package exercise; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; public class FutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<>(new RealDataDemo("ha")); ExecutorService service = Executors.newFixedThreadPool(1); // 相当于上例中的client.request("ha") long start = System.currentTimeMillis(); service.submit(futureTask); System.out.println("请求完毕"); try { System.out.println("无需等待,开始处理任务B"); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("得到数据 "+ futureTask.get()); long end = System.currentTimeMillis(); System.out.println((end-start)/ 1000); service.shutdown(); } }
运行上述代码,会打印如下内容:
请求完毕 无需等待,开始处理任务B 得到数据 hahahahahahahahahaha
有现成的当然是直接拿来用了啊!
by @sunhaiyu
2018.5.18