串行程序如何并行化,串行和并行的区别

电子常识

2640人已加入

描述

  1、串行程序

  串行程序是基于嵌入式Linux串行通信GUI终端设计及实现。传统意义上的写法,我们得到的往往会是串行执行的程序形态,程序的总的执行时间是method1的执行时间time1加上method2的执行时间time2,这样总的执行时间time=time1+time2。我们得到的是串行的程序形态。

  import com.yang.domain.BaseResult;

  import org.junit.Test;

  import java.util.concurrent.TimeUnit;

  /**

  * @Description:

  * @Author: yangzl2008

  * @Date: 2016/1/9 19:06

  */

  public class Serial {

  @Test

  public void test() {

  long start = System.currentTimeMillis();

  BaseResult baseResult1 = method1();// 耗时操作1,时间 time1

  BaseResult baseResult2 = method2();// 耗时操作2,时间 time2

  long end = System.currentTimeMillis();

  //总耗时 time = time1 + time2

  System.out.println(“baseResult1 is ” + baseResult1 + “\nbaseResult2 is ” + baseResult2 + “\ntime cost is ” + (end - start));

  }

  private BaseResult method1() {

  BaseResult baseResult = new BaseResult();

  try {

  TimeUnit.SECONDS.sleep(1);

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  baseResult.setCode(1);

  baseResult.setMsg(“method1”);

  return baseResult;

  }

  private BaseResult method2() {

  BaseResult baseResult = new BaseResult();

  try {

  TimeUnit.SECONDS.sleep(1);

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  baseResult.setCode(1);

  baseResult.setMsg(“method2”);

  return baseResult;

  }

  }

  执行结果:

  [plain]

  BaseResult{code=1, msg=‘method1’}

  baseResult2 is BaseResult{code=1, msg=‘method2’}

  time cost is 2000

  2、串行程序的多线程过渡

  而这种代码是不是可优化的地方呢?加快程序的执行效率,降低程序的执行时间。在这里method1和method2是相互不关联的,即method1的执行和method2的执行位置可以调整,而不影响程序的执行结果,我们可不可以为建立线程1执行method1然后建立线程2来执行method2呢,因为method1和method2都需要得到结果,因此我们需要使用Callable接口,然后使用Future.get()得到执行的结果,但实际上Future.get()在程序返回结果之前是阻塞的,即,线程1在执行method1方式时,程序因为调用了Future.get()会等待在这里直到method1返回结果result1,然后线程2才能执行method2,同样,Future.get()也会一直等待直到method2的结果result2返回,这样,我们开启了线程1,开启了线程2并没有得到并发执行method1,method2的效果,反而会因为程序开启线程而多占用了程序的执行时间,这样程序的执行时间time=time1+time2+time(线程开启时间)。于是我们得到了串行程序的过渡态。

  [java] view plain copyimport com.yang.domain.BaseResult;

  import org.junit.Test;

  import java.lang.reflect.Method;

  import java.util.concurrent.*;

  /**

  * @Description:

  * @Author: yangzl2008

  * @Date: 2016/1/9 19:13

  */

  public class SerialCallable {

  @Test

  public void test01() throws Exception {

  // 两个线程的线程池

  ExecutorService executorService = Executors.newFixedThreadPool(2);

  long start = System.currentTimeMillis();

  // 开启线程执行

  Future《BaseResult》 future1 = executorService.submit(new Task(this, “method1”, null));

  // 阻塞,直到程序返回。耗时time1

  BaseResult baseResult1 = future1.get();

  // 开启线程执行

  Future《BaseResult》 future2 = executorService.submit(new Task(this, “method1”, null));

  // 阻塞,直到程序返回。耗时time2

  BaseResult baseResult2 = future2.get();

  long end = System.currentTimeMillis();

  // 总耗时 time = time1 + time2 + time(线程执行耗时)

  System.out.println(“baseResult1 is ” + baseResult1 + “\nbaseResult2 is ” + baseResult2 + “\ntime cost is ” + (end - start));

  }

  public BaseResult method1() {

  BaseResult baseResult = new BaseResult();

  try {

  TimeUnit.SECONDS.sleep(1);

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  baseResult.setCode(1);

  baseResult.setMsg(“method1”);

  return baseResult;

  }

  public BaseResult method2() {

  BaseResult baseResult = new BaseResult();

  try {

  TimeUnit.SECONDS.sleep(1);

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  baseResult.setCode(1);

  baseResult.setMsg(“method2”);

  return baseResult;

  }

  class Task《T》 implements Callable《T》 {

  private Object object;

  private Object[] args;

  private String methodName;

  public Task(Object object, String methodName, Object[] args) {

  this.object = object;

  this.args = args;

  this.methodName = methodName;

  }

  @Override

  public T call() throws Exception {

  Method method = object.getClass().getMethod(methodName);

  return (T) method.invoke(object, args);

  }

  }

  }

  执行结果:

  [plain]

  BaseResult{code=1, msg=‘method1’}

  baseResult2 is BaseResult{code=1, msg=‘method1’}

  time cost is 2001

  3、并行程序

  有没有方法可以在执行method1的时候同时执行method2,最后得到结果再进行处理?我们回到问题的出处,程序首先执行完method1得到结果result1之后,在执行method2获得结果result2,然后再按照result1和result2的结果来判定程序下一步的执行,最终我们得到的结果是result1和result2,然后再进行下一步操作,那么在我们得到result1和result2的时候,method1和method2其实是可以并发执行的,即我首先执行method1然后再执行mothod2,我不管他们的返回结果,只有在我要拿result1和result2进行操作的时候,程序才会调用Future.get()方法(这个方法会一直等待,直到结果返回),这是一种延迟加载的思想,与Hibernate中属性的延迟加载是一致的,即对于属性A,平时我是不用时不会进行赋值,只有我在用的时候,才执行SQL查询对其进行赋值操作。于是,我们得到了并发执行的程序形态。

  Hibernate亦使用CGLIB来实现延迟加载,因此,我们可以考虑使用CGLIB的延迟加载类,将串行的程序并行化!

  [java] view plain copyimport com.yang.domain.BaseResult;

  import net.sf.cglib.proxy.Enhancer;

  import net.sf.cglib.proxy.LazyLoader;

  import org.junit.Test;

  import java.lang.reflect.Method;

  import java.util.concurrent.*;

  /**

  * @Description:

  * @Author: yangzl2008

  * @Date: 2016/1/9 19:39

  */

  public class Parallel {

  @Test

  public void test02() throws Exception {

  ExecutorService executorService = Executors.newFixedThreadPool(2);

  long start = System.currentTimeMillis();

  // 开启线程执行

  Future《BaseResult》 future1 = executorService.submit(new Task(this, “method1”, null));

  // 不阻塞,正常执行,baseResult1是cglib的代理类,采用延迟加载,只有在使用的时候才调用方法进行赋值。

  BaseResult baseResult1 = futureGetProxy(future1, BaseResult.class);

  // 开启线程执行

  Future《BaseResult》 future2 = executorService.submit(new Task(this, “method2”, null));

  // 不阻塞,正常执行,baseResult1是cglib的代理类,采用延迟加载,只有在使用的时候才调用方法进行赋值。

  BaseResult baseResult2 = futureGetProxy(future2, BaseResult.class);

  // 这里要使用baseResult1和baseResult2

  System.out.println(“baseResult1 is ” + baseResult1 + “\nbaseResult2 is ” + baseResult2);

  long end = System.currentTimeMillis();

  // 总耗时time = max(time1,time2)

  System.out.println(“time cost is ” + (end - start));

  }

  private 《T》 T futureGetProxy(Future《T》 future, Class clazz) {

  Enhancer enhancer = new Enhancer();

  enhancer.setSuperclass(clazz);

  return (T) enhancer.create(clazz, new FutureLazyLoader(future));

  }

  /**

  * 延迟加载类

  * @param 《T》

  */

  class FutureLazyLoader《T》 implements LazyLoader {

  private Future《T》 future;

  public FutureLazyLoader(Future《T》 future) {

  this.future = future;

  }

  @Override

  public Object loadObject() throws Exception {

  return future.get();

  }

  }

  public BaseResult method1() {

  BaseResult baseResult = new BaseResult();

  try {

  TimeUnit.SECONDS.sleep(1);

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  baseResult.setCode(1);

  baseResult.setMsg(“method1”);

  return baseResult;

  }

  public BaseResult method2() {

  BaseResult baseResult = new BaseResult();

  try {

  TimeUnit.SECONDS.sleep(1);

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  baseResult.setCode(1);

  baseResult.setMsg(“method2”);

  return baseResult;

  }

  class Task《T》 implements Callable《T》 {

  private Object object;

  private Object[] args;

  private String methodName;

  public Task(Object object, String methodName, Object[] args) {

  this.object = object;

  this.args = args;

  this.methodName = methodName;

  }

  @Override

  public T call() throws Exception {

  Method method = object.getClass().getMethod(methodName);

  return (T) method.invoke(object, args);

  }

  }

  }

  执行结果:

  [plain] view plain copybaseResult1 is BaseResult{code=1, msg=‘method1’}

  baseResult2 is BaseResult{code=1, msg=‘method2’}

  time cost is 1057

  4、串行程序并行化

  考虑这样一个问题:统计某个工程的代码行数。首先想到的思路便是,递归文件树,每层递归里,循环遍历父文件夹下的所有子文件,如果子文件是文件夹,那么再对这个文件夹进行递归调用。于是问题很轻松的解决了。这个方案可以优化吗? 

  再回想这个问题,可以发现,循环里的递归调用其实相互之间是独立的,互不干扰,各自统计自己路径下的代码文件的行数。于是,发现了这个方案的可优化点——利用线程池进行并行处理。于是一个串行的求解方案被改进成了并行方案。

  不能光说不练,写了一个Demo,对串行方案和并行方案进行了量化对比。代码如下:

  [java] view plain copyimport java.io.*;

  import java.util.Queue;

  import java.util.concurrent.*;

  /**

  * Created by cdlvsheng on 2016/5/16.

  */

  public class ParallelSequentialContrast {

  int coreSize = Runtime.getRuntime().availableProcessors();

  ThreadPoolExecutor exec = new ThreadPoolExecutor(coreSize * 4, coreSize * 5, 0, TimeUnit.SECONDS,

  new LinkedBlockingQueue《Runnable》(10000), new ThreadPoolExecutor.CallerRunsPolicy());

  Queue《Future《Integer》》 queue = new ConcurrentLinkedQueue《Future《Integer》》();

  private int countLineNum(File f) {

  if (!f.getName().endsWith(“java”) && !f.getName().endsWith(“.js”) && !f.getName().endsWith(“.vm”)) return 0;

  int sum = 0;

  try {

  BufferedReader br = new BufferedReader(new FileReader(f));

  String str = null;

  while ((str = br.readLine()) != null) sum++;

  } catch (FileNotFoundException e) {

  e.printStackTrace();

  } catch (IOException e) {

  e.printStackTrace();

  }

  return sum;

  }

  private class Task implements Callable《Integer》 {

  File f;

  public Task(File f) {

  this.f = f;

  }

  public Integer call() throws Exception {

  int sum = 0;

  if (f.isDirectory()) {

  File[] fs = f.listFiles();

  for (File file : fs) {

  if (file.isDirectory()) queue.add(exec.submit(new Task(file)));

  else sum += countLineNum(file);

  }

  } else sum += countLineNum(f);

  return sum;

  }

  }

  public int parallelTraverse(File f) {

  queue.add(exec.submit(new Task(f)));

  int sum = 0;

  while (!queue.isEmpty()) {

  try {

  Future《Integer》 future = queue.poll();

  sum += future.get();

  } catch (InterruptedException e) {

  e.printStackTrace();

  } catch (ExecutionException e) {

  e.printStackTrace();

  }

  }

  exec.shutdown();

  return sum;

  }

  public int sequentialTraverse(File f) {

  int sum = 0;

  if (f.isDirectory()) {

  File[] fs = f.listFiles();

  for (File file : fs) {

  if (file.isDirectory()) sum += sequentialTraverse(file);

  else sum += countLineNum(file);

  }

  } else sum += countLineNum(f);

  return sum;

  }

  public void parallelTest(ParallelSequentialContrast psc, String pathname) {

  long start = System.currentTimeMillis();

  int sum = psc.parallelTraverse(new File(pathname));

  long duration = System.currentTimeMillis() - start;

  System.out.println(String.format(“parallel test, %d lines of code were found, time cost is %d ms”, sum, duration));

  }

  public void sequentialTest(ParallelSequentialContrast psc, String pathname) {

  long start = System.currentTimeMillis();

  int sum = psc.sequentialTraverse(new File(pathname));

  long duration = System.currentTimeMillis() - start;

  System.out.println(String.format(“sequential test, %d lines of code were found, time cost is %d ms”, sum, duration));

  }

  public static void main(String[] args) {

  ParallelSequentialContrast psc = new ParallelSequentialContrast();

  String pathname = “D:\\Code_Git”;

  psc.sequentialTest(psc, pathname);

  psc.parallelTest(psc, pathname);

  }

  }

  因为要不断的扫磁盘(虽然我的是固态硬盘),所以并行方案的线程池开的很大。IO密集型程序的相对CPU密集型程序的线程池会更大。

  程序运行结果如下:

  [plain] view plain copysequential test, 415079 lines of code were found, time cost is 364 ms

  parallel test, 415079 lines of code were found, time cost is 163 ms

  可以发现,在结果同等精确的情况下,串行方案耗时是并行方案的两倍多。这个是在我个人PC上做的测试,如果是线上服务器运行,恐怕差距只会更加明显。

  如果一个大任务,由许多个相互独立的子任务组成,我们就可以在这里找突破点,把一个串行程序并行化,榨干多和服务器的性能!

  5、串行和并行的区别

  串行通信是指 使用一条数据线,将数据一位一位地依次传输,每一位数据占据一个固定的时间长度。其只需要少数几条线就可以在系统间交换信息,特别使用于计算机与计算机、计算机与外设之间的远距离通信。终端与其他设备(例如其他终端、计算机和外部设备)通过数据传输进行通信。数据传输可以通过两种方式进行:并行通信和串行通信。 在计算机和终端之间的数据传输通常是靠电缆或信道上的电流或电压变化实现的。如果一组数据的各数据位在多条线上同时被传输,这种传输方式称为并行通信。 并行通信时数据的各个位同时传送,可以字或字节为单位并行进行。并行通信速度快,但用的通信线多、成本高,故不宜进行远距离通信。计算机或plc各种内部总线就是以并行方式传送数据的。另外,在PLC底板上,各种模块之间通过底板总线交换数据也以并行方式进行。

  并行通信传输中有多个数据位,同时在两个设备之间传输。发送设备将这些数据位通过 对应的数据线传送给接收设备,还可附加一位数据校验位。接收设备可同时接收到这些数据,不需要做任何变换就可直接使用。并行方式主要用于近距离通信。计算 机内的总线结构就是并行通信的例子。这种方法的优点是传输速度快,处理简单。

并行

  串行数据传输时,数据是一位一位地在通信线上传输的,先由具有几位总线的计算机内的发送设备,将几位并行数据经并--串转换硬件转换成串行方式,再逐位经 传输线到达接收站的设备中,并在接收端将数据从串行方式重新转换成并行方式,以供接收方使用。串行数据传输的速度要比并行传输慢得多,但对于覆盖面极其广 阔的公用电话系统来说具有更大的现实意义。

并行

  串行数据通信的方向性结构有三种,即单工、半双工和全双工。

并行

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分