项目中的定时任务使用到了@Schedule,但是因为单体项目下可靠性差,也就是说如果服务器宕机了,整个项目就无法访问了

@Schedule是Spring框架提供的一种轻量级的定时任务注解,可以方便地在方法上标注定时任务的执行规则

如果是在集群环境下使用@Schedule会出现问题,如果项目部署在多台服务器上,每台服务器都会执行相同的定时任务,导致任务重复触发或数据冲突

为了避免这个问题,可以使用分布式锁的机制,让只有一个服务器能够获取到锁并执行定时任务,其他服务器则等待或跳过

分布式锁的实现方式:

使用setNX:需要考虑锁过期、续期等功能

public class RedisDistributedLock {

    private final RedisTemplate<String,Object> redisTemplate; // Redis操作模板
    private final ScheduledExecutorService scheduledExecutorService; // 定时任务执行器,用于锁的续期操作
    private final String lockKey; // 锁的key
    private final String requestId; // 请求标识,用于锁的加锁和释放
    private final long expireMillis; // 锁的过期时间
    private volatile boolean isLocked = false; // 锁是否被持有的标识

    public RedisDistributedLock(RedisTemplate<String,Object> redisTemplate, String lockKey, String requestId, long expireMillis) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey;
        this.requestId = requestId;
        this.expireMillis = expireMillis;

        // 定时任务,每隔一定时间就对锁进行续期
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            if (isLocked) {
                // 如果锁被持有,则对锁进行续期操作
                redisTemplate.expire(lockKey, expireMillis, TimeUnit.MILLISECONDS);
            }
        }, expireMillis / 2, expireMillis / 2, TimeUnit.MILLISECONDS); // 每隔过期时间的一半对锁进行续期
    }

    /**
     * 获取分布式锁
     *
     * @return 是否获取到锁
     */
    public boolean acquireLock() {
        // Lua脚本,用于尝试获取锁
        String script = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end";
        RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
        Long result = (Long) redisTemplate.execute(redisScript, Collections.singletonList(lockKey), requestId, expireMillis);
        // 判断锁是否获取成功
        if (result != null && result == 1) {
            isLocked = true;
            return true;
        }
        return false;
    }

    /**
     * 释放分布式锁
     *
     * @return 是否释放成功
     */
    public boolean releaseLock() {
        // Lua脚本,用于尝试释放锁
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
        Long result = (Long) redisTemplate.execute(redisScript, Collections.singletonList(lockKey), requestId);
        // 判断锁是否释放成功
        if (result != null && result == 1) {
            isLocked = false;
            scheduledExecutorService.shutdown(); // 关闭定时任务执行器
            return true;
        }
        return false;
    }

}

使用Redisson客户端,redisson

@Component
public class TaskScheduler {

    @Autowired
    private RedissonClient redissonClient;

    // 每天凌晨0点执行一次
    @Scheduled(cron = "0 0 0 * * ?")
    public void doTask() {
        // 获取分布式可重入锁
        RLock lock = redissonClient.getLock("task_lock");
        try {
            // 尝试加锁,获取不到返回false
            boolean res = lock.tryLock();
            if (res) {
                // 执行定时任务的逻辑
                System.out.println("do task...");
            } else {
                // 获取不到锁就跳过本次任务
                System.out.println("skip task...");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放锁(如果还持有)
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

对Spring的@Scheduled进行AOP增强

考虑到每一次都对定时任务手动加上分布式锁比较麻烦,所以使用AOP增强@Scheduled,在切面中进行加锁判断

创建一个NoneDistributedSchedule注解,表示不使用分布式任务调度

package cc.oolo.imgcdn.DistributedScheduled;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoneDistributedSchedule {
}
package cc.oolo.imgcdn.Aspect;

import cc.oolo.imgcdn.DistributedScheduled.NoneDistributedSchedule;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

@Component
@Aspect
public class ScheduleAspect {

    @Autowired
    private RedissonClient redissonClient;

    @Pointcut("@annotation(org.springframework.scheduling.annotation.Scheduled)")
    public void pointCut(){}

    @Around("pointCut()")
    public void around(ProceedingJoinPoint pjp) throws Throwable {
        MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
        Method method = methodSignature.getMethod();
        String key = method.getName();
        NoneDistributedSchedule noneDistributedSchedule = method.getAnnotation(NoneDistributedSchedule.class);
        if (noneDistributedSchedule == null){
            Scheduled scheduled = method.getAnnotation(Scheduled.class);
            String cron = scheduled.cron();
            Long time = getBetweenCron(cron);
            RLock lock = redissonClient.getLock(key);
            System.out.println(key);
            if (!lock.tryLock(1,time, TimeUnit.SECONDS)){
                return;
            }
            pjp.proceed();
        }else {
            pjp.proceed();
        }

    }

    public Long getBetweenCron(String cron){
        // 解析cron表达式
        final CronExpression cronExpression = CronExpression.parse(cron);
        // 获取当前时间
        final LocalDateTime now = LocalDateTime.now();
        // 获取下次执行时间
        final LocalDateTime next = cronExpression.next(now);
        // 计算两个时间之间的秒数
        return ChronoUnit.SECONDS.between(now, next);
    }

}
最后修改:2023 年 03 月 12 日
如果觉得我的文章对你有用,请随意赞赏