With Kafka or RabbitMQ, consuming messages only takes adding @RabbitListener or @KafkaListener to a method; the method then receives messages from the queue.

    @KafkaListener(topicPattern = "topic")
    public void consumerTest(ConsumerRecord<String,Object> record){
        String value = record.value().toString();
        System.out.println("xfz:"+value);
    }

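For various reasons, though, we cannot bring in middleware such as RabbitMQ or Kafka. Redis is all we have, so the message queue has to be built on Redis. Redis offers no such listener annotation, so we need to implement one ourselves.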

First, define the @RedisMQListener annotation

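import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisMQListener {

    // Queue name
    String queue() default "";

    // Interval between polls, in milliseconds
    long interval() default 1;

    // Number of messages to pull per poll
    int quantity() default 1;

}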
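
Because the annotation is declared with RetentionPolicy.RUNTIME, its attribute values stay available to reflection while the application runs, which is what the scanning step later depends on. A minimal JDK-only sketch of that lookup follows. `DemoListener`, `DemoConsumer`, and `AnnotationReader` are hypothetical stand-ins made up for illustration; they are not part of the original code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in that mirrors the attributes of @RedisMQListener.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface DemoListener {
    String queue() default "";
    long interval() default 1;
    int quantity() default 1;
}

// Hypothetical consumer: one method overrides the defaults, the other keeps them.
class DemoConsumer {
    @DemoListener(queue = "test-1", interval = 2000, quantity = 2)
    void onMessage(String message) { }

    @DemoListener(queue = "test-2")
    void onOther(String message) { }
}

class AnnotationReader {
    // Returns "queue/interval/quantity" for the named one-argument method,
    // or null when the method carries no @DemoListener annotation.
    static String describe(Class<?> type, String methodName) {
        Method method;
        try {
            method = type.getDeclaredMethod(methodName, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
        DemoListener listener = method.getAnnotation(DemoListener.class);
        if (listener == null) {
            return null;
        }
        return listener.queue() + "/" + listener.interval() + "/" + listener.quantity();
    }
}
```

With RetentionPolicy.CLASS or SOURCE instead, `getAnnotation` would return null and the listener could not be discovered at runtime.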

Usage example

@Component
public class RedisMQConsumer {
    @RedisMQListener(queue = "test-1",interval = 2000,quantity = 2)
    public void redisMQ(RedisMQMessage redisMQMessage) {
        System.out.println(redisMQMessage.getQueue() + " queue received message: " + redisMQMessage);
    }
}

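The consumer class must carry @Component so that Spring manages it as a bean, because listeners are discovered through Spring's BeanPostProcessor mechanism. While Spring initializes its beans, the postProcessBeforeInitialization callback records, for every method annotated with @RedisMQListener, the owning bean, the method itself, its parameter type, and the annotation settings. These records go into an ArrayList that the message listener uses later.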

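public class RedisMQListenerMethod {
    // Queue name
    private String queue;
    // Polling interval (milliseconds)
    private Long interval;
    // The bean that owns the listener method
    private Object bean;
    // Name of that bean
    private String beanName;
    // Target method, invoked later through reflection
    private Method targetMethod;
    // Class name of the method's parameter
    private String methodParameterClassName;
    // Number of messages pulled per poll
    private Integer quantity;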

    public String getQueue() {
        return queue;
    }

    public void setQueue(String queue) {
        this.queue = queue;
    }

    public Object getBean() {
        return bean;
    }

    public Integer getQuantity() {
        return quantity;
    }

    public void setQuantity(Integer quantity) {
        this.quantity = quantity;
    }

    public void setBean(Object bean) {
        this.bean = bean;
    }

    public String getBeanName() {
        return beanName;
    }

    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    public Method getTargetMethod() {
        return targetMethod;
    }

    public void setTargetMethod(Method targetMethod) {
        this.targetMethod = targetMethod;
    }

    public String getMethodParameterClassName() {
        return methodParameterClassName;
    }

    public void setMethodParameterClassName(String methodParameterClassName) {
        this.methodParameterClassName = methodParameterClassName;
    }

    public Long getInterval() {
        return interval;
    }

    public void setInterval(Long interval) {
        this.interval = interval;
    }
}

@Component
public class RedisListenerScanAnnotationProcessor implements BeanPostProcessor {

    public static final List<RedisMQListenerMethod> redisMQListenerMethodList = new ArrayList<>();

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        Class<?> clazz = bean.getClass();
        Method[] methods = clazz.getDeclaredMethods();
        for (Method method : methods) {
            if (method.isAnnotationPresent(RedisMQListener.class)){
                RedisMQListener annotation = method.getAnnotation(RedisMQListener.class);
                RedisMQListenerMethod redisMQListenerMethod = new RedisMQListenerMethod();
                redisMQListenerMethod.setBean(bean);
                redisMQListenerMethod.setTargetMethod(method);
                redisMQListenerMethod.setBeanName(beanName);
                redisMQListenerMethod.setQuantity(annotation.quantity());
                redisMQListenerMethod.setInterval(annotation.interval());
                redisMQListenerMethod.setQueue(annotation.queue());
                redisMQListenerMethod.setMethodParameterClassName(method.getParameterTypes()[0].getName());
                redisMQListenerMethodList.add(redisMQListenerMethod);
            }
        }
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
    }

    public static List<RedisMQListenerMethod> getRedisMQListenerMethodList(){
        return redisMQListenerMethodList;
    }
}
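
The scan above indexes `getParameterTypes()[0]`, so an annotated method that declares no parameters would fail with an ArrayIndexOutOfBoundsException during startup. One possible way to reject such methods with a clearer error is sketched below using only the JDK. `QueueListener`, `GoodConsumer`, `BadConsumer`, and `ListenerScanner` are hypothetical names for illustration; in the article's code a similar check would sit inside postProcessBeforeInitialization.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced version of the listener annotation.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface QueueListener {
    String queue();
}

// Valid: the annotated method takes exactly one parameter (the message).
class GoodConsumer {
    @QueueListener(queue = "test-1")
    void handle(String message) { }

    void helper() { } // not annotated, so the scan ignores it
}

// Invalid: annotated, but there is no parameter to receive the message.
class BadConsumer {
    @QueueListener(queue = "test-1")
    void handle() { }
}

class ListenerScanner {
    // Collects the annotated methods of a bean and fails fast on a bad signature.
    static List<Method> scan(Object bean) {
        List<Method> found = new ArrayList<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(QueueListener.class)) {
                continue;
            }
            if (method.getParameterCount() != 1) {
                throw new IllegalStateException(
                        "Listener method " + method + " must declare exactly one parameter");
            }
            found.add(method);
        }
        return found;
    }
}
```

Failing at scan time keeps the error next to the offending method instead of surfacing later inside the polling thread.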

An ApplicationRunner then starts one consumer thread per registered listener

@Component
public class RedisMQMessageRegister implements ApplicationRunner {

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;


    List<RedisMQListenerMethod> redisMQListenerMethodList = new ArrayList<>();


    @Override
    public void run(ApplicationArguments args) throws Exception {

        this.redisMQListenerMethodList = RedisListenerScanAnnotationProcessor.getRedisMQListenerMethodList();

        for (RedisMQListenerMethod redisMQListenerMethod : redisMQListenerMethodList) {
            new Thread(new Worker(
                    redisMQListenerMethod.getQueue(),
                    redisMQListenerMethod.getInterval(),
                    redisMQListenerMethod.getQuantity())).start();
        }
    }

    private class Worker implements Runnable {

        private String queueName = "";
        private Long time;
        private Integer quantity;

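        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    for (String s : multiRPopPipeline(queueName, quantity)) {
                        dispatch(s);
                    }
                } catch (Exception e) {
                    // For example Redis being unreachable: log it and retry after the interval
                    e.printStackTrace();
                }
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the thread
                    Thread.currentThread().interrupt();
                }
            }
        }

        // Parses one message and hands it to every listener registered for this queue
        private void dispatch(String s) {
            try {
                RedisMQMessage redisMQMessage = JSON.parseObject(s, RedisMQMessage.class);
                for (RedisMQListenerMethod redisMQListenerMethod : redisMQListenerMethodList) {
                    if (redisMQListenerMethod.getQueue().equals(queueName)) {
                        redisMQListenerMethod.getTargetMethod().invoke(redisMQListenerMethod.getBean(), redisMQMessage);
                    }
                }
            } catch (Exception e) {
                // A failing listener or malformed message must not stop the polling thread
                e.printStackTrace();
            }
        }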

        public Worker(String queueName, Long time, Integer quantity) {
            this.queueName = queueName;
            this.time = time;
            this.quantity = quantity;
        }
    }

    /**
     * Pops up to the given number of elements in a single pipelined round trip.
     * @param key       list key
     * @param size      maximum number of elements to pop
     * @return          the popped elements; empty if the list is empty
     */
    public List<String> multiRPopPipeline(String key, int size) {
        // Current length of the list; size() may return null, so avoid unboxing it blindly
        Long length = redisTemplate.opsForList().size(key);
        if (length == null || length == 0) {
            return Collections.emptyList();
        }
        // Never issue more pops than there are elements
        final int finalSize = (int) Math.min(length, size);
        return redisTemplate.executePipelined(new SessionCallback<String>() {
            @Override
            public String execute(RedisOperations redisOperations) throws DataAccessException {
                for (int i = 0; i < finalSize; i++) {
                    redisOperations.opsForList().rightPop(key);
                }
                return null;
            }
        }).stream()
                // A pop yields null if another consumer emptied the list in the meantime
                .filter(obj -> obj != null)
                .map(obj -> (String) obj)
                .collect(Collectors.toList());
    }
}
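
Two properties of the polling loop are easy to overlook: a batch pop should stop as soon as the list runs dry (another consumer may drain it between the size check and the pops), and one failing listener call should not end the whole loop. Both ideas can be tried out without Redis. In the sketch below an in-memory `Deque` stands in for the Redis list, and `InMemoryPoller`, `popBatch`, and `pollOnce` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;

class InMemoryPoller {
    // Stand-in for the Redis list: producers add at the head, consumers take from the tail.
    private final Deque<String> queue = new ConcurrentLinkedDeque<>();

    // Counterpart of leftPush in the sender.
    void send(String message) {
        queue.addFirst(message);
    }

    // Pops at most `size` messages and stops early once the queue is empty.
    List<String> popBatch(int size) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            String message = queue.pollLast(); // null means nothing is left
            if (message == null) {
                break;
            }
            batch.add(message);
        }
        return batch;
    }

    // One polling round; returns how many messages the listener handled without throwing.
    int pollOnce(int size, Consumer<String> listener) {
        int handled = 0;
        for (String message : popBatch(size)) {
            try {
                listener.accept(message);
                handled++;
            } catch (RuntimeException e) {
                // Isolate the failure so the remaining messages are still delivered.
                System.err.println("listener failed for " + message + ": " + e.getMessage());
            }
        }
        return handled;
    }
}
```

In a real worker, `pollOnce` would run inside the loop and be followed by the sleep. Note that a message whose listener fails has already been popped and is lost unless it is pushed back or stored elsewhere.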

Define a helper class for enqueuing messages

public class RedisMQSender {
    private RedisTemplate<String,Object> redisTemplate;

    public RedisMQSender(RedisTemplate<String,Object> redisTemplate){
        this.redisTemplate = redisTemplate;
    }

    public void send(String queue, Object msg){
        redisTemplate.opsForList().leftPush(queue,
                JSON.toJSONString(new RedisMQMessage()
                        .setQueue(queue)
                        .setLocalDateTime(LocalDateTime.now())
                        .setData(msg)));
    }
}
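
The sender and the consumer both rely on a `RedisMQMessage` class that the article does not show. Judging only from the calls that appear (`setQueue`, `setLocalDateTime`, and `setData` chained together, plus `getQueue()` in the consumer), it might look roughly like the sketch below. The field names, the hand-written fluent setters, and the `toString` format are assumptions; the original may well generate them with Lombok.

```java
import java.time.LocalDateTime;

// Hypothetical reconstruction of the message envelope; not taken from the original article.
class RedisMQMessage {
    private String queue;                 // name of the queue the message was sent to
    private LocalDateTime localDateTime;  // time the message was created
    private Object data;                  // the payload passed to send(...)

    public String getQueue() {
        return queue;
    }

    public LocalDateTime getLocalDateTime() {
        return localDateTime;
    }

    public Object getData() {
        return data;
    }

    // Setters return `this` so calls can be chained, as the sender does.
    public RedisMQMessage setQueue(String queue) {
        this.queue = queue;
        return this;
    }

    public RedisMQMessage setLocalDateTime(LocalDateTime localDateTime) {
        this.localDateTime = localDateTime;
        return this;
    }

    public RedisMQMessage setData(Object data) {
        this.data = data;
        return this;
    }

    @Override
    public String toString() {
        return "RedisMQMessage{queue=" + queue
                + ", localDateTime=" + localDateTime
                + ", data=" + data + "}";
    }
}
```

Whether the JSON library in use can populate an object through setters that return `this` depends on the library and its version, so that is worth verifying before relying on this shape.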

Test:

@Test
void mqTest() {
    RedisMQSender redisMQSender = new RedisMQSender(redisTemplate);
    CommonInterfaceDTO commonInterfaceDTO = new CommonInterfaceDTO();
    commonInterfaceDTO.setMethod("test message");
    commonInterfaceDTO.setDepict("test message");
    commonInterfaceDTO.setTitle("test message");
    // Push 2000 messages onto each of the two queues
    for (int i = 0; i < 2000; i++) {
        redisMQSender.send("test-1", commonInterfaceDTO);
        redisMQSender.send("test-2", commonInterfaceDTO);
    }
}

Consumer test

@Component
public class RedisMQConsumer {
    @RedisMQListener(queue = "test-1", interval = 2000, quantity = 2)
    public void redisMQ(RedisMQMessage redisMQMessage) {
        System.out.println(redisMQMessage.getQueue() + " queue received message: " + redisMQMessage);
    }

    @RedisMQListener(queue = "test-2", interval = 2000, quantity = 2)
    public void redisMQ1(RedisMQMessage redisMQMessage) {
        System.out.println(redisMQMessage.getQueue() + " queue received message: " + redisMQMessage);
    }
}

Last modified: February 12, 2023