温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Java如何实现时间轮算法

发布时间:2022-02-23 16:16:59 来源:亿速云 阅读:278 作者:iii 栏目:开发技术

这篇文章主要介绍“Java如何实现时间轮算法”,在日常操作中,相信很多人在Java如何实现时间轮算法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Java如何实现时间轮算法”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

考虑这样的一个场景,当前你有1000个任务,要让这1000个任务每隔几分钟触发某个操作。要是实现这样的需求,很多人第一想法就是弄一个定时器。但是1000个任务就是1000个定时器,一个定时器是一个线程。为了解决这个问题,就出现了时间轮算法。

时间轮

时间轮简介:时间轮方案将现实生活中的时钟概念引入到软件设计中,主要思路是定义一个时钟周期(比如时钟的12小时)和步长(比如时钟的一秒走一次),当指针每走一步的时候,会获取当前时钟刻度上挂载的任务并执行。

核心思想

  • 一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度

  • 超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮的最小精度即为下层时间轮能表达的最大时间(时分秒概念)

  • 每个槽对应一个环形链表存储该时间应该被执行的任务

  • 需要一个线程去驱动指针运转,获取到期任务

以下给出java 简单手写版本实现

代码实现

时间轮主数据结构

/**  * @author apdoer  * @version 1.0  * @date 2021/3/22 19:31  */ @Slf4j public class TimeWheel {  /**   * 一个槽的时间间隔(时间轮最小刻度)   */  private long tickMs;  /**   * 时间轮大小(槽的个数)   */  private int wheelSize;  /**   * 一轮的时间跨度   */  private long interval;  private long currentTime;  /**   * 槽   */  private TimerTaskList[] buckets;  /**   * 上层时间轮   */  private volatile TimeWheel overflowWheel;  /**   * 一个timer只有一个delayqueue   */  private DelayQueue<TimerTaskList> delayQueue;  public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {   this.currentTime = currentTime;   this.tickMs = tickMs;   this.wheelSize = wheelSize;   this.interval = tickMs * wheelSize;   this.buckets = new TimerTaskList[wheelSize];   this.currentTime = currentTime - (currentTime % tickMs);   this.delayQueue = delayQueue;   for (int i = 0; i < wheelSize; i++) {    buckets[i] = new TimerTaskList();   }  }  public boolean add(TimerTaskEntry entry) {   long expiration = entry.getExpireMs();   if (expiration < tickMs + currentTime) {    //到期了    return false;   } else if (expiration < currentTime + interval) {    //扔进当前时间轮的某个槽里,只有时间大于某个槽,才会放进去    long virtualId = (expiration / tickMs);    int index = (int) (virtualId % wheelSize);    TimerTaskList bucket = buckets[index];    bucket.addTask(entry);    //设置bucket 过期时间    if (bucket.setExpiration(virtualId * tickMs)) {     //设好过期时间的bucket需要入队     delayQueue.offer(bucket);     return true;    }   } else {    //当前轮不能满足,需要扔到上一轮    TimeWheel timeWheel = getOverflowWheel();    return timeWheel.add(entry);   }   return false;  }  private TimeWheel getOverflowWheel() {   if (overflowWheel == null) {    synchronized (this) {     if (overflowWheel == null) {      overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);     }    }   }   return overflowWheel;  }  /**   * 推进指针   *   * @param timestamp   */  public void advanceLock(long timestamp) {   if (timestamp > currentTime + tickMs) {    currentTime = timestamp - (timestamp % tickMs);    if (overflowWheel != null) {     this.getOverflowWheel().advanceLock(timestamp);    }   }  } }

定时器接口

/**  * 定时器  * @author apdoer  * @version 1.0  * @date 2021/3/22 20:30  */ public interface Timer {  /**   * 添加一个新任务   *   * @param timerTask   */  void add(TimerTask timerTask);  /**   * 推动指针   *   * @param timeout   */  void advanceClock(long timeout);  /**   * 等待执行的任务   *   * @return   */  int size();  /**   * 关闭服务,剩下的无法被执行   */  void shutdown(); }

定时器实现

/**  * @author apdoer  * @version 1.0  * @date 2021/3/22 20:33  */ @Slf4j public class SystemTimer implements Timer {  /**   * 底层时间轮   */  private TimeWheel timeWheel;  /**   * 一个Timer只有一个延时队列   */  private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();  /**   * 过期任务执行线程   */  private ExecutorService workerThreadPool;  /**   * 轮询delayQueue获取过期任务线程   */  private ExecutorService bossThreadPool;  public SystemTimer() {   this.timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue);   this.workerThreadPool = Executors.newFixedThreadPool(100);   this.bossThreadPool = Executors.newFixedThreadPool(1);   //20ms推动一次时间轮运转   this.bossThreadPool.submit(() -> {    for (; ; ) {     this.advanceClock(20);    }   });  }  public void addTimerTaskEntry(TimerTaskEntry entry) {   if (!timeWheel.add(entry)) {    //已经过期了    TimerTask timerTask = entry.getTimerTask();    log.info("=====任务:{} 已到期,准备执行============",timerTask.getDesc());    workerThreadPool.submit(timerTask);   }  }  @Override  public void add(TimerTask timerTask) {   log.info("=======添加任务开始====task:{}", timerTask.getDesc());   TimerTaskEntry entry = new TimerTaskEntry(timerTask, timerTask.getDelayMs() + System.currentTimeMillis());   timerTask.setTimerTaskEntry(entry);   addTimerTaskEntry(entry);  }  /**   * 推动指针运转获取过期任务   *   * @param timeout 时间间隔   * @return   */  @Override  public synchronized void advanceClock(long timeout) {   try {    TimerTaskList bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);    if (bucket != null) {     //推进时间     timeWheel.advanceLock(bucket.getExpiration());     //执行过期任务(包含降级)     bucket.clear(this::addTimerTaskEntry);    }   } catch (InterruptedException e) {    log.error("advanceClock error");   }  }  @Override  public int size() {   //todo   return 0;  }  @Override  public void shutdown() {   this.bossThreadPool.shutdown();   this.workerThreadPool.shutdown();   this.timeWheel = null;  } }

存储任务的环形链表

/**  * @author apdoer  * @version 1.0  * @date 2021/3/22 19:26  */ @Data @Slf4j class TimerTaskList implements Delayed {  /**   * TimerTaskList 环形链表使用一个虚拟根节点root   */  private TimerTaskEntry root = new TimerTaskEntry(null, -1);  {   root.next = root;   root.prev = root;  }  /**   * bucket的过期时间   */  private AtomicLong expiration = new AtomicLong(-1L);  public long getExpiration() {   return expiration.get();  }  /**   * 设置bucket的过期时间,设置成功返回true   *   * @param expirationMs   * @return   */  boolean setExpiration(long expirationMs) {   return expiration.getAndSet(expirationMs) != expirationMs;  }  public boolean addTask(TimerTaskEntry entry) {   boolean done = false;   while (!done) {    //如果TimerTaskEntry已经在别的list中就先移除,同步代码块外面移除,避免死锁,一直到成功为止    entry.remove();    synchronized (this) {     if (entry.timedTaskList == null) {      //加到链表的末尾      entry.timedTaskList = this;      TimerTaskEntry tail = root.prev;      entry.prev = tail;      entry.next = root;      tail.next = entry;      root.prev = entry;      done = true;     }    }   }   return true;  }  /**   * 从 TimedTaskList 移除指定的 timerTaskEntry   *   * @param entry   */  public void remove(TimerTaskEntry entry) {   synchronized (this) {    if (entry.getTimedTaskList().equals(this)) {     entry.next.prev = entry.prev;     entry.prev.next = entry.next;     entry.next = null;     entry.prev = null;     entry.timedTaskList = null;    }   }  }  /**   * 移除所有   */  public synchronized void clear(Consumer<TimerTaskEntry> entry) {   TimerTaskEntry head = root.next;   while (!head.equals(root)) {    remove(head);    entry.accept(head);    head = root.next;   }   expiration.set(-1L);  }  @Override  public long getDelay(TimeUnit unit) {   return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));  }  @Override  public int compareTo(Delayed o) {   if (o instanceof TimerTaskList) {    return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());   }   return 0;  } }

存储任务的容器entry

/**  * @author apdoer  * @version 1.0  * @date 2021/3/22 19:26  */ @Data class TimerTaskEntry implements Comparable<TimerTaskEntry> {  private TimerTask timerTask;  private long expireMs;  volatile TimerTaskList timedTaskList;  TimerTaskEntry next;  TimerTaskEntry prev;  public TimerTaskEntry(TimerTask timedTask, long expireMs) {   this.timerTask = timedTask;   this.expireMs = expireMs;   this.next = null;   this.prev = null;  }  void remove() {   TimerTaskList currentList = timedTaskList;   while (currentList != null) {    currentList.remove(this);    currentList = timedTaskList;   }  }  @Override  public int compareTo(TimerTaskEntry o) {   return ((int) (this.expireMs - o.expireMs));  } }

任务包装类(这里也可以将工作任务以线程变量的方式去传入)

@Data @Slf4j class TimerTask implements Runnable {  /**   * 延时时间   */  private long delayMs;  /**   * 任务所在的entry   */  private TimerTaskEntry timerTaskEntry;  private String desc;  public TimerTask(String desc, long delayMs) {   this.desc = desc;   this.delayMs = delayMs;   this.timerTaskEntry = null;  }  public synchronized void setTimerTaskEntry(TimerTaskEntry entry) {   // 如果这个timetask已经被一个已存在的TimerTaskEntry持有,先移除一个   if (timerTaskEntry != null && timerTaskEntry != entry) {    timerTaskEntry.remove();   }   timerTaskEntry = entry;  }  public TimerTaskEntry getTimerTaskEntry() {   return timerTaskEntry;  }  @Override  public void run() {   log.info("============={}任务执行", desc);  } }

到此,关于“Java如何实现时间轮算法”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI