fix: 码全代码添加
This commit is contained in:
38
template/cvbp/cvbp-public/cvbp-mq-core/.gitignore
vendored
Normal file
38
template/cvbp/cvbp-public/cvbp-mq-core/.gitignore
vendored
Normal file
@@ -0,0 +1,38 @@
|
||||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
!**/src/main/**/target/
|
||||
!**/src/test/**/target/
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea/modules.xml
|
||||
.idea/jarRepositories.xml
|
||||
.idea/compiler.xml
|
||||
.idea/libraries/
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### Eclipse ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
.sts4-cache
|
||||
|
||||
### NetBeans ###
|
||||
/nbproject/private/
|
||||
/nbbuild/
|
||||
/dist/
|
||||
/nbdist/
|
||||
/.nb-gradle/
|
||||
build/
|
||||
!**/src/main/**/build/
|
||||
!**/src/test/**/build/
|
||||
|
||||
### VS Code ###
|
||||
.vscode/
|
||||
|
||||
### Mac OS ###
|
||||
.DS_Store
|
||||
38
template/cvbp/cvbp-public/cvbp-mq-core/pom.xml
Normal file
38
template/cvbp/cvbp-public/cvbp-mq-core/pom.xml
Normal file
@@ -0,0 +1,38 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Copyright (c) 2023 codvision.com All Rights Reserved.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<groupId>com.codvision</groupId>
|
||||
<artifactId>cvbp-public</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>cvbp-mq-core</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>8</maven.compiler.source>
|
||||
<maven.compiler.target>8</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.codvision</groupId>
|
||||
<artifactId>cvbp-cache-core</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.swagger</groupId>
|
||||
<artifactId>swagger-annotations</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Copyright (c) 2023 codvision.com All Rights Reserved.
|
||||
*/
|
||||
|
||||
package com.codvision.mqcore.annotation;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* 消息队列消费处理器
|
||||
*
|
||||
* @author lingee
|
||||
* @date 2023/8/22
|
||||
*/
|
||||
@Target({ElementType.TYPE})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
@Component
|
||||
public @interface MQHandler {
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
/*
|
||||
* Copyright (c) 2023 codvision.com All Rights Reserved.
|
||||
*/
|
||||
|
||||
package com.codvision.mqcore.annotation;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* 消息队列消费者监听
|
||||
*
|
||||
* @author lingee
|
||||
* @date 2023/8/22
|
||||
*/
|
||||
@Target({ElementType.METHOD})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
public @interface MQListener {
|
||||
|
||||
/**
|
||||
* 队列名称
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
String[] queues() default {};
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright (c) 2023 codvision.com All Rights Reserved.
|
||||
*/
|
||||
|
||||
package com.codvision.mqcore.bean;
|
||||
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 公共消息
|
||||
*
|
||||
* @author lingee
|
||||
* @date 2023/8/22
|
||||
*/
|
||||
@Data
|
||||
public class CommonMessage<T> implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 3155238535887126819L;
|
||||
|
||||
@ApiModelProperty(value = "业务类型")
|
||||
private String bizType;
|
||||
|
||||
@ApiModelProperty(value = "业务ID【通常情况下,传递业务实体的ID可以了】")
|
||||
private Long bizId;
|
||||
|
||||
@ApiModelProperty(value = "业务ID列表【有些个别情况,是批量处理的,传递业务实体的ID列表就可以了】")
|
||||
private List<Long> bizIdList;
|
||||
|
||||
@ApiModelProperty(value = "传输的业务数据")
|
||||
private T data;
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Copyright (c) 2023 codvision.com All Rights Reserved.
|
||||
*/
|
||||
|
||||
package com.codvision.mqcore.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* 消息队列模块属性配置
|
||||
*
|
||||
* @author lingee
|
||||
* @date 2023/8/22
|
||||
*/
|
||||
@Data
|
||||
@Configuration
|
||||
@ConfigurationProperties("cvbp.mq")
|
||||
public class MQProperties {
|
||||
|
||||
/**
|
||||
* 是否开启
|
||||
*/
|
||||
private Boolean enabled = false;
|
||||
|
||||
/**
|
||||
* 消息队列中间件类型:redis/rabbitmq/rocketmq
|
||||
*/
|
||||
private String type = "redis";
|
||||
}
|
||||
@@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Copyright (c) 2023 codvision.com All Rights Reserved.
|
||||
*/
|
||||
|
||||
package com.codvision.mqcore.config;
|
||||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.codvision.mqcore.service.MQSender;
|
||||
import com.codvision.mqcore.service.impl.MQSenderByRedis;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* 启动redis延迟队列
|
||||
*
|
||||
* @author lingee
|
||||
* @date 2023/8/23
|
||||
*/
|
||||
@Slf4j
|
||||
public class RedisDelayQueueRunner implements CommandLineRunner {
|
||||
|
||||
@Resource
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
@Resource
|
||||
private MQSender mqSender;
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
new Thread(() -> {
|
||||
try {
|
||||
while (true) {
|
||||
Set<String> keys = redisTemplate.keys(MQSenderByRedis.DELAYED_QUEUE_MSG_PREFIX + "*");
|
||||
if (CollectionUtil.isEmpty(keys)) {
|
||||
Thread.sleep(1000);
|
||||
continue;
|
||||
}
|
||||
for (String key : keys) {
|
||||
Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis());
|
||||
if (messages != null && !messages.isEmpty()) {
|
||||
Object message = messages.iterator().next();
|
||||
mqSender.sendMessage(StrUtil.subAfter(key, MQSenderByRedis.DELAYED_QUEUE_MSG_PREFIX, true), message);
|
||||
redisTemplate.opsForZSet().remove(key, message);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("(Redis延迟队列异常中断) {}", e.getMessage());
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
} catch (InterruptedException ex) {
|
||||
log.info(ex.getMessage(), ex);
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
/*
|
||||
* Copyright (c) 2023 codvision.com All Rights Reserved.
|
||||
*/
|
||||
|
||||
package com.codvision.mqcore.config;
|
||||
|
||||
import com.codvision.mqcore.annotation.MQHandler;
|
||||
import com.codvision.mqcore.annotation.MQListener;
|
||||
import com.codvision.mqcore.service.MQSender;
|
||||
import com.codvision.mqcore.service.impl.MQSenderByRedis;
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
import com.fasterxml.jackson.annotation.PropertyAccessor;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.annotation.AnnotationUtils;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.listener.PatternTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
|
||||
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 消息队列配置类
|
||||
*
|
||||
* @author lingee
|
||||
* @date 2023/8/22
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@ConditionalOnProperty(value = "cvbp.mq.type", havingValue = "redis", matchIfMissing = true)
|
||||
public class RedisMQConfig {
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@Bean
|
||||
public MQSender mq() {
|
||||
return new MQSenderByRedis();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RedisDelayQueueRunner redisDelayQueueRunner() {
|
||||
return new RedisDelayQueueRunner();
|
||||
}
|
||||
|
||||
/**
|
||||
* redis消息监听器容器
|
||||
*
|
||||
* @param connectionFactory redis连接工厂
|
||||
* @return
|
||||
*/
|
||||
@Bean
|
||||
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
|
||||
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
|
||||
container.setConnectionFactory(connectionFactory);
|
||||
|
||||
try {
|
||||
Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(MQHandler.class);
|
||||
for (Map.Entry<String, Object> entry : beanMap.entrySet()) {
|
||||
Class<?> clazz = entry.getValue().getClass();
|
||||
for (Method method : clazz.getMethods()) {
|
||||
if (method.isAnnotationPresent(MQListener.class)) {
|
||||
MQListener mqListener = AnnotationUtils.getAnnotation(method, MQListener.class);
|
||||
for (String queue : mqListener.queues()) {
|
||||
MessageListenerAdapter messageListenerAdapter
|
||||
= new MessageListenerAdapter(entry.getValue(), method.getName());
|
||||
messageListenerAdapter.afterPropertiesSet();
|
||||
// 使用Jackson2JsonRedisSerialize 替换默认序列化(默认采用的是JDK序列化)
|
||||
Jackson2JsonRedisSerializer<?> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
|
||||
ObjectMapper om = new ObjectMapper();
|
||||
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
|
||||
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
|
||||
jackson2JsonRedisSerializer.setObjectMapper(om);
|
||||
messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);
|
||||
container.addMessageListener(messageListenerAdapter, new PatternTopic(queue));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
return container;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
/*
|
||||
* Copyright (c) 2023 codvision.com All Rights Reserved.
|
||||
*/
|
||||
|
||||
/**
|
||||
* 通用消息队列模块
|
||||
*
|
||||
* @author lingee
|
||||
* @date 2023/8/22
|
||||
*/
|
||||
package com.codvision.mqcore;
|
||||
@@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Copyright (c) 2023 codvision.com All Rights Reserved.
|
||||
*/
|
||||
|
||||
package com.codvision.mqcore.service;
|
||||
|
||||
import com.codvision.mqcore.bean.CommonMessage;
|
||||
|
||||
/**
|
||||
* 消息队列发送接口
|
||||
*
|
||||
* @author lingee
|
||||
* @date 2023/8/22
|
||||
*/
|
||||
public interface MQSender {
|
||||
|
||||
/**
|
||||
* 根据公共消息体发送消息
|
||||
*
|
||||
* @param queueName 队列名称
|
||||
* @param commonMessage 公共消息
|
||||
*/
|
||||
void sendMessage(String queueName, CommonMessage commonMessage);
|
||||
|
||||
/**
|
||||
* 根据消息字符串发送消息
|
||||
*
|
||||
* @param queueName 队列名称
|
||||
* @param message 消息
|
||||
*/
|
||||
void sendMessage(String queueName, Object message);
|
||||
|
||||
/**
|
||||
* 发送延时消息
|
||||
*
|
||||
* @param queueName 队列名称
|
||||
* @param message 消息
|
||||
* @param delaySeconds 延时时长(秒)
|
||||
*/
|
||||
void sendDelayMessage(String queueName, Object message, long delaySeconds);
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
/*
|
||||
* Copyright (c) 2023 codvision.com All Rights Reserved.
|
||||
*/
|
||||
|
||||
package com.codvision.mqcore.service.impl;
|
||||
|
||||
import com.codvision.mqcore.bean.CommonMessage;
|
||||
import com.codvision.mqcore.service.MQSender;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* 消息队列redis实现
|
||||
*
|
||||
* @author lingee
|
||||
* @date 2023/8/22
|
||||
*/
|
||||
@Slf4j
|
||||
public class MQSenderByRedis implements MQSender {
|
||||
|
||||
@Resource
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
public static final String DELAYED_QUEUE_MSG_PREFIX = "delayed:";
|
||||
|
||||
@Override
|
||||
public void sendMessage(String queueName, CommonMessage commonMessage) {
|
||||
try {
|
||||
String msg = objectMapper.writeValueAsString(commonMessage);
|
||||
log.info("【MQ消息生产者】queueName:{},message:{}", queueName, msg);
|
||||
sendMessage(queueName, commonMessage);
|
||||
} catch (Exception e) {
|
||||
log.error("【MQ消息生产者】发送消息失败", e);
|
||||
throw new RuntimeException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendMessage(String queueName, Object message) {
|
||||
try {
|
||||
log.info("【MQ消息生产者】queueName:{},message:{}", queueName, message);
|
||||
redisTemplate.convertAndSend(queueName, message);
|
||||
} catch (Exception e) {
|
||||
log.error("【MQ消息生产者】发送消息失败", e);
|
||||
throw new RuntimeException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendDelayMessage(String queueName, Object message, long delaySeconds) {
|
||||
try {
|
||||
String msg = objectMapper.writeValueAsString(message);
|
||||
log.info("【MQ延时消息生产者】queueName:{},message:{},delaySeconds:{}", queueName, msg, delaySeconds);
|
||||
String key = DELAYED_QUEUE_MSG_PREFIX + queueName;
|
||||
redisTemplate.opsForZSet().add(key, message, System.currentTimeMillis() + delaySeconds * 1000);
|
||||
} catch (Exception e) {
|
||||
log.error("【MQ延时消息生产者】发送延时消息失败", e);
|
||||
throw new RuntimeException(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.codvision.mqcore.config.RedisMQConfig
|
||||
Reference in New Issue
Block a user