项目中的定时任务使用到了@Schedule,但是因为单体项目下可靠性差,也就是说如果服务器宕机了,整个项目就无法访问了
@Schedule是Spring框架提供的一种轻量级的定时任务注解,可以方便地在方法上标注定时任务的执行规则
如果是在集群环境下使用@Schedule会出现问题,如果项目部署在多台服务器上,每台服务器都会执行相同的定时任务,导致任务重复触发或数据冲突
为了避免这个问题,可以使用分布式锁的机制,让只有一个服务器能够获取到锁并执行定时任务,其他服务器则等待或跳过
分布式锁的实现方式:
使用setNX:需要考虑锁过期、续期等功能
public class RedisDistributedLock {
private final RedisTemplate<String,Object> redisTemplate; // Redis操作模板
private final ScheduledExecutorService scheduledExecutorService; // 定时任务执行器,用于锁的续期操作
private final String lockKey; // 锁的key
private final String requestId; // 请求标识,用于锁的加锁和释放
private final long expireMillis; // 锁的过期时间
private volatile boolean isLocked = false; // 锁是否被持有的标识
public RedisDistributedLock(RedisTemplate<String,Object> redisTemplate, String lockKey, String requestId, long expireMillis) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey;
this.requestId = requestId;
this.expireMillis = expireMillis;
// 定时任务,每隔一定时间就对锁进行续期
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(() -> {
if (isLocked) {
// 如果锁被持有,则对锁进行续期操作
redisTemplate.expire(lockKey, expireMillis, TimeUnit.MILLISECONDS);
}
}, expireMillis / 2, expireMillis / 2, TimeUnit.MILLISECONDS); // 每隔过期时间的一半对锁进行续期
}
/**
* 获取分布式锁
*
* @return 是否获取到锁
*/
public boolean acquireLock() {
// Lua脚本,用于尝试获取锁
String script = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end";
RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
Long result = (Long) redisTemplate.execute(redisScript, Collections.singletonList(lockKey), requestId, expireMillis);
// 判断锁是否获取成功
if (result != null && result == 1) {
isLocked = true;
return true;
}
return false;
}
/**
* 释放分布式锁
*
* @return 是否释放成功
*/
public boolean releaseLock() {
// Lua脚本,用于尝试释放锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
Long result = (Long) redisTemplate.execute(redisScript, Collections.singletonList(lockKey), requestId);
// 判断锁是否释放成功
if (result != null && result == 1) {
isLocked = false;
scheduledExecutorService.shutdown(); // 关闭定时任务执行器
return true;
}
return false;
}
}
使用Redisson客户端,redisson
@Component
public class TaskScheduler {
@Autowired
private RedissonClient redissonClient;
// 每天凌晨0点执行一次
@Scheduled(cron = "0 0 0 * * ?")
public void doTask() {
// 获取分布式可重入锁
RLock lock = redissonClient.getLock("task_lock");
try {
// 尝试加锁,获取不到返回false
boolean res = lock.tryLock();
if (res) {
// 执行定时任务的逻辑
System.out.println("do task...");
} else {
// 获取不到锁就跳过本次任务
System.out.println("skip task...");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放锁(如果还持有)
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
对Spring的@Scheduled进行AOP增强
考虑到每一次都对定时任务手动加上分布式锁比较麻烦,所以使用AOP增强@Scheduled,在切面中进行加锁判断
创建一个NoneDistributedSchedule注解,表示不使用分布式任务调度
package cc.oolo.imgcdn.DistributedScheduled;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoneDistributedSchedule {
}
package cc.oolo.imgcdn.Aspect;
import cc.oolo.imgcdn.DistributedScheduled.NoneDistributedSchedule;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;
@Component
@Aspect
public class ScheduleAspect {
@Autowired
private RedissonClient redissonClient;
@Pointcut("@annotation(org.springframework.scheduling.annotation.Scheduled)")
public void pointCut(){}
@Around("pointCut()")
public void around(ProceedingJoinPoint pjp) throws Throwable {
MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
Method method = methodSignature.getMethod();
String key = method.getName();
NoneDistributedSchedule noneDistributedSchedule = method.getAnnotation(NoneDistributedSchedule.class);
if (noneDistributedSchedule == null){
Scheduled scheduled = method.getAnnotation(Scheduled.class);
String cron = scheduled.cron();
Long time = getBetweenCron(cron);
RLock lock = redissonClient.getLock(key);
System.out.println(key);
if (!lock.tryLock(1,time, TimeUnit.SECONDS)){
return;
}
pjp.proceed();
}else {
pjp.proceed();
}
}
public Long getBetweenCron(String cron){
// 解析cron表达式
final CronExpression cronExpression = CronExpression.parse(cron);
// 获取当前时间
final LocalDateTime now = LocalDateTime.now();
// 获取下次执行时间
final LocalDateTime next = cronExpression.next(now);
// 计算两个时间之间的秒数
return ChronoUnit.SECONDS.between(now, next);
}
}