Java 线程池的理论与实践

图片 3

前段时间公司里有个项目需要进行重构,目标是提高吞吐量和可用性,在这个过程中对原有的线程模型和处理逻辑进行了修改,发现有很多基础的多线程的知识已经模糊不清,如底层线程的运行情况、现有的线程池的策略和逻辑、池中线程的健康状况的监控等,这次重新回顾了一下,其中涉及大量java.util.concurrent包中的类。本文将会包含以下内容:

<关于并发框架>Java原生线程池原理及Guava与之的补充,javaguava

原创博客,转载请联系博主! 

 

 

  转眼快两个月没有更新自己的博客了。

  一来感觉自己要学的东西还是太多,与其花几个小时写下经验分享倒不如多看几点技术书。

  二来放眼网上已经有很多成熟的中文文章介绍这些用法,自己赘述无异重造车轮。

  所以,既然开始打算要写,就希望可以有一些与众不同的用法和新意,可以给大家一点启发。

 

  使用Java中成型的框架来帮助我们开发并发应用即可以节省构建项目的时间,也可以提高应用的性能。

 

  Java对象实例的锁一共有四种状态:无锁,偏向锁,轻量锁和重量锁。原始脱离框架的并发应用大部分都需要手动完成加锁释放,最直接的就是使用synchronized和volatile关键字对某个对象或者代码块加锁从而限制每次访问的次数,从对象之间的竞争也可以实现到对象之间的协作。但是这样手动实现出来的应用不仅耗费时间而且性能表现往往又有待提升。顺带一提,之前写过一篇文章介绍我基于Qt和Linux实现的一个多线程下载器(到这里不需要更多了解这个下载器,请直接继续阅读),就拿这个下载器做一次反例:

  首先,一个下载器最愚蠢的问题之一就是把下载线程的个数交由给用户去配置。比如一个用户会认为负责下载的线程个数是越多越好,干脆配置了50个线程去下载一份任务,那么这个下载器的性能表现甚至会不如一个单进程的下载程序。最直接的原因就是JVM花费了很多计算资源在线程之间的上下文切换上面,对于一个并发的应用:如果是CPU密集型的任务,那么良好的线程个数是实际CPU处理器的个数的1倍;如果是I/O密集型的任务,那么良好的线程个数是实际CPU处理器个数的1.5倍到2倍(具体记不清这句话是出于哪里了,但还是可信的)。不恰当的执行线程个数会给线程抖动,CPU抖动等隐患埋下伏笔。如果,重新开发那么我一定会使用这种线程池的方法使用生产者和消费者的关系模式,异步处理HTTP传输过来的报文。

  其次,由于HTTP报文的接受等待的时间可能需要等待很久,然而处理报文解析格式等等消耗的计算资源是相当较小的。同步地处理这两件事情必然会使下载进程在一段时间内空转或者阻塞,这样处理也是非常不合理的。如果重新开发,一定要解耦HTTP报文的接收和HTTP报文的解析,这里尽管也可以使用线程池去进行处理,显而易见由于这样去做的性能提升其实是很小的,所以没有必要去实现,单线程也可以快速完成报文的解析。

 

  Okay,回到主题,总而言之是线程之间的上下文切换导致了性能的降低。那么具体应该怎么样去做才可以减少上下文的切换呢?

 

 


 

 

  1. 无锁并发编程

    多线程竞争锁时,会引起上下文切换,所以多线程处理数据时,可以用一些办法来避免使用锁,如将数据的ID按照Hash算法取模分段,不同的线程去处理不同段的数据。

  2. CAS算法

    Java的Atomic包内使用CAS算法来更新数据,而不需要加锁(但是线程的空转还是存在)。

  3. 使用最少线程

    避免创建不需要的线程,比如任务很少,但是创建很多线程来处理,这样会造成大量线程都处于等待状态。

  4. 协程

    在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换。

 


 

 

  总的来说使用Java线程池会带来以下3个好处:

 

  1.
降低资源消耗:      通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  2.
提高响应速度:      当任务到达时,任务可以不需要等到线程创建就能立即执行。

  3.
提高线程的可管理性:   线程是稀缺资源,如果无限制的创建。不仅仅会降低系统的稳定性,使用线程池可以统一分配,调优和监控。但是要做到合理的利用线程池。必须对于其实现原理了如指掌。

 

线程池的实现原理如下图所示:

 Executor框架的两级调度模型:

 

  在HotSpot
VM线程模型中,Java线程被一对一的映射为本地操作系统线程,Java线程启动时会创建一个本地操作系统线程,当该Java线程终止时,这个操作系统也会被回收。操作系统会调度并将它们分配给可用的CPU。

  在上层,Java多线程程序通常把应用分解为若干个任务,然后把用户级的调度器(Executor框架)将这些映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。这种两级调度模型实质是一种工作单元和执行机制的解偶。

 

Fork/Join框架的递归调度模型:

 

  要提高应用程序在多核处理器上的执行效率,只能想办法提高应用程序的本身的并行能力。常规的做法就是使用多线程,让更多的任务同时处理,或者让一部分操作异步执行,这种简单的多线程处理方式在处理器核心数比较少的情况下能够有效地利用处理资源,因为在处理器核心比较少的情况下,让不多的几个任务并行执行即可。但是当处理器核心数发展很大的数目,上百上千的时候,这种按任务的并发处理方法也不能充分利用处理资源,因为一般的应用程序没有那么多的并发处理任务(服务器程序是个例外)。所以,只能考虑把一个任务拆分为多个单元,每个单元分别得执行最后合并每个单元的结果。一个任务的并行拆分,一种方法就是寄希望于硬件平台或者操作系统,但是目前这个领域还没有很好的结果。另一种方案就是还是只有依靠应用程序本身对任务经行拆封执行。 

  Fork/Join模型乍看起来很像借鉴了MapReduce,但是具体不敢肯定是什么原因,实际用起来的性能提升是远不如Executor的。甚至在递归栈到了十层以上的时候,JVM会卡死或者崩溃,从计算机的物理原理来看,Fork/Join框架实际效能也没有想象中的那么美好,所以这篇只稍微谈一下,不再深究。

 


 

 

Executor框架主要由三个部分组成:任务,任务的执行,异步计算的结果。

 

主要的类和接口简介如下:

 

1. Executor是一个接口,它将任务的提交和任务的执行分离。

2. ThreadPoolExecutor是线程池的核心,用来执行被提交的类。

3. Future接口和实现Future接口的FutureTask类,代表异步计算的结果。

4.
Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或其他执行。

 

先看一个直接的例子(用SingleThreadExecutor来实现,具体原理下面会阐述):

 

 1 public class ExecutorDemo {
 2 
 3 
 4     public static void main(String[] args){
 5 
 6         //ExecutorService fixed= Executors.newFixedThreadPool(4);
 7         ExecutorService single=Executors.newSingleThreadExecutor();
 8         //ExecutorService cached=Executors.newCachedThreadPool();
 9         //ExecutorService sched=Executors.newScheduledThreadPool(4);
11         
12         Callable<String> callable=Executors.callable(new Runnable() {
13             @Override
14             public void run() {
15                 for(int i=0;i<100;i++){
16                     try{
17                         System.out.println(i);
18                     }catch(Throwable e){
19                         e.printStackTrace();
20                     }
21                 }
22             }
23         },"success");
24      //这里抖了个机灵,用Executors工具类的callable方法将一个匿名Runnable对象装饰为Callable对象作为参数
25         Future<String> f=single.submit(callable);
26         try {
27             System.out.println(f.get());
28             single.shutdown();
29         }catch(Throwable e){
30             e.printStackTrace();
31         }
32     }
33 }

 

如代码中所示,常用一共有四种Exector实现类通过Executors的工厂方法来创建Executor的实例,其具体差别及特点如下所示:

 

1. FixedThreadPool

 

  这个是我个人最常用的实现类,在Java中最直接的使用方法就是和 Runtime.getRuntime().availableProcessors()
一起使用分配处理器个数个的Executor。内部结构大致如下:

创造实例的函数为:  Executors.newFixedThreadPool(int nThread);

   在JDK1.7里java.util.concurrent包中的源码中队列使用的是new
LinkedBlockingQueue<Runnable>,这是一个无界的队列,也就是说任务有可能无限地积压在这个等待队列之中,实际使用是存在一定的隐患。但是构造起来相当比较容易,我个人建议在使用的过程之中不断查询size()来保证该阻塞队列不会无限地生长。

 

2. SingleThreadExecutor

和 Executors.newFixedThreadPool(1) 完全等价。

 

3. CachedThreadPool

  和之前两个实现类完全不同的是,这里使用SynchronousQueue替换LinkedBlockingQueue。简单提一下SynchronousQueue是一个没有容量的队列,一个offer必须对应一个poll,当然所谓poll操作是由实际JVM工作线程来进行的,所以对于使用开发者来讲,这是一个会因为工作线程饱和而阻塞的线程池。(这个和java.util.concurrent.Exchanger的作用有些相似,但是Exchanger只是对于两个JVM线程的,而SynchronousQueue的阻塞机制是多个生产者和多个消费者而言的。)

 

4. ScheduledThreadPoolExecutor

  这个实现类内部使用的是DelayQueue。DelayQueue实际上是一个优先级队列的封装。时间早的任务会拥有更高的优先级。它主要用来在给定的延迟之后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但ScheduledThreadPoolExecutor比Timer更加灵活,而且可以有多个后台线程在构造函数之中指定。

 

 

Future接口和ListenableFurture接口

 

  Future接口为异步计算取回结果提供了一个存根(stub),然而这样每次调用Future接口的get方法取回计算结果往往是需要面临阻塞的可能性。这样在最坏的情况下,异步计算和同步计算的消耗是一致的。Guava库中因此提供一个非常强大的装饰后的Future接口,使用观察者模式为在异步计算完成之后马上执行addListener指定一个Runnable对象,从实现“完成立即通知”。这里提供一个有效的Tutorial

 

 

 

 

 

  

 

原创博客,转载请联系博主! 转眼快两个月没有更新自己的博客了。
一来感觉…

  1. Java中的Thread与操作系统中的线程的关系
  2. 线程切换的各种开销
  3. ThreadGroup存在的意义
  4. 使用线程池减少线程开销
  5. Executor的概念
  6. ThreadPoolExecutor中的一些具体实现
  7. 如何监控线程的健康
  8. 参考ThreadPoolExecutor来设计适合自己的线程模型

一、问题描述

这个项目所在系统的软件架构(从开发到运维)基本上采用的是微服务架构,微服务很好地解决了我们系统的复杂性问题,但是随之也带来了一些问题,比如在此架构中大部分的服务都拥有自己单独的数据库,而有些(很重要的)业务需要做跨库查询。相信这种「跨库查询」的问题很多实践微服务的公司都碰到过,通常这类问题有以下几种解决方案(当然,还有更多其他的方案,这里就不一一叙述了):

  1. 严格通过服务提供的API查询。这样做的好处是将服务完全当做黑盒,可以最大限度得减少服务间的依赖与耦合关系,其次还能根据实际需求服务之间使用不同的数据库类型;缺点是则代价太大。
  2. 将关心的信息冗余到自己的库中,并提供API让其他服务来主动修改。优点是信息更新十分实时,缺点是增加了服务间的依赖。
  3. 指令与查询分离(CQRS)。将可能被其他服务关心的数据放入数据仓库(或者做成类似于物化视图、搜索引擎等),数据仓库只提供读的功能。优点是对主库不会有压力,服务只要关心实现自己的业务就好,缺点是数据的实时性会受到了挑战。

图片 1

