在使用kafka或者rabbitmq的时候,需要进行消费的时候我们只需要在方法上面加上@RabbitListener或者@KafkaListener,然后就可以从消息队列里面获取到消息了。
@KafkaListener(topicPattern = "topic")
public void consumerTest(ConsumerRecord<String,Object> record){
String value = record.value().toString();
System.out.println("xfz:"+value);
}
但是因为某些原因,我们不能引入像rabbitmq或者kafka这样的中间件,目前只有一个redis,所以需要使用redis实现消息队列,但是redis没有这样的注解,所以需要自己实现
首先定义注解@RedisMQListener
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisMQListener {
//队列名称
String queue() default "";
//拉取消息时间间隔(毫秒)
long interval() default 1;
//每次拉取多少条消息
int quantity() default 1;
}
使用示例
@Component
public class RedisMQConsumer {
@RedisMQListener(queue = "test-1",interval = 2000,quantity = 2)
public void redisMQ(RedisMQMessage redisMQMessage) {
System.out.println(redisMQMessage.getQueue()+"队列收到消息:"+redisMQMessage);
}
}
由于后续会使用到Spring初始化机制中的BeanPostProcessor,所以需要加上@Component注解,然后需要在Spring初始化的时候,保存所有被@RedisMQListener注解标注的类的方法、方法名、参数等信息,然后通过BeanPostProcessor的postProcessBeforeInitialization方法将信息保存至ArrayList中,后续消息监听器需要使用
public class RedisMQListenerMethod {
//队列名称
private String queue;
//间隔时间
private Long interval;
//bean
private Object bean;
//beanName
private String beanName;
//目标方法,后续需要反射执行
private Method targetMethod;
//方法参数
private String methodParameterClassName;
//每秒执行次数
private Integer quantity;
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public Object getBean() {
return bean;
}
public Integer getQuantity() {
return quantity;
}
public void setQuantity(Integer quantity) {
this.quantity = quantity;
}
public void setBean(Object bean) {
this.bean = bean;
}
public String getBeanName() {
return beanName;
}
public void setBeanName(String beanName) {
this.beanName = beanName;
}
public Method getTargetMethod() {
return targetMethod;
}
public void setTargetMethod(Method targetMethod) {
this.targetMethod = targetMethod;
}
public String getMethodParameterClassName() {
return methodParameterClassName;
}
public void setMethodParameterClassName(String methodParameterClassName) {
this.methodParameterClassName = methodParameterClassName;
}
public Long getInterval() {
return interval;
}
public void setInterval(Long interval) {
this.interval = interval;
}
}
@Component
public class RedisListenerScanAnnotationProcessor implements BeanPostProcessor {
public static final List<RedisMQListenerMethod> redisMQListenerMethodList = new ArrayList<>();
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
Class<?> clazz = bean.getClass();
Method[] methods = clazz.getDeclaredMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(RedisMQListener.class)){
RedisMQListener annotation = method.getAnnotation(RedisMQListener.class);
RedisMQListenerMethod redisMQListenerMethod = new RedisMQListenerMethod();
redisMQListenerMethod.setBean(bean);
redisMQListenerMethod.setTargetMethod(method);
redisMQListenerMethod.setBeanName(beanName);
redisMQListenerMethod.setQuantity(annotation.quantity());
redisMQListenerMethod.setInterval(annotation.interval());
redisMQListenerMethod.setQueue(annotation.queue());
redisMQListenerMethod.setMethodParameterClassName(method.getParameterTypes()[0].getName());
redisMQListenerMethodList.add(redisMQListenerMethod);
}
}
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
}
public static List<RedisMQListenerMethod> getRedisMQListenerMethodList(){
return redisMQListenerMethodList;
}
}
在项目中通过ApplicationRunner 来启动消费队列线程
@Component
public class RedisMQMessageRegister implements ApplicationRunner {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
List<RedisMQListenerMethod> redisMQListenerMethodList = new ArrayList<>();
@Override
public void run(ApplicationArguments args) throws Exception {
this.redisMQListenerMethodList = RedisListenerScanAnnotationProcessor.getRedisMQListenerMethodList();
for (RedisMQListenerMethod redisMQListenerMethod : redisMQListenerMethodList) {
new Thread(new Worker(
redisMQListenerMethod.getQueue(),
redisMQListenerMethod.getInterval(),
redisMQListenerMethod.getQuantity())).start();
}
}
private class Worker implements Runnable {
private String queueName = "";
private Long time;
private Integer quantity;
@SneakyThrows
@Override
public void run() {
while (true) {
List<String> msg = multiRPopPipeline(queueName, quantity);
List<RedisMQMessage> redisMQMessages = new ArrayList<>();
for (String s : msg) {
redisMQMessages.add(JSON.parseObject(s,RedisMQMessage.class));
}
if (redisMQMessages.size() > 0){
for (RedisMQListenerMethod redisMQListenerMethod : redisMQListenerMethodList) {
if (redisMQListenerMethod.getQueue().equals(queueName)){
for (RedisMQMessage redisMQMessage : redisMQMessages) {
redisMQListenerMethod.getTargetMethod().invoke(redisMQListenerMethod.getBean(),redisMQMessage);
}
}
}
}
Thread.sleep(time);
}
}
public Worker(String queueName, Long time, Integer quantity) {
this.queueName = queueName;
this.time = time;
this.quantity = quantity;
}
}
/**
* 一次性pop出指定数量的数据
* @param key 键
* @param size 需要取出的元素个数
* @return 返回取出的元素集合
*/
public List<String> multiRPopPipeline(String key, int size) {
// 获取当前队列里的值
int curSize = Math.toIntExact(redisTemplate.opsForList().size(key));
if (curSize == 0) {
return Collections.emptyList();
}
// 判断操作次数
List<String> collect = redisTemplate.executePipelined(new SessionCallback<String>() {
@Override
public String execute(RedisOperations redisOperations) throws DataAccessException {
final int finalSize = Math.toIntExact(Math.min(curSize, size));
for (int i = 0; i < finalSize; i++) {
redisOperations.opsForList().rightPop(key);
}
return null;
}
}).stream().map(obj -> (String) obj).collect(Collectors.toList());
return collect;
}
}
定义入队工具类
public class RedisMQSender {
private RedisTemplate<String,Object> redisTemplate;
public RedisMQSender(RedisTemplate<String,Object> redisTemplate){
this.redisTemplate = redisTemplate;
}
public void send(String queue, Object msg){
redisTemplate.opsForList().leftPush(queue,
JSON.toJSONString(new RedisMQMessage()
.setQueue(queue)
.setLocalDateTime(LocalDateTime.now())
.setData(msg)));
}
}
测试:
@Test
void mqTest(){
RedisMQSender redisMQSender = new RedisMQSender(redisTemplate);
CommonInterfaceDTO commonInterfaceDTO = new CommonInterfaceDTO();
commonInterfaceDTO.setMethod("测试发送消息");
commonInterfaceDTO.setDepict("测试发送消息");
commonInterfaceDTO.setTitle("测试发送消息");
for (int i = 0; i < 2000; i++) {
redisMQSender.send("test-1",commonInterfaceDTO);
redisMQSender.send("test-2",commonInterfaceDTO);
}
}
消费者测试
@Component
public class RedisMQConsumer {
@RedisMQListener(queue = "test-1",interval = 2000,quantity = 2)
public void redisMQ(RedisMQMessage redisMQMessage) {
System.out.println(redisMQMessage.getQueue()+"队列收到消息:"+redisMQMessage);
}
@RedisMQListener(queue = "test-2",interval = 2000,quantity = 2)
public void redisMQ1(RedisMQMessage redisMQMessage) {
System.out.println(redisMQMessage.getQueue()+"队列收到消息:"+ redisMQMessage);
}
}