
上一篇我们讲了定时器的几种实现,分析了在大数据量高并发的场景下这几种实现方式就有点力不从心了,从而引出时间轮这种数据结构。在netty 和kafka 这两种优秀的中间件中,都有时间轮的实现。文章最后,我们模拟kafka 中scala 的代码实现java版的时间轮。
Netty 的时间轮实现
接口定义
Netty 的实现自定义了一个超时器的接口io.netty.util.Timer,其方法如下:
- publicinterfaceTimer
- {
- //新增一个延时任务,入参为定时任务TimerTask,和对应的延迟时间
- TimeoutnewTimeout(TimerTasktask,longdelay,TimeUnitunit);
- //停止时间轮的运行,并且返回所有未被触发的延时任务
- Set<Timeout>stop();
- }
- publicinterfaceTimeout
- {
- Timertimer();
- TimerTasktask();
- booleanisExpired();
- booleanisCancelled();
- booleancancel();
- }
Timeout接口是对延迟任务的一个封装,其接口方法说明其实现内部需要维持该延迟任务的状态。后续我们分析其实现内部代码时可以更容易的看到。
Timer接口有唯一实现HashedWheelTimer。首先来看其构造方法,如下:
- publicHashedWheelTimer(ThreadFactorythreadFactory,longtickDuration,TimeUnitunit,intticksPerWheel,booleanleakDetection,longmaxPendingTimeouts)
- {
- //省略代码,省略参数非空检查内容。
- wheel=createWheel(ticksPerWheel);
- mask=wheel.length-1;
- //省略代码,省略槽位时间范围检查,避免溢出以及小于1毫秒。
- workerThread=threadFactory.newThread(worker);
- //省略代码,省略资源泄漏追踪设置以及时间轮实例个数检查
- }
mask 的设计和HashMap一样,通过限制数组的大小为2的次方,利用位运算来替代取模运算,提高性能。
构建循环数组
首先是方法createWheel,用于创建时间轮的核心数据结构,循环数组。来看下其方法内容
- privatestaticHashedWheelBucket[]createWheel(intticksPerWheel)
- {
- //省略代码,确认ticksPerWheel处于正确的区间
- //将ticksPerWheel规范化为2的次方幂大小。
- ticksPerWheel=normalizeTicksPerWheel(ticksPerWheel);
- HashedWheelBucket[]wheel=newHashedWheelBucket[ticksPerWheel];
- for(inti=0;i<wheel.length;i++)
- {
- wheel[i]=newHashedWheelBucket();
- }
- returnwheel;
- }
数组的长度为 2 的次方幂方便进行求商和取余计算。
HashedWheelBucket内部存储着由HashedWheelTimeout节点构成的双向链表,并且存储着链表的头节点和尾结点,方便于任务的提取和插入。
新增延迟任务
方法HashedWheelTimer#newTimeout用于新增延迟任务,下面来看下代码:
- publicTimeoutnewTimeout(TimerTasktask,longdelay,TimeUnitunit)
- {
- //省略代码,用于参数检查
- start();
- longdeadline=System.nanoTime()+unit.toNanos(delay)-startTime;
- if(delay>0&&deadline<0)
- {
- deadline=Long.MAX_VALUE;
- }
- HashedWheelTimeouttimeout=newHashedWheelTimeout(this,task,deadline);
- timeouts.add(timeout);
- returntimeout;
- }
可以看到任务并没有直接添加到时间轮中,而是先入了一个 mpsc 队列,我简单说下 mpsc【多生产者单一消费者队列】 是 JCTools 中的并发队列,用在多个生产者可同时访问队列,但只有一个消费者会访问队列的情况。,采用这个模式主要出于提升并发性能考虑,因为这个队列只有线程workerThread会进行任务提取操作。
工作线程如何执行
- publicvoidrun()
- {
- {//代码块①
- startTime=System.nanoTime();
- if(startTime==0)
- {
- //使用startTime==0作为线程进入工作状态模式标识,因此这里重新赋值为1
- startTime=1;
- }
- //通知外部初始化工作线程的线程,工作线程已经启动完毕
- startTimeInitialized.countDown();
- }
- {//代码块②
- do{
- finallongdeadline=waitForNextTick();
- if(deadline>0)
- {
- intidx=(int)(tick&mask);
- processCancelledTasks();
- HashedWheelBucketbucket=wheel[idx];
- transferTimeoutsToBuckets();
- bucket.expireTimeouts(deadline);
- tick++;
- }
- }while(WORKER_STATE_UPDATER.get(HashedWheelTimer.this)==WORKER_STATE_STARTED);
- }
- {//代码块③
- for(HashedWheelBucketbucket:wheel)
- {
- bucket.clearTimeouts(unprocessedTimeouts);
- }
- for(;;)
- {
- HashedWheelTimeouttimeout=timeouts.poll();
- if(timeout==null)
- {
- break;
- }
- if(!timeout.isCancelled())
- {
- unprocessedTimeouts.add(timeout);
- }
- }
- processCancelledTasks();
- }
- }
看 waitForNextTick,是如何得到下一次执行时间的。
- privatelongwaitForNextTick()
- {
- longdeadline=tickDuration*(tick+1);//计算下一次需要检查的时间
- for(;;)
- {
- finallongcurrentTime=System.nanoTime()-startTime;
- longsleepTimeMs=(deadline-currentTime+999999)/1000000;
- if(sleepTimeMs<=0)//说明时间已经到了
- {
- if(currentTime==Long.MIN_VALUE)
- {
- return-Long.MAX_VALUE;
- }
- else
- {
- returncurrentTime;
- }
- }
- //windows下有bugsleep必须是10的倍数
- if(PlatformDependent.isWindows())
- {
- sleepTimeMs=sleepTimeMs/10*10;
- }
- try
- {
- Thread.sleep(sleepTimeMs);//等待时间到来
- }
- catch(InterruptedExceptionignored)
- {
- if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this)==WORKER_STATE_SHUTDOWN)
- {
- returnLong.MIN_VALUE;
- }
- }
- }
- }
简单的说就是通过 tickDuration 和此时已经滴答的次数算出下一次需要检查的时间,时候未到就sleep等着。
任务如何入槽的。
- privatevoidtransferTimeoutsToBuckets(){
- //最多处理100000怕任务延迟
- for(inti=0;i<100000;++i){
- //从队列里面拿出任务呢
- HashedWheelTimer.HashedWheelTimeouttimeout=(HashedWheelTimer.HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
- if(timeout==null){
- break;
- }
- if(timeout.state()!=1){
- longcalculated=timeout.deadline/HashedWheelTimer.this.tickDuration;
- //计算排在第几轮
- timeout.remainingRounds=(calculated-this.tick)/(long)HashedWheelTimer.this.wheel.length;
- longticks=Math.max(calculated,this.tick);
- //计算放在哪个槽中
- intstopIndex=(int)(ticks&(long)HashedWheelTimer.this.mask);
- HashedWheelTimer.HashedWheelBucketbucket=HashedWheelTimer.this.wheel[stopIndex];
- //入槽,就是链表入队列
- bucket.addTimeout(timeout);
- }
- }
- }
如何执行的
- publicvoidexpireTimeouts(longdeadline){
- HashedWheelTimer.HashedWheelTimeoutnext;
- //拿到槽的链表头部
- for(HashedWheelTimer.HashedWheelTimeouttimeout=this.head;timeout!=null;timeout=next){
- booleanremove=false;
- if(timeout.remainingRounds<=0L){//如果到这轮l
- if(timeout.deadline>deadline){
- thrownewIllegalStateException(String.format("timeout.deadline(%d)>deadline(%d)",timeout.deadline,deadline));
- }
- timeout.expire();//执行
- remove=true;
- }elseif(timeout.isCancelled()){
- remove=true;
- }else{
- --timeout.remainingRounds;//轮数-1
- }
- next=timeout.next;//继续下一任务
- if(remove){
- this.remove(timeout);//移除完成的任务
- }
- }
- }
就是通过轮数和时间双重判断,执行完了移除任务。
小结一下
总体上看 Netty 的实现就是上文说的时间轮通过轮数的实现,完全一致。可以看出时间精度由 TickDuration 把控,并且工作线程的除了处理执行到时的任务还做了其他操作,因此任务不一定会被精准的执行。
而且任务的执行如果不是新起一个线程,或者将任务扔到线程池执行,那么耗时的任务会阻塞下个任务的执行。
并且会有很多无用的 tick 推进,例如 TickDuration 为1秒,此时就一个延迟350秒的任务,那就是有349次无用的操作。出现空推。
但是从另一面来看,如果任务都执行很快(当然你也可以异步执行),并且任务数很多,通过分批执行,并且增删任务的时间复杂度都是O(1)来说。时间轮还是比通过优先队列实现的延时任务来的合适些。
Kafka 中的时间轮
上面我们说到 Kafka 中的时间轮是多层次时间轮实现,总的而言实现和上述说的思路一致。不过细节有些不同,并且做了点优化。
先看看添加任务的方法。在添加的时候就设置任务执行的绝对时间。
Kafka 中的时间轮
上面我们说到 Kafka 中的时间轮是多层次时间轮实现,总的而言实现和上述说的思路一致。不过细节有些不同,并且做了点优化。
先看看添加任务的方法。在添加的时候就设置任务执行的绝对时间。
- defadd(timerTaskEntry:TimerTaskEntry):Boolean={
- valexpiration=timerTaskEntry.expirationMs
- if(timerTaskEntry.cancelled){
- //Cancelled
- false
- }elseif(expiration<currentTime+tickMs){
- //如果已经到期返回false
- //Alreadyexpired
- false
- }elseif(expiration<currentTime+interval){//如果在本层范围内
- //Putinitsownbucket
- valvirtualId=expiration/tickMs
- valbucket=buckets((virtualId%wheelSize.toLong).toInt)//计算槽位
- bucket.add(timerTaskEntry)//添加到槽内双向链表中
- //Setthebucketexpirationtime
- if(bucket.setExpiration(virtualId*tickMs)){//更新槽时间
- //Thebucketneedstobeenqueuedbecauseitwasanexpiredbucket
- //Weonlyneedtoenqueuethebucketwhenitsexpirationtimehaschanged,i.e.thewheelhasadvanced
- //andthepreviousbucketsgetsreused;furthercallstosettheexpirationwithinthesamewheelcycle
- //willpassinthesamevalueandhencereturnfalse,thusthebucketwiththesameexpirationwillnot
- //beenqueuedmultipletimes.
- queue.offer(bucket)//将槽加入DelayQueue,由DelayQueue来推进执行
- }
- true
- }else{
- //如果超过本层能表示的延迟时间,则将任务添加到上层。这里看到上层是按需创建的。
- //Outoftheinterval.Putitintotheparenttimer
- if(overflowWheel==null)addOverflowWheel()
- overflowWheel.add(timerTaskEntry)
- }
- }
那么时间轮是如何推动的呢?Netty 中是通过固定的时间间隔扫描,时候未到就等待来进行时间轮的推动。上面我们分析到这样会有空推进的情况。
而 Kafka 就利用了空间换时间的思想,通过 DelayQueue,来保存每个槽,通过每个槽的过期时间排序。这样拥有最早需要执行任务的槽会有优先获取。如果时候未到,那么 delayQueue.poll 就会阻塞着,这样就不会有空推进的情况发送。
我们来看下推进的方法。
- defadvanceClock(timeoutMs:Long):Boolean={
- //从延迟队列中获取槽
- varbucket=delayQueue.poll(timeoutMs,TimeUnit.MILLISECONDS)
- if(bucket!=null){
- writeLock.lock()
- try{
- while(bucket!=null){
- //更新每层时间轮的currentTime
- timingWheel.advanceClock(bucket.getExpiration())
- //因为更新了currentTime,进行一波任务的重新插入,来实现任务时间轮的降级
- bucket.flush(reinsert)
- //获取下一个槽
- bucket=delayQueue.poll()
- }
- }finally{
- writeLock.unlock()
- }
- true
- }else{
- false
- }
- }
- //Trytoadvancetheclock
- defadvanceClock(timeMs:Long):Unit={
- if(timeMs>=currentTime+tickMs){
- //必须是tickMs整数倍
- currentTime=timeMs-(timeMs%tickMs)
- //推动上层时间轮也更新currentTime
- //Trytoadvancetheclockoftheoverflowwheelifpresent
- if(overflowWheel!=null)overflowWheel.advanceClock(currentTime)
- }
- }
从上面的 add 方法我们知道每次对比都是根据expiration < currentTime + interval 来进行对比的,而advanceClock 就是用来推进更新 currentTime 的。
小结一下
Kafka 用了多层次时间轮来实现,并且是按需创建时间轮,采用任务的绝对时间来判断延期,并且对于每个槽(槽内存放的也是任务的双向链表)都会维护一个过期时间,利用 DelayQueue 来对每个槽的过期时间排序,来进行时间的推进,防止空推进的存在。
每次推进都会更新 currentTime 为当前时间戳,当然做了点微调使得 currentTime 是 tickMs 的整数倍。并且每次推进都会把能降级的任务重新插入降级。
可以看到这里的 DelayQueue 的元素是每个槽,而不是任务,因此数量就少很多了,这应该是权衡了对于槽操作的延时队列的时间复杂度与空推进的影响。
模拟kafka的时间轮实现java版
定时器
- publicclassTimer{
- /**
- *底层时间轮
- */
- privateTimeWheeltimeWheel;
- /**
- *一个Timer只有一个delayQueue
- */
- privateDelayQueue<TimerTaskList>delayQueue=newDelayQueue<>();
- /**
- *过期任务执行线程
- */
- privateExecutorServiceworkerThreadPool;
- /**
- *轮询delayQueue获取过期任务线程
- */
- privateExecutorServicebossThreadPool;
- /**
- *构造函数
- */
- publicTimer(){
- timeWheel=newTimeWheel(1000,2,System.currentTimeMillis(),delayQueue);
- workerThreadPool=Executors.newFixedThreadPool(100);
- bossThreadPool=Executors.newFixedThreadPool(1);
- //20ms获取一次过期任务
- bossThreadPool.submit(()->{
- while(true){
- this.advanceClock(1000);
- }
- });
- }
- /**
- *添加任务
- */
- publicvoidaddTask(TimerTasktimerTask){
- //添加失败任务直接执行
- if(!timeWheel.addTask(timerTask)){
- workerThreadPool.submit(timerTask.getTask());
- }
- }
- /**
- *获取过期任务
- */
- privatevoidadvanceClock(longtimeout){
- try{
- TimerTaskListtimerTaskList=delayQueue.poll(timeout,TimeUnit.MILLISECONDS);
- if(timerTaskList!=null){
- //推进时间
- timeWheel.advanceClock(timerTaskList.getExpiration());
- //执行过期任务(包含降级操作)
- timerTaskList.flush(this::addTask);
- }
- }catch(Exceptione){
- e.printStackTrace();
- }
- }
- }
任务
- publicclassTimerTask{
- /**
- *延迟时间
- */
- privatelongdelayMs;
- /**
- *任务
- */
- privateMyThreadtask;
- /**
- *时间槽
- */
- protectedTimerTaskListtimerTaskList;
- /**
- *下一个节点
- */
- protectedTimerTasknext;
- /**
- *上一个节点
- */
- protectedTimerTaskpre;
- /**
- *描述
- */
- publicStringdesc;
- publicTimerTask(longdelayMs,MyThreadtask){
- this.delayMs=System.currentTimeMillis()+delayMs;
- this.task=task;
- this.timerTaskList=null;
- this.next=null;
- this.pre=null;
- }
- publicMyThreadgetTask(){
- returntask;
- }
- publiclonggetDelayMs(){
- returndelayMs;
- }
- @Override
- publicStringtoString(){
- returndesc;
- }
- }
时间槽
- publicclassTimerTaskListimplementsDelayed{
- /**
- *过期时间
- */
- privateAtomicLongexpiration=newAtomicLong(-1L);
- /**
- *根节点
- */
- privateTimerTaskroot=newTimerTask(-1L,null);
- {
- root.pre=root;
- root.next=root;
- }
- /**
- *设置过期时间
- */
- publicbooleansetExpiration(longexpire){
- returnexpiration.getAndSet(expire)!=expire;
- }
- /**
- *获取过期时间
- */
- publiclonggetExpiration(){
- returnexpiration.get();
- }
- /**
- *新增任务
- */
- publicvoidaddTask(TimerTasktimerTask){
- synchronized(this){
- if(timerTask.timerTaskList==null){
- timerTask.timerTaskList=this;
- TimerTasktail=root.pre;
- timerTask.next=root;
- timerTask.pre=tail;
- tail.next=timerTask;
- root.pre=timerTask;
- }
- }
- }
- /**
- *移除任务
- */
- publicvoidremoveTask(TimerTasktimerTask){
- synchronized(this){
- if(timerTask.timerTaskList.equals(this)){
- timerTask.next.pre=timerTask.pre;
- timerTask.pre.next=timerTask.next;
- timerTask.timerTaskList=null;
- timerTask.next=null;
- timerTask.pre=null;
- }
- }
- }
- /**
- *重新分配
- */
- publicsynchronizedvoidflush(Consumer<TimerTask>flush){
- TimerTasktimerTask=root.next;
- while(!timerTask.equals(root)){
- this.removeTask(timerTask);
- flush.accept(timerTask);
- timerTask=root.next;
- }
- expiration.set(-1L);
- }
- @Override
- publiclonggetDelay(TimeUnitunit){
- returnMath.max(0,unit.convert(expiration.get()-System.currentTimeMillis(),TimeUnit.MILLISECONDS));
- }
- @Override
- publicintcompareTo(Delayedo){
- if(oinstanceofTimerTaskList){
- returnLong.compare(expiration.get(),((TimerTaskList)o).expiration.get());
- }
- return0;
- }
- }
时间轮
- publicclassTimeWheel{
- /**
- *一个时间槽的范围
- */
- privatelongtickMs;
- /**
- *时间轮大小
- */
- privateintwheelSize;
- /**
- *时间跨度
- */
- privatelonginterval;
- /**
- *时间槽
- */
- privateTimerTaskList[]timerTaskLists;
- /**
- *当前时间
- */
- privatelongcurrentTime;
- /**
- *上层时间轮
- */
- privatevolatileTimeWheeloverflowWheel;
- /**
- *一个Timer只有一个delayQueue
- */
- privateDelayQueue<TimerTaskList>delayQueue;
- publicTimeWheel(longtickMs,intwheelSize,longcurrentTime,DelayQueue<TimerTaskList>delayQueue){
- this.currentTime=currentTime;
- this.tickMs=tickMs;
- this.wheelSize=wheelSize;
- this.interval=tickMs*wheelSize;
- this.timerTaskLists=newTimerTaskList[wheelSize];
- //currentTime为tickMs的整数倍这里做取整操作
- this.currentTime=currentTime-(currentTime%tickMs);
- this.delayQueue=delayQueue;
- for(inti=0;i<wheelSize;i++){
- timerTaskLists[i]=newTimerTaskList();
- }
- }
- /**
- *创建或者获取上层时间轮
- */
- privateTimeWheelgetOverflowWheel(){
- if(overflowWheel==null){
- synchronized(this){
- if(overflowWheel==null){
- overflowWheel=newTimeWheel(interval,wheelSize,currentTime,delayQueue);
- }
- }
- }
- returnoverflowWheel;
- }
- /**
- *添加任务到时间轮
- */
- publicbooleanaddTask(TimerTasktimerTask){
- longexpiration=timerTask.getDelayMs();
- //过期任务直接执行
- if(expiration<currentTime+tickMs){
- returnfalse;
- }elseif(expiration<currentTime+interval){
- //当前时间轮可以容纳该任务加入时间槽
- LongvirtualId=expiration/tickMs;
- intindex=(int)(virtualId%wheelSize);
- System.out.println("tickMs:"+tickMs+"------index:"+index+"------expiration:"+expiration);
- TimerTaskListtimerTaskList=timerTaskLists[index];
- timerTaskList.addTask(timerTask);
- if(timerTaskList.setExpiration(virtualId*tickMs)){
- //添加到delayQueue中
- delayQueue.offer(timerTaskList);
- }
- }else{
- //放到上一层的时间轮
- TimeWheeltimeWheel=getOverflowWheel();
- timeWheel.addTask(timerTask);
- }
- returntrue;
- }
- /**
- *推进时间
- */
- publicvoidadvanceClock(longtimestamp){
- if(timestamp>=currentTime+tickMs){
- currentTime=timestamp-(timestamp%tickMs);
- if(overflowWheel!=null){
- //推进上层时间轮时间
- System.out.println("推进上层时间轮时间time="+System.currentTimeMillis());
- this.getOverflowWheel().advanceClock(timestamp);
- }
- }
- }
- }
我们来模拟一个请求,超时和不超时的情况
首先定义一个Mythread 类,用于设置任务超时的值。
- publicclassMyThreadimplementsRunnable{
- CompletableFuture<String>cf;
- publicMyThread(CompletableFuture<String>cf){
- this.cf=cf;
- }
- publicvoidrun(){
- if(!cf.isDone()){
- cf.complete("超时");
- }
- }
- }
模拟超时
- publicstaticvoidmain(String[]args)throwsException{
- Timertimer=newTimer();
- CompletableFuture<String>base=CompletableFuture.supplyAsync(()->{
- try{
- Thread.sleep(3000);
- }catch(InterruptedExceptione){
- e.printStackTrace();
- }
- return"正常返回";
- });
- TimerTasktimerTask2=newTimerTask(1000,newMyThread(base));
- timer.addTask(timerTask2);
- System.out.println("base.get==="+base.get());
- }

模拟正常返回
- publicstaticvoidmain(String[]args)throwsException{
- Timertimer=newTimer();
- CompletableFuture<String>base=CompletableFuture.supplyAsync(()->{
- try{
- Thread.sleep(300);
- }catch(InterruptedExceptione){
- e.printStackTrace();
- }
- return"正常返回";
- });
- TimerTasktimerTask2=newTimerTask(2000,newMyThread(base));
- timer.addTask(timerTask2);
- System.out.println("base.get==="+base.get());
- }

原文地址:https://mp.weixin.qq.com/s/8uCN4OL3S1aoT8ff_2QXhQ








发表评论
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。