Code前端首页关于Code前端联系我们

Redis如何实现延迟处理的例子

terry 2年前 (2023-09-26) 阅读数 49 #数据库

在开发过程中,经常会遇到一些延迟任务的需求。例如

• 如果订单 30 分钟未付款,则会自动取消

• 订单生成 60 秒后,将向用户发送短信

对于上述任务,我们给个专业的名字来形容吧,即:延迟任务。

最近需要实现一个延迟处理功能。主要是消费来自Kafka的消息后,根据消息中的某个延迟字段进行延迟处理。在实际实施过程中,有很多事情需要考虑。记录如下。

实现过程

说到Java中的定时功能,首先想到的是Timer和ScheduledThreadPoolExecutor,但相比之下可以排除Timer。主要原因如下:

  • Timer使用的是绝对时间,系统时间的变化会对Timer产生一定的影响;ScheduledThreadPoolExecutor使用的是相对时间,所以不会存在这样的问题。
  • Timer 使用单线程来处理任务。长时间运行的任务会减慢其他任务的处理速度,而ScheduledThreadPoolExecutor可以调整线程数量。
  • Timer 不处理运行时异常。一旦某个任务触发了运行时异常,就会导致整个Timer崩溃。不过ScheduledThreadPoolExecutor已经捕获了运行时异常(可以在afterExecute()回调方法中处理),所以更安全。

1。ScheduledThreadPoolExecutor决定用ScheduledThreadPoolExecutor来实现,下一步就是编写代码(通用流程代码)。

主要延迟实现如下:

ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new 
ThreadPoolExecutor.AbortPolicy());
//从消息中取出延迟时间及相关信息的代码略
int delayTime = 0;
executorService.scheduleWithFixedDelay(new Runnable() {
  @Override
  public void run() {
   //具体操作逻辑
  }},0,delayTime, TimeUnit.SECONDS);

其中NamedThreadFactory是我自定义的一个线程工厂。主要定义线程池名称并打印相关日志,方便后续问题分析。这里我就不介绍了。拒绝策略也继承了标准拒绝策略。

然后我测试了一下,发现满足目标要求的功能经过一定的延迟后才能执行。至此,该功能看起来已经完成了。

你可能会想:这太简单了。有什么好说的呢?然而,这种方法虽然易于实现,但存在一个潜在的问题。有什么问题?我们看一下ScheduledThreadPoolExecutor的源码:

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
 super(corePoolSize, Integer.MAX_VALUE, 0, 
 TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}

ScheduledThreadPoolExecutor因为他本身的延迟和循环特性,默认使用的是DelayWorkQueue。与我们通常使用的SingleThreadExecutor之类的结构不同,我们可以使用自己定义的LinkedBlockingQueue并设置队列大小。这就是问题所在。

DelayWrokQueue 是一个无界队列,我们​​的目标数据源是 kafka,一个高并发、高吞吐量的消息队列。很可能一段时间内会有大量的消息到达,导致OOM。在使用多线程的时候,我们绝对应该考虑OOM的可能性,因为OOM的后果往往是很严重的。系统OOM的解决方法通常是重新启动,这可能会导致用户数据丢失等无法挽回的问题。从编码阶段开始,必须使用尽可能最安全的手段来避免这些问题。

2。结合Redis和线程

这次我们改变思路,用Redis来帮我们缓冲,避免了OOM消息过多的问题。

相关redis zset api:

//添加元素
ZADD key score member [[score member] [score member] …]
//根据分值及限制数量查询
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
//从zset中删除指定成员
ZREM key member [member …]

我们使用redis基础数据结构中的zset结构,并使用score来存储我们的目标发送时间的值。整体处理流程如下:

  • 数据存储第一步:9:10分钟 收到Kafka发来的一条订单消息,要求30分钟内有到货通知,我们在当前时间基础上加上30分钟,将其转换为时间戳作为a的分数。键为a的订单号存储在redis中。代码如下:
public void onMessage(String topic, String message) {
  String orderId;
		int delayTime = 0;
  try {
   Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() {
   }.getType());
   if (msgMap.isEmpty()) {
    return;
   }
   LOGGER.info("onMessage kafka content:{}", msgMap.toString());
	 orderId = msgMap.get("orderId");
   if(StringUtils.isNotEmpty(orderId)){
    delayTime = Integer.parseInt(msgMap.get("delayTime"));
    Calendar calendar = Calendar.getInstance();
    //计算出预计发送时间
    calendar.add(Calendar.MINUTE, delayTime);
    long sendTime = calendar.getTimeInMillis();
    RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId);
    LOGGER.info("orderId:{}---放入redis中等待发送---sendTime:{}", ---orderId:{}, sendTime);
   }
  } catch (Exception e) {
   LOGGER.info("onMessage 延时发送异常:{}", e);
  }
 }
  • 数据处理第二步:启动一个新线程。具体规划时间根据业务需求确定。我每 3 分钟运行一次。内部逻辑:从redis中获取一定量的zset数据。如何获得?使用 zrangeByScore 方法根据数据的分数对 zset 进行排序。当然你可以把这个时间段,这里从0到现在,随身携带去消费。需要注意的一点是,检索数据后,我们需要使用 zrem 方法从 zset 中删除检索到的数据。删除它以防止其他线程重复消耗数据。之后,执行下面的调度通知和其他相关逻辑。代码如下:
public void run(){
  //获取批量大小
  int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100"));
  try {
   //批量获取离发送时间最近的orderNum条数据
	 Calendar calendar = Calendar.getInstance();
	 long now = calendar.getTimeInMillis();
	 //获取无限早到现在的事件key(防止上次批量数量小于放入数量,存在历史数据未消费情况)
	 Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum);
	 LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds));
   if (CollectionUtils.isNotEmpty(orders)){
    //删除key 防止重复发送
    for (String orderId : orderIds) {
     RedisUtils.getInstance().zrem(Constant.DELAY, orderId);
    }
	  //接下来执行发送等业务逻辑     
   }
  } catch (Exception e) {
   LOGGER.warn("task.run exception:{}", e);
  }
 }

至此依赖redis和线程的延迟发送功能就完成了。

结论

我们来对比一下以上两种不同实现方式的优缺点:

  • 第一种方式实现简单,不依赖外部元件,可以快速实现目标功能,但缺点也很清楚。应该在特定场景下使用。如果在像我这样有大量消息的情况下使用,可能会出现问题。如果数据源不包含很多消息,这显然是一个不错的选择。
  • 第二种方法实现起来稍微复杂一些,但是可以适应消息量较大的场景。它利用Redis的zset作为“中间件”作用,帮助我们实现能够更好适应高并发的延迟功能。在这种场景下,缺点是写入过程中需要考虑很多实际因素,比如线程执行周期时间、发送可能存在一定延迟、批量数据大小设置等。

综上所述,这是对本次延迟功能的两种实现方式的总结。使用哪种方法取决于实际情况。我希望它对每个人都有用。

版权声明

本文仅代表作者观点,不代表Code前端网立场。
本文系作者Code前端网发表,如需转载,请注明页面地址。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

热门