指令与查询分离

结合实际情况,我们使用的是第3种方案。然而随着越来越多的业务依赖读库,甚至依赖其中一些状态的变化,所以读库的数据同步如果出现高延时,则会直接影响业务的进行。出了几次这种事情后,于是下决心要改善这种情况。首先想到的就是使用线程池来进行消息的消费(写入读库),JDK自从1.5开始提供了实用而强大的线程池工具——Executor框架。

二、Executor框架

Executor框架在Java1.5中引入,大部分的类都在包java.util.concurrent中,由大神Doug
Lea写成,其中常用到的有以下几个类和接口:

  1. java.util.concurrent.Executor一个只包含一个方法的接口,它的抽象含义是:用来执行一个Runnable任务的执行器。
  2. java.util.concurrent.ExecutorService对Executor的一个扩展,增加了很多对于任务和执行器的生命周期进行管理的接口,也是通常进行多线程开发最常使用的接口。
  3. java.util.concurrent.ThreadFactory一个生成新线程的接口。用户可以通过实现这个接口管理对线程池中生成线程的逻辑
  4. java.util.concurrent.Executors提供了很多不同的生成执行器的实用方法,比如基于线程池的执行器的实现。

三、为什么要用线程池

Java从最开始就是基于线程的,线程在Java里被封装成一个类java.lang.Thread。在面试中很多面试官都会问一个很基础的关于线程问题:

Java中有几种方法新建一个线程?

所有人都知道,标准答案是两种:继承Thread或者实现Runnable,在JDK源代码中Thread类的注释中也是这么写的。

然而在我看来这两种方法根本就是一种,所有想要开启线程的操作,都必须生成了一个Thread类(或其子类)的实例,执行其中的native方法start0()

Java中的线程

Java中将线程抽象为一个普通的类,这样带来了很多好处,譬如可以很简单的使用面向对象的方法实现多线程的编程,然而这种程序写多了容易会忘记,这个对象在底层是实实在在地对应了一个OS中的线程。

图片 2

操作系统中的线程和进程

上图中的进程(Process)可以看做一个JVM,可以看出,所有的进程有自己的私有内存,这块内存会在主存中有一段映射,而所有的线程共享JVM中的内存。在现代的操作系统中,线程的调度通常都是集成在操作系统中的,操作系统能通过分析更多的信息来决定如何更高效地进行线程的调度,这也是为什么Java中会一直强调,线程的执行顺序是不会得到保证的,因为JVM自己管不了这个,所以只能认为它是完全无序的。

另外,类java.lang.Thread中的很多属性也会直接映射为操作系统中线程的一些属性。Java的Thread中提供的一些方法如sleep和yield其实依赖于操作系统中线程的调度算法。

关于线程的调度算法可以去读操作系统相关的书籍,这里就不做太多叙述了。

线程的开销

通常来说,操作系统中线程之间的上下文切换大约要消耗1到10微秒

从上图中可以看出线程中包含了一些上下文信息:

  • CPU栈指针(Stack)、
  • 一组寄存器的值(Registers),
  • 指令计数器的值(PC)等,

它们都保存在此线程所在的进程所映射的主存中,而对于Java来说,这个进程就是JVM所在的那个进程,JVM的运行时内存可以简单的分为如下几部分:

  1. 若干个栈(Stack)。每个线程有自己的栈,JVM中的栈是不能存储对象的,只能存储基础变量和对象引用。
  2. 堆(Heap)。一个JVM只有一个堆,所有的对象都在堆上分配。
  3. 方法区(Method
    Area)。一个JVM只有一个方法区,包含了所有载入的类的字节码和静态变量。

其中#1中的栈可以认为是这个线程的上下文,创建线程要申请相应的栈空间,而栈空间的大小是一定的,所以当栈空间不够用时,会导致线程申请不成功。在Thread的源代码中可以看到,启动线程的最后一步是执行一个本地方法private native void start0(),代码1是OpenJDK中start0最终调用的方法:

//代码1
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;
  bool throw_illegal_thread_state = false;

  // We must release the Threads_lock before we can post a jvmti event
  // in Thread::start.
  {
    MutexLocker mu(Threads_lock);

    //省略一些代码

      jlong size =
             java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
      size_t sz = size > 0 ? (size_t) size : 0;
      native_thread = new JavaThread(&thread_entry, sz);
  }

  if (native_thread->osthread() == NULL) {
    THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
              "unable to create new native thread");
  }

  Thread::start(native_thread);

JVM_END

从代码1中可以看到,线程的创建首先需要栈空间,所以过多的线程创建可能会导致OOM。

同时,线程的切换会有以下开销:

  1. CPU中执行上下文的切换,导致CPU中的「指令流水线(Instruction
    Pipeline)」的中断和CPU缓存的失效。
  2. 如果线程太多,线程切换的时间会比线程执行的时间要长,严重浪费了CPU资源。
  3. 对于共享资源的竞争(锁)会导致线程切换开销急剧增加。

根据以上的描述,所以通常建议尽可能创建较少的线程,减少锁的使用(尤其是synchronized),尽量使用JDK提供的同步工具。而为了减少线程上下文切换带来的开销,通常使用线程池是一个有效的方法。

Java中的线程池

Executor框架中最常用的大概就是java.util.concurrent.ThreadPoolExecutor了,对于它的描述,简单的说就是「它维护了一个线程池,对于提交到此Executor中的任务,它不是创建新的线程而是使用池内的线程进行执行」。对于「数量巨大但执行时间很小」的任务,可以显著地减少对于任务执行的开销。java.util.concurrent.ThreadPoolExecutor中包含了很多属性,通过这些属性开发者可以定制不同的线程池行为,大致如下:

1. 线程池的大小:corePoolSizemaximumPoolSize

ThreadPoolExecutor中线程池的大小由这两个属性决定,前者指当线程池正常运行起来后的最小(核心)线程数,当一个任务到来时,若当前池中线程数小于corePoolSize,则会生成新的线程;后者指当等待队列满了之后可生成的最大的线程数。在例1中返回的对象中这两个值相等,均等于用户传入的值。

2.
用户可以通过调用java.util.concurrent.ThreadPoolExecutor上的实例方法来启动核心线程(core
pool)

3. 可定制化的线程生成方式:threadFactory

默认线程由方法Executors.defaultThreadFactory()返回的ThreadFactory进行创建,默认创建的线程都不是daemon,开发者可以传入自定义的ThreadFactory进行对线程的定制化。

5. 非核心线程的空闲等待时间:keepAliveTime

6. 任务等待队列:workQueue

这个队列是java.util.concurrent.BlockingQueue<E>的一个实例。当池中当前没有空闲的线程来执行任务,就会将此任务放入等待队列,根据其具体实现类的不同,又可分为3种不同的队列策略:

  1. 容量为0。如:java.util.concurrent.SynchronousQueue等待队列容量为0,所有需要阻塞的任务必须等待池内的某个线程有空闲,才能继续执行,否则阻塞。调用Executors.newCachedThreadPool的两个函数生成的线程池是这个策略。
  2. 不限容量。如:不指定容量的java.util.concurrent.LinkedBlockingQueue等待队列的长度无穷大,根据上文中的叙述,在这种策略下,不会有多于corePoolSize的线程被创建,所以maximumPoolSize也就没有任何意义了。调用Executors.newFixedThreadPool生成的线程池是这个策略。
  3. 限制容量。如:指定容量的任何java.util.concurrent.BlockingQueue<E>在某些场景下(本文中将描述这种场景),需要指定等待队列的容量,以防止过多的资源消耗,比如如果使用不限容量的等待队列,当有大量的任务到来而池内又无空闲线程执行任务时,会有大量的任务堆积,这些任务都是某个类的对象,是要消耗内存的,就可能导致OOM。如何去平衡等待队列和线程池的大小要根据实际场景去断定,如果配置不当,可能会导致资源耗尽、线程上下文切换消耗、或者线程调度消耗。这些都会直接影响系统的吞吐。

7. 任务拒绝处理器:defaultHandler

如果任务被拒绝执行,则会调用这个对象上的RejectedExecutionHandler.rejectedExecution()方法,JDK定义了4种处理策略,用户可以自定义自己的任务处理策略。

8. 允许核心线程过期:allowCoreThreadTimeOut

上面说的所有情况都是基于这个变量为false(默认值)来说的,如果你的线程池已经不使用了(不被引用),但是其中还有活着的线程时,这个线程池是不会被回收的,这种情况就造成了内存泄漏——一块永远不会被访问到的内存却无法被GC回收。
用户可以通过在抛弃线程池引用的时候显式地调用shutdown()来释放它,或者将allowCoreThreadTimeOut设置为true,则在过期时间后,核心线程会被释放,则其会被GC回收。

四、如果线程死掉了怎么办

几乎所有Executors中生成线程池的方法的注释上,都有代表相同意思的一句话,表示如果线程池中的某个线程死掉了,线程池会生成一个新的线程代替它。下面是方法java.util.concurrent.Executors.newFixedThreadPool(int)上的注释。

If any thread terminates due to a failure during execution prior to
shutdown, a new one will take its place if needed to execute
subsequent tasks.

线程死亡的原因

我们都知道守护线程(daemon)会在所有的非守护线程都死掉之后也死掉,除此之外导致一个非守护线程死掉有以下几种可能:

  1. 自然死亡,Runnable.run()方法执行完后返回。
  2. 执行过程中有未捕获异常,被抛到了Runnable.run()之外,导致线程死亡。
  3. 其宿主死亡,进程关闭或者机器死机。在Java中通常是System.exit()方法被调用
  4. 其他硬件问题。

线程池要保证其高可用性,就必须保证线程的可用。如一个固定容量的线程池,其中一个线程死掉了,它必须要能监控到线程的死亡并生成一个新的线程来代替它。ThreadPoolExecutor中与线程相关的有这样几个概念:

  1. java.util.concurrent.ThreadFactory,在Executors中有两种ThreadFactory,但其提供的线程池只使用了一种java.util.concurrent.Executors.DefaultThreadFactory,它是简单的使用ThreadGroup来实现。
  2. java.lang.ThreadGroup,从Java1开始就存在的类,用来建立一个线程的树形结构,可以用它来组织线程间的关系,但其并没有对其包含的子线程的监控。
  3. java.util.concurrent.ThreadPoolExecutor.Worker,ThreadPoolExecutor对线程的封装,其中还包含了一些统计功能。

ThreadPoolExecutor中如何保障线程的可用

在ThreadPoolExecutor中使用了一个很巧妙的方法实现了对线程池中线程健康状况的监控,代码2是从ThreadPoolExecutor类源码中截取的一段代码,它们在一起说明了其对线程的监控。

可以看到,在ThreadPoolExecutor中的线程被封装成一个对象Worker,而将其中的run()代理到ThreadPoolExecutor中的runWorker(),在runWorker()方法中是一个获取任务并执行的死循环。如果任务的运行出了什么问题(如抛出未捕获异常),processWorkerExit()方法会被执行,同时传入的completedAbruptly参数为true,会重新添加一个初始任务为null的Worker,并随之启动一个新的线程。

//代码2
//ThreadPoolExecutor的动态内部类
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

    /** 对象中封装的线程 */
    final Thread thread;
    /** 第一个要运行的任务,可能为null. */
    Runnable firstTask;
    /** 任务计数器 */
    volatile long completedTasks;

    //省略其他代码

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

五、回到我的问题

由于各种各样的原因,我们并没有使用数据库自带的主从机制来做数据的复制,而是将主库的所有DML语句作为消息发送到读库(DTS),同时自己实现了数据的重放。第一版的数据同步服务十分简单,对于主库的DML消息处理和消费(写入读库)都是在一个线程内完成的.这么实现的优点是简单,但缺点是直接导致了表与表之间的数据同步会受到影响,如果有一个表A忽然来了很多的消息(往往是批量修改数据造成的),则会占住消息处理通道,影响其他业务数据的及时同步,同时单线程写库吞吐太小。

上文说到,首先想到的是使用线程池来做消息的消费,但是不能直接套用上边说的Executor框架,由于以下几个原因:

  1. ThreadPoolExecutor中默认所有的任务之间是不互相影响的,然而对于数据库的DML来说,消息的顺序不能被打乱,至少单表的消息顺序必须有序,不然会影响最终的数据一致。
  2. ThreadPoolExecutor中所有的线程共享一个等待队列,然而为了防止表与表之间的影响,每个线程应该有自己的任务等待队列。
  3. 写库操作的吞吐直接受到提交事务数的影响,所以此多线程框架要可以支持任务的合并。

重复造轮子是没有意义的,但是在我们这种场景下JDK中现有的Executor框架不符合要求,只能自己造轮子。

我的实现

首先把线程抽象成「DML语句的执行器(Executor)」。其中包含了一个Thread的实例,维护了自己的等待队列(限定容量的阻塞队列),和对应的消息执行逻辑。

除此之外还包含了一些简单的统计、线程健康监控、合并事务等处理。

Executor的对象实现了Thread.UncaughtExceptionHandler接口,并绑定到其工作线程上。同时ExecutorGroup也会再生成一个守护线程专门来守护池内所有线程,作为额外的保险措施。

把线程池的概念抽象成执行器组(ExecutorGroup),其中维护了执行器的数组,并维护了目标表到特定执行器的映射关系,并对外提供执行消息的接口,其主要代码如下:

//代码3
public class ExecutorGroup {

    Executor[] group = new Executor[NUM];
    Thread boss = null;
    Map<String, Integer> registeredTables = new HashMap<>(32);
//    AtomicInteger cursor = new AtomicInteger();
    volatile int cursor = 0;

    public ExecutorGroup(String name) {
        //init group
        for(int i = 0; i < NUM; i++) {
            logger.debug("启动线程{},{}", name, i);
            group[i] = new Executor(this, String.format("sync-executor-%s-%d", name, i), i / NUM_OF_FIRST_CLASS);

        }
        startDaemonBoss(String.format("sync-executor-%s-boss", name));
    }

    //额外的保险
    private void startDaemonBoss(String name) {
        if (boss != null) {
            boss.interrupt();
        }
        boss = new Thread(() -> {
            while(true) {
                //休息一分钟。。。

                if (this.group != null) {
                    for (int i = 0; i < group.length; i++) {
                        Executor executor = group[i];
                        if (executor != null) {
                            executor.checkThread();
                        }
                    }
                }
            }

        });
        boss.setName(name);
        boss.setDaemon(true);
        boss.start();
    }
    public void execute(Message message){
        logger.debug("执行消息");

        //省略消息合法性验证

        if (!registeredTables.containsKey(taskKey)) {
            //已注册
//          registeredTables.put(taskKey, cursor.getAndIncrement());
            registeredTables.put(taskKey, cursor++ % NUM);
        }
        int index = registeredTables.get(taskKey);
        logger.debug("执行消息{},注册索引{}", taskKey, index);
        try {
            group[index].schedule(message);
        } catch (InterruptedException e) {
            logger.error("准备消息出错", e);
        }

    }

}

完成后整体的线程模型如下图所示:

图片 3

新的线程模型

Java1.7新加入的TransferQueue

Java1.7中提供了新的队列类型TransferQueue,但只提供了一个它的实现java.util.concurrent.LinkedTransferQueue<E>,它有更好的性能表现,可它是一个无容量限制的队列,而在我们的这个场景下必须要限制队列的容量,所以要自己实现一个有容量限制的队列。

You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图