Spring Cloud Alibaba + Mirai Core の使用蕉♂流
-
写在开头
写这篇教程的目的是为了给那些有毕设需求而准备的,
实际开放中应该没多少人会把mirai拉进微服务这个圈子吧?
本篇中,Mirai Core充当着用户终端的角色,把用户输入传出、系统信息传送至用户;对分布式环境做了初步适配;结合Nacos进行动态更新配置,并利用Sentinel进行端口的动态流控;使用RocketMQ;
项目默认使用WebFlux、但Servlet我也做了支持,branch中带有-servlet则是完全采用Servlet。
各层必要方法的逻辑我会详细说明。
项目地址
传送门
PS:实际业务逻辑请以项目为准,文档有时候来不及更新!
版本预览
如果最新版代码中有对上版示例代码的进行大量变更时,将在最新版中附上完整的示例代码,否则只会针对需要更新的业务方法进行详细说明。
若无必要,将不会删除各版间的留言!第一版:传送门
PS:代码我并没有实机测试(特指集群环境),若有BUG,望不吝赐教!第二版:传送门
PS:正在更新,目前已更新了Mysql版,MongoDB正在调试中。项目中的一些代码我事后回想起来是有坑的,正在结合源码进行填坑……更新完毕,暂时解决了消息ID自定义,不过RocketMQ Spring Boot Starter异步回调确实不完美,实际使用还需注意。
目录
- 主要组件版本控制
- 其他相关组件版本控制
- 主POM
- Bot
4.1 多Bot的坑
4.2 多Bot的坑(集群)
4.3 其他业务对多Bot的适配 - Nacos
5.1 整合Nacos
5.2 接入配置中心
5.3 整合Nacos
5.4 设置配置文件
5.5 监听Nacos配置刷新事件 - Sentinel
6.1 整合Sentinel
6.2 配置Sentinel客户端
6.3 自定义流控返回
6.4 其他类对Sentinel的适配
6.5 配置持久化 - 配置参考
7.1 Redis配置参考
7.2 Redisson配置参考
主要组件版本控制:
Adopt OpenJDK Spring Boot Spring Cloud Spring Cloud Alibaba Mirai Core 11.0.10 2.4.2 2020.0.1 2021.1 2.6.8 其他相关组件版本控制
Nacos Sentinel Hutool 1.4.1 1.8.0 5.7.15 主POM:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.2</version> <relativePath/> </parent> <groupId>cyou.wssy001.cloud</groupId> <artifactId>bot</artifactId> <version>0.0.1-SNAPSHOT</version> <name>bot</name> <packaging>pom</packaging> <properties> <java.version>11</java.version> <spring.cloud.version>2020.0.1</spring.cloud.version> <spring.cloud.alibaba.version>2021.1</spring.cloud.alibaba.version> <lombok.version>1.18.20</lombok.version> <redisson.version>3.16.4</redisson.version> <hutool.version>5.7.15</hutool.version> <fastjson.version>1.2.78</fastjson.version> <commons.pool2.version>2.11.1</commons.pool2.version> <mirai.core.version>2.6.8</mirai.core.version> </properties> <dependencyManagement> <dependencies> <!-- Spring Cloud Alibaba--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>${spring.cloud.alibaba.version}</version> <type>pom</type> <scope>import</scope> </dependency> <!-- Spring Cloud--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring.cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <!-- Lombok--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> </dependency> <!-- Redisson--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>${redisson.version}</version> </dependency> <!-- Hutool--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-core</artifactId> <version>${hutool.version}</version> </dependency> <!-- Hutool-http--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-http</artifactId> <version>${hutool.version}</version> </dependency> <!-- FastJSON--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> <!-- Commons Pool2--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>${commons.pool2.version}</version> </dependency> <!-- Mirai--> <dependency> <groupId>net.mamoe</groupId> <artifactId>mirai-core-jvm</artifactId> <version>${mirai.core.version}</version> </dependency> </dependencies> </dependencyManagement> </project>
Bot
引入Mirai依赖,由于Spring Boot 2.4.2自带的kotlin依赖版本是
<kotlin.version>1.4.21</kotlin.version> <kotlin-coroutines.version>1.4.2</kotlin-coroutines.version>
所以可以直接引入mirai-core-jvm,无需再指定kotlin相关依赖的版本
<dependency> <groupId>net.mamoe</groupId> <artifactId>mirai-core-jvm</artifactId> </dependency>
定义一个一个存储Bot账户的类
@Data public class BotAccount implements Serializable { private static final long serialVersionUID = 1L; private Long account; private String password; }
然后有一个用户机器人管理的Service,Bot登录后可以不放入IOC,使用Bot.getInstanceOrNull(botQQNumber)获得指定的Bot实例。
@Slf4j @Service @RequiredArgsConstructor public class RobotService { private final DynamicProperty dynamicProperty; private final BotOnlineHandler botOnlineHandler; private final BotOfflineHandler botOfflineHandler; private final GroupMessageHandler groupMessageHandler; private final MemberJoinHandler memberJoinHandler; private final MemberLeaveHandler memberLeaveHandler; @PostConstruct private void init() { login(stringToBotAccountList(dynamicProperty.getAccounts())); } public void refreshBot() { List<BotAccount> botAccounts = stringToBotAccountList(dynamicProperty.getAccounts()) .stream() .filter(v -> Bot.getInstanceOrNull(v.getAccount()) == null) .collect(Collectors.toList()); login(botAccounts); } private void login(List<BotAccount> botAccounts) { BotConfiguration botConfiguration = new BotConfiguration(); botConfiguration.fileBasedDeviceInfo("device.json"); // botConfiguration.noNetworkLog(); // botConfiguration.noBotLog(); for (BotAccount botAccountInfo : botAccounts) { Bot bot = BotFactory.INSTANCE.newBot(botAccountInfo.getAccount(), botAccountInfo.getPassword(), botConfiguration); try { bot.getEventChannel().subscribeAlways(BotOnlineEvent.class, botOnlineHandler::handle); bot.getEventChannel().subscribeAlways(BotOfflineEvent.class, botOfflineHandler::handle); bot.getEventChannel().subscribeAlways(GroupMessageEvent.class, groupMessageHandler::handle); bot.getEventChannel().subscribeAlways(MemberJoinEvent.class, memberJoinHandler::handle); bot.getEventChannel().subscribeAlways(MemberLeaveEvent.class, memberLeaveHandler::handle); bot.login(); log.info("******RobotService QQ:{} 登陆成功", botAccountInfo.getAccount()); // 去掉break即可实现多Bot在线 break; } catch (LoginFailedException e) { bot.close(); log.error("******Bot LoginFailedException:{} ,QQ:{}", e.getMessage(), botAccountInfo.getAccount()); } } } private List<BotAccount> stringToBotAccountList(String s) { return JSON.parseArray(s, BotAccount.class); } }
多Bot的坑
当你的项目同时启用多个Bot时,可能会遇到同时监听一个群消息的问题,这时我们需要加锁,在监听群消息上强行“单线程”。由于只部署一个实例,所以用JVM级别的锁就行。
逻辑
使用synchronized可以确保同一时间只有一个机器人能够调用该方法,同时检查消息ID是否存在于Redis来避免重复消费(这块我使用Redis的有序集合,通过设置scope为过期时间来实现过期,同时利用有序集合对存储已存在数据会执行更新操作且返回0的性质完成对消息IDs的存储和是否全部存在的判断),同时做了优化,只有存在多个机器人同时监听的群才会被强制“单线程”
@Component @RequiredArgsConstructor public class GroupMessageHandler2 { private final RepetitiveGroupService repetitiveGroupService; @Resource private ReactiveZSetOperations<Long, Integer> reactiveZSetOperations; public void handle(@NotNull GroupMessageEvent event) { long groupId = event.getGroup().getId(); long botId = event.getBot().getId(); RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(groupId) .share() .block(); if (hasBeenConsumed(event, groupId)) return; if (containMultiBots(repetitiveGroup, botId)) { synchronized (this) { consume(); } } else { consume(); } } // 检查事件是否以及被消费过 private boolean hasBeenConsumed(@NotNull GroupMessageEvent event, long groupId) { long second = new Date().getTime(); Range<Double> range = Range.closed(0.0, second + 0.0); reactiveZSetOperations.removeRangeByScore(groupId, range); int[] ids = event.getSource().getIds(); List<DefaultTypedTuple<Integer>> list = new ArrayList<>(); for (int id : ids) { list.add(new DefaultTypedTuple<>(id, second + 10.0)); } Long unExist = reactiveZSetOperations.addAll(groupId, list) .share() .block(); return unExist == null || unExist == 0; } // 检查这个群是否有多个机器人监听群消息 private boolean containMultiBots(RepetitiveGroup repetitiveGroup, Long botId) { return repetitiveGroup.getId() == null || !repetitiveGroup.getBotIds().contains(botId); } // 消费事件 private void consume() { } }
多Bot的坑(集群)
集群环境下,JVM级别的锁失效,考虑到接入了Redis,所以引用Redisson来实现分布式锁。
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> </dependency>
代码逻辑大致和非集群的类似,redisson拿到的锁是有超时时间的,时间到了自动释放。当事件A同时被Bot A、Bot B监听时,若Bot A拿到了锁但是消费事件时突然宕机,无法正常释放锁。若锁的超时时间大于事件Ids的有效时间,那Bot B可以正常消费事件。反之Bot B会被系统判定为事件重复消费。
@Component @RequiredArgsConstructor public class GroupMessageHandler { private final RedissonClient redissonClient; private final RepetitiveGroupService repetitiveGroupService; @Resource private ReactiveZSetOperations<Long, Integer> reactiveZSetOperations; public void handle(@NotNull GroupMessageEvent event) { long groupId = event.getGroup().getId(); long botId = event.getBot().getId(); RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(groupId) .share() .block(); RLock lock = null; if (hasBeenConsumed(event, groupId)) return; if (containMultiBots(repetitiveGroup, botId)) { lock = redissonClient.getLock(IdUtil.fastSimpleUUID()); lock.lock(20, TimeUnit.SECONDS); } consume(); if (lock != null) lock.unlock(); } // 检查事件是否以及被消费过 private boolean hasBeenConsumed(@NotNull GroupMessageEvent event, long groupId) { long second = new Date().getTime(); Range<Double> range = Range.closed(0.0, second + 0.0); reactiveZSetOperations.removeRangeByScore(groupId, range); int[] ids = event.getSource().getIds(); List<DefaultTypedTuple<Integer>> list = new ArrayList<>(); for (int id : ids) { list.add(new DefaultTypedTuple<>(id, second + 10.0)); } Long unExist = reactiveZSetOperations.addAll(groupId, list) .share() .block(); return unExist == null || unExist == 0; } // 检查这个群是否有多个机器人监听群消息 private boolean containMultiBots(RepetitiveGroup repetitiveGroup, Long botId) { return repetitiveGroup.getId() != null && repetitiveGroup.getBotIds().contains(botId); } // 消费事件 private void consume() { } }
其他业务对多Bot的适配
RepetitiveGroup,存储加入了多个机器人的群
@Data @AllArgsConstructor @NoArgsConstructor public class RepetitiveGroup implements Serializable { private static final long serialVersionUID = 1L; @Id private Long id; private List<Long> botIds; }
RepetitiveGroupService实现CRUD
@Service public class RepetitiveGroupService { @Resource private ReactiveRedisOperations<String, RepetitiveGroup> reactiveRedisOperations; public static final String HASH_KEY = RepetitiveGroup.class.getSimpleName(); public Mono<RepetitiveGroup> get(Long groupId) { return reactiveRedisOperations.opsForHash() .get(HASH_KEY, groupId) .map(v -> (RepetitiveGroup) v) .switchIfEmpty(Mono.defer(() -> Mono.just(new RepetitiveGroup()))); } public Flux<RepetitiveGroup> getAll() { return reactiveRedisOperations.opsForHash() .values(HASH_KEY) .map(v -> (RepetitiveGroup) v); } public Mono<Boolean> upset(RepetitiveGroup repetitiveGroup) { return reactiveRedisOperations.opsForHash() .put(HASH_KEY, repetitiveGroup.getId(), repetitiveGroup); } public Mono<Boolean> delete(RepetitiveGroup repetitiveGroup) { return reactiveRedisOperations.opsForHash() .remove(HASH_KEY, repetitiveGroup.getId()) .map(v -> v.equals(1L)); } }
BotOnlineHandler:
@Slf4j @Component @RequiredArgsConstructor public class BotOnlineHandler { private final SentinelService sentinelService; private final RepetitiveGroupService repetitiveGroupService; @Resource private ReactiveSetOperations<String, Long> reactiveSetOperations; public void handle(@NotNull BotOnlineEvent event) { Bot bot = event.getBot(); long id = bot.getId(); log.info("******BotOnlineHandler:QQ:{}", id); List<Bot> instances = Bot.getInstances(); Set<Long> groupIds = bot.getGroups() .parallelStream() .map(Group::getId) .collect(Collectors.toSet()); groupIds.parallelStream() .forEach(v -> reactiveSetOperations.add(id + "", v)); instances.removeIf(v -> v.getId() == id); instances.parallelStream() .forEach(v -> sinter(id, v.getId())); } private void sinter(Long self, Long target) { ArrayList<Long> list = new ArrayList<>(); list.add(self); list.add(target); reactiveSetOperations.intersect(self + "", target + "") .subscribe(v -> insert(v, list)); } private void insert(Long groupId, List<Long> botIds) { RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(groupId) .share() .block(); if (repetitiveGroup.getId() == null) repetitiveGroup.setBotIds(new ArrayList<>()); Set<Long> set = new HashSet<>(botIds); set.addAll(repetitiveGroup.getBotIds()); repetitiveGroup.setBotIds(new ArrayList<>(set)); repetitiveGroupService.upset(repetitiveGroup); } }
BotOfflineHandler:
@Slf4j @Component @RequiredArgsConstructor public class BotOfflineHandler { private final RepetitiveGroupService repetitiveGroupService; public void handle(@NotNull BotOfflineEvent event) { Bot bot = event.getBot(); long id = bot.getId(); String reason = "未知"; if (event instanceof BotOfflineEvent.Active) reason = "主动下线"; if (event instanceof BotOfflineEvent.Force) reason = "被挤下线"; if (event instanceof BotOfflineEvent.Dropped) reason = "被服务器断开或因网络问题而掉线"; if (event instanceof BotOfflineEvent.RequireReconnect) reason = "服务器主动要求更换另一个服务器"; log.info("******BotOfflineHandler:QQ:{},Reason:{}", id, reason); repetitiveGroupService.getAll() .parallel() .runOn(Schedulers.parallel()) .sequential() .filter(v -> v.getBotIds().contains(id)) .map(v -> updateBotIdList(id, v)) .subscribe(this::updateOrDeleteRepetitiveGroup); bot.close(); } @NotNull private RepetitiveGroup updateBotIdList(long id, RepetitiveGroup repetitiveGroup) { List<Long> botIds = repetitiveGroup.getBotIds(); botIds.remove(id); repetitiveGroup.setBotIds(botIds); return repetitiveGroup; } private void updateOrDeleteRepetitiveGroup(RepetitiveGroup repetitiveGroup) { if (repetitiveGroup.getBotIds().size() < 2) { repetitiveGroupService.delete(repetitiveGroup); } else { repetitiveGroupService.upset(repetitiveGroup); } } }
MemberJoinHandler:
@Component @RequiredArgsConstructor public class MemberJoinHandler { private final RedissonClient redissonClient; private final RepetitiveGroupService repetitiveGroupService; public void handle(@NotNull MemberJoinEvent event) { long memberId = event.getMember().getId(); Group group = event.getGroup(); if (event.getBot().getId() == memberId) return; RLock lock = redissonClient.getLock(IdUtil.fastSimpleUUID()); lock.lock(30, TimeUnit.SECONDS); List<Long> idList = Bot.getInstances() .parallelStream() .map(Bot::getId) .collect(Collectors.toList()); if (!idList.contains(memberId)) return; RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(group.getId()) .share() .block(); List<Long> botIds = idList.parallelStream() .filter(group::contains) .collect(Collectors.toList()); if (repetitiveGroup.getId() != null && botIds.size() == repetitiveGroup.getBotIds().size()) return; repetitiveGroup.setId(group.getId()); repetitiveGroupService.upset(repetitiveGroup); lock.unlock(); } }
MemberLeaveHandler:
@Component @RequiredArgsConstructor public class MemberLeaveHandler { private final RedissonClient redissonClient; private final RepetitiveGroupService repetitiveGroupService; public void handle(@NotNull MemberLeaveEvent event) { long memberId = event.getMember().getId(); Group group = event.getGroup(); if (event.getBot().getId() == memberId) return; RLock lock = redissonClient.getLock(IdUtil.fastSimpleUUID()); lock.lock(30, TimeUnit.SECONDS); List<Long> idList = Bot.getInstances() .parallelStream() .map(Bot::getId) .collect(Collectors.toList()); if (!idList.contains(memberId)) return; RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(group.getId()) .share() .block(); if (repetitiveGroup.getId() == null) return; List<Long> botIds = idList.parallelStream() .filter(group::contains) .collect(Collectors.toList()); if (botIds.size() > 1) { repetitiveGroup.setId(group.getId()); repetitiveGroup.setBotIds(botIds); repetitiveGroupService.upset(repetitiveGroup); } if (botIds.size() == 1) repetitiveGroupService.delete(repetitiveGroup); lock.unlock(); } }
至此,Bot相关配置到此结束,下面介绍Nacos↓↓↓
Nacos
整合Nacos
这里只做Nacos Client的相关配置,Nacos Server端的请参阅Nacos官方文档
相关依赖:(注:因为要配合OpenFeign,所以将ribbon替换成了Loadbalancer)<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <exclusions> <exclusion> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-ribbon</artifactId> </exclusion> </exclusions> </dependency>
接入配置中心
本项目中使用Nacos的主要目的还是其动态下发配置的便利。
先设置bootstrap.yml,高版本的Spring Boot不支持bootstrap.yml,可以引入以下依赖:<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bootstrap</artifactId> </dependency>
bootstrap.yml配置如下:
spring: application: name: qqrobot cloud: nacos: server-addr: localhost:8848 config: # 如果在Nacos Web端没设置命名空间的可以删除 namespace: 3e4a06ab-3139-46e8-bc82-888154383de0 # 可以指定配置文件名,也可以不配置,用默认规则 name: qqrobot-dev.yml main: # 允许Bean覆盖 allow-bean-definition-overriding: true
使用@EnableDiscoveryClient就可以让模块注册进Nacos了
设置配置文件
Nacos Web端设置配置文件的相关操作我这边直接跳过,说说后端这边,
可以用@Value或@ConfigurationProperties(prefix = "xxx")取值,使用@RefreshScope注解即可开启自动刷新@Data @Component @RefreshScope public class DynamicProperty { @Value("${accounts}") private String accounts; }
这边说几个坑。有几个List写法我试了几次,后端自动装配没成功(从Nacos获取配置)
list<Bean>test: list: - a:1 b:1
test: list: - {a:1,b:1}
List<Map<String,String>>
test: list: - a:1 b:1
所以我建议JSON
监听Nacos配置刷新事件
当Nacos刷新本地项目的配置后会被RefreshEventListener(org.springframework.cloud.endpoint.event.RefreshEventListener)监听,通过参考onApplicationEvent()方法我们可以实现一个监听器
其中robotService是Bot的控制类,示例中我将机器人账号信息存入Nacos,当相关信息刷新后调用refreshBot()更新相关机器人的状态(上线、下线)@Configuration @RequiredArgsConstructor public class RefreshEventListenerConfig { private final RobotService robotService; private AtomicBoolean ready = new AtomicBoolean(false); @EventListener public void listen(ApplicationEvent event) { if (event instanceof ApplicationReadyEvent) this.ready.compareAndSet(false, true); if (event instanceof RefreshEvent && this.ready.get()) robotService.refreshBot(); } }
至此,Nacos相关的适配到此结束
Sentinel
接入Sentinel的目的是进行动态流控,当机器人全部离线时,如果有请求打进来,立刻执行兜底方法,并存储请求参数做延迟处理。当机器人至少有一个在线时则开放端口,同时对已存储的请求进行消费。这块功能未来会持续优化。
整合Sentinel
我们要引入sentinel组件,sentinel对web框架的适配(这里我选择了webflux)以及配置持久化方式
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency> <dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-spring-webflux-adapter</artifactId> </dependency> <dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-datasource-nacos</artifactId> </dependency>
配置Sentinel客户端
在bootstrap.yml中配置Sentinel Dashboard的地址
spring: cloud: sentinel: transport: dashboard: localhost:38081 port: 38719
这里只做Sentinel客户端的适配,服务端配置请参考Sentinel官方文档
@Configuration @RequiredArgsConstructor public class SentinelConfig { private final List<ViewResolver> viewResolvers; private final ServerCodecConfigurer serverCodecConfigurer; @Bean @Order(-1) public SentinelBlockExceptionHandler sentinelBlockExceptionHandler() { return new SentinelBlockExceptionHandler(viewResolvers, serverCodecConfigurer); } @Bean @Order(-1) public SentinelWebFluxFilter sentinelWebFluxFilter() { return new SentinelWebFluxFilter(); } }
自定义流控返回
先写一个请求参数存储类UnhandledHttpRequest
@Data @AllArgsConstructor public class UnhandledHttpRequest implements Serializable { private static final long serialVersionUID = 1L; @Id private String id; private String body; private String method; public UnhandledHttpRequest() { id = IdUtil.fastSimpleUUID(); } public UnhandledHttpRequest(String body) { id = IdUtil.fastSimpleUUID(); this.body = body; } }
相关的服务类UnhandledHttpRequestService
@Service public class UnhandledHttpRequestService { @Resource private ReactiveRedisOperations<String, UnhandledHttpRequest> reactiveRedisOperations; public static final String HASH_KEY = RepetitiveGroup.class.getSimpleName(); public Mono<UnhandledHttpRequest> get(String id) { return reactiveRedisOperations.opsForHash() .get(HASH_KEY, id) .map(v -> (UnhandledHttpRequest) v) .switchIfEmpty(Mono.defer(() -> Mono.just(new UnhandledHttpRequest()))); } public Flux<UnhandledHttpRequest> getAll() { return reactiveRedisOperations.opsForHash() .values(HASH_KEY) .map(v -> (UnhandledHttpRequest) v); } public Mono<Boolean> upset(UnhandledHttpRequest unhandledHttpRequest) { return reactiveRedisOperations.opsForHash() .put(HASH_KEY, unhandledHttpRequest.getId(), unhandledHttpRequest); } public Mono<Boolean> delete(UnhandledHttpRequest unhandledHttpRequest) { return reactiveRedisOperations.opsForHash() .remove(HASH_KEY, unhandledHttpRequest.getId()) .map(v -> v.equals(1L)); } }
最后定义一个返回内容处理类,WebFlux这边获取request的body没有Servlet容易,我试了几种方法都失败了,示例代码仅展示获取在URL上的请求参数。
@Component @RequiredArgsConstructor public class RobotBlockExceptionHandler implements BlockRequestHandler { private final UnhandledHttpRequestService unhandledHttpRequestService; @SneakyThrows @Override public Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable t) { URI uri = exchange.getRequest().getURI(); JSONObject jsonObject = new JSONObject(); jsonObject.putAll(decodeBody(uri.getRawQuery())); UnhandledHttpRequest unhandledHttpRequest = new UnhandledHttpRequest(jsonObject.toJSONString()); if (uri.getPath().contains("/send/msg")) { unhandledHttpRequest.setMethod("/send/msg"); } else { unhandledHttpRequest.setMethod("/collect/msg"); } unhandledHttpRequestService.upset(unhandledHttpRequest); jsonObject.clear(); jsonObject.put("msg", "机器人服务正忙,请稍后重试"); jsonObject.put("code", HttpStatus.TOO_MANY_REQUESTS.value()); return ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS) .contentType(MediaType.APPLICATION_JSON) .bodyValue(jsonObject.toJSONString()); } private Map<String, Object> decodeBody(String body) { if (StrUtil.isBlank(body)) return new HashMap<>(); if (body.contains("&") && body.contains("=")) return Arrays.stream(body.split("&")) .map(s -> s.split("=")) .collect(Collectors.toMap(arr -> arr[0], arr -> arr[1])); return JSON.parseObject(body) .getInnerMap(); } }
其他类对Sentinel的适配
逻辑
我这边仅仅简单实现了,当Bot全部离线时,相关端口全部熔断,当有Bot在线时恢复。这个做法不可实际应用,更好点的做法则是依据request进行的限流。
BotOfflineHandlerprivate final SentinelService sentinelService; if (Bot.getInstances().isEmpty()) { FlowRule rule = new FlowRule("/send/msg"); rule.setCount(0); rule.setGrade(RuleConstant.FLOW_GRADE_QPS); rule.setLimitApp("default"); sentinelService.saveOrUpdateFlowRule(rule); rule.setResource("/collect/msg"); sentinelService.saveOrUpdateFlowRule(rule); }
BotOnlineHandler
private final SentinelService sentinelService; if (Bot.getInstances().isEmpty()) { FlowRule rule = new FlowRule("/send/msg"); rule.setCount(10); rule.setGrade(RuleConstant.FLOW_GRADE_QPS); rule.setLimitApp("default"); sentinelService.saveOrUpdateFlowRule(rule); rule.setResource("/collect/msg"); sentinelService.saveOrUpdateFlowRule(rule); }
配置持久化
修改bootstrap.yml,这里仅做参考,具体参数详见官网
spring: cloud: sentinel: transport: dashboard: localhost:38081 port: 38719 datasource: flow-rule: nacos: server-addr: localhost:8848 username: nacos password: nacos namespace: 3e4a06ab-3139-46e8-bc82-888154383de0 data-id: qqrobot-sentinel-flow-dev.json rule-type: flow
至此,Sentinel相关的适配到此结束
配置参考
Redis配置参考
@Configuration public class RedisConfig extends CachingConfigurerSupport { @Bean public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) { return new ReactiveRedisTemplate<>(connectionFactory, RedisSerializationContext.string()); } @Bean public ReactiveRedisConnection connection(ReactiveRedisConnectionFactory connectionFactory) { return connectionFactory.getReactiveConnection(); } @Bean ReactiveRedisOperations<String, Object> reactiveRedisOperations(ReactiveRedisConnectionFactory factory) { return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext()); } @Bean ReactiveSetOperations<String, Object> reactiveSetOperations(ReactiveRedisConnectionFactory factory) { return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext()).opsForSet(); } @Bean ReactiveZSetOperations<String, Object> reactiveZSetOperations(ReactiveRedisConnectionFactory factory) { return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext()).opsForZSet(); } private static RedisSerializationContext<String, Object> getObjectRedisSerializationContext() { return RedisSerializationContext .<String, Object>newSerializationContext(new GenericFastJsonRedisSerializer()) .key(new FastJsonRedisSerializer<>(String.class)) .value(new FastJsonRedisSerializer<>(Object.class)) .hashKey(new FastJsonRedisSerializer<>(String.class)) .hashValue(new FastJsonRedisSerializer<>(Object.class)) .build(); } }
Redisson配置参考
@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient() { Config config = new Config(); config.setLockWatchdogTimeout(10000L); SingleServerConfig singleServerConfig = config.useSingleServer(); singleServerConfig.setPassword("password"); singleServerConfig.setAddress("redis://localhost:6379"); singleServerConfig.setDatabase(1); return Redisson.create(config); } }
-
兄弟,有项目地址吗,我git clone一下
-
@840670339 这个项目后续还在更新,完结了再分享吧
-
@wssy001 好的
-
ohhhh刚好毕设用这个,得想办法把mirai塞毕设里。
-
第二版
写在开头
第二版主要是为项目加入了RocketMQ和DB。使用RocketMQ可以缓解高并发给DB(特指Mysql)带来的压力、提供Bot消费时突然宕机的基本解决方案,不过避免重复消费目前仍然需要借助Redis缓存消息ID来实现。DB方面我将采用MongoDB(Spring Data MongoDB Reactive)和Mysql(Mybatis Plus)作为示例。
之前在群里问了下,发现大部分人对DB的事务了解得不多、也有问我事务是啥,是否美味……所以本次更新暂未整合Seata,若有需要可以留言,我会尽快更新。
P.S.目前更新了Mysql版,MongoDB版稍后奉献。一些代码实际应用中会有坑,我正在看源码,未来会改正。代码更新完毕,文档更新略慢。目录
- 主要组件版本控制
- 与第一版的文件变动
- Mysql
3.1 引入Mybatis Plus
3.2 扫描Dao - MongoDB
4.1 [引入Spring Data MongoDB Reactive](#引入Spring Data MongoDB Reactive)
4.2 编写entity - RocketMQ
5.1 配置RocketMQ
5.2 GroupMessageHandler
5.3 ImageMessageSorter
5.4 PlainTextMessageSorter
5.5 UnhandledSendMessageSorter
5.6 ImageMessageDBConsumer
5.7 PlainTextMessageDBConsumer
5.8 UnhandledPlainTextMessageConsumer - DTO
- 其他
- 参考配置
6.1 MybatisGeneratorTest
第二版主要组件版本控制:
Mybatis Plus Mybatis Plus Generator Velocity Engine Core Rocket MQ 3.4.3 3.5.1 2.2 2.2.1
与第一版的文件变动
删除了:RepetitiveGroup(entity)
删除了:MemberJoinHandler、MemberLeaveHandler(handler)
删除了:RepetitiveGroupService(service)修改了:RobotService#handleStoredHttpRequest(service)
Mysql
Mybatis Plus,互联网人都爱它。
引入Mybatis Plus
主POM.xml引入Mybatis Plus、Mybatis Plus Generator以及模板引擎,这里用Velocity
<properties> <mybatis.plus.version>3.4.3</mybatis.plus.version> <mybatis.plus.generator.version>3.5.1</mybatis.plus.generator.version> <velocity.version>2.2</velocity.version> <rocketMQ.version>2.2.1</rocketMQ.version> </properties> <!-- ORM--> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>${mybatis.plus.version}</version> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId> <version>${mybatis.plus.generator.version}</version> </dependency> <dependency> <groupId>org.apache.velocity</groupId> <artifactId>velocity-engine-core</artifactId> <version>${velocity.version}</version> </dependency>
模块中引入
<dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.velocity</groupId> <artifactId>velocity-engine-core</artifactId> </dependency>
连接池我喜欢用hikari(号称全球最快的),所以使用默认设置。然后利用Mybatis Plus Generator生成entity,dao,service……相关类即可,参看配置可以见目录 -> 配置参考。
扫描Dao
这块做法有俩,一:在每个Dao上添加@Mapper注解,Mybatis Plus将会自动扫描。二:配置类上使用@MapperScan("cyou.wssy001.cloud.bot.dao"),Mybatis Plus将会扫描cyou.wssy001.cloud.bot.dao下所有Dao。
Mybatis Plus相关的配置到此结束
MongoDB
听说用了MongoDB,妈妈再也不用担心分库分表了。
引入Spring Data MongoDB Reactive
废话不多说,直接上GAV
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId> </dependency>
application.yml
spring: data: mongodb: uri: mongodb://root:wssy001@10.60.64.66:27017 database: back_up # 项目中没有Dao,所以选none repositories: type: none
编写entity
MongoDB里每张表都自带一个_id字段代表id,不过默认是String类型,当然我们可以自己实现数值类型的自增id。
大坑
MongoDB里没有Mysql的时区概念(一律UTC),如果要存储时间需要特殊处理,否则会发生以下问题(以java.util.Date,北京时间为例)(存储一个time为2021-12-26 20:00:00、id为“111”的对象,存入MongoDB后会存在一条id为"111",time为2021-12-26 12:00:00的记录;从MongoDB获取id为“111”的记录存入本地,打印显示该对象的time为2021-12-26 12:00:00)。当时网上搜了不少解决方案,但没有生效的,后来想到了LocalDateTime。
@Data @Builder @Document @AllArgsConstructor public class TImage { /** * ID */ @Id private String id; /** * mirai消息ID */ private Integer miraiId; /** * url */ private String url; /** * 本地路径 */ private String path; /** * 机器人账号 */ private Integer botNumber; /** * 群号 */ private Integer groupNumber; /** * 好友账号 */ private Integer friendNumber; /** * 创建时间 */ private LocalDateTime createTime; /** * 更新时间 */ private LocalDateTime updateTime; /** * 是否启用 */ private Boolean enable; @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; TImage image = (TImage) o; return Objects.equals(id, image.id) && Objects.equals(miraiId, image.miraiId); } @Override public int hashCode() { return Objects.hash(id, miraiId); } }
Spring Data MongoDB Reactive相关的配置到此结束
RocketMQ
本教程不涉及RocketMQ的安装,仅涉及与Mirai相关的适配设置。
使用RocketMQ而不是Kafka的主要原因就是前者有事务消息,只是目前不适配Seata。配置RocketMQ
引入RocketMQ
主POM<properties> <rocketMQ.version>2.2.1</rocketMQ.version> </properties> <!-- RocketMQ--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${rocketMQ.version}</version> </dependency>
模块
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> </dependency>
向application.yml增加配置
rocketmq: name-server: localhost:39876 producer: group: group1
由于我是用的是RocketMQ Spring Boot Starter,翻看DefaultRocketMQListenerContainer.DefaultMessageListenerConcurrently#consumeMessage 源码可以得知:使用@RocketMQMessageListener不太好实现批量拉取并消费。但为了网络优化,我们可以注入一个DefaultMQPushConsumer。
编写RocketMQConfig
@Slf4j @Setter @Configuration @RequiredArgsConstructor public class RocketMQConfig { @Value("${rocketmq.name-server}") private String nameServer; private final ImageMessageDBConsumer imageMessageDBConsumer; private final PlainTextMessageDBConsumer plainTextMessageDBConsumer; private final UnhandledSendMessageSorter unhandledSendMessageSorter; @Bean public DefaultMQPushConsumer imageMessageDBBatchConsumer() { DefaultMQPushConsumer consumer = getDefaultBatchConsumer(); consumer.setNamesrvAddr(nameServer); consumer.registerMessageListener(imageMessageDBConsumer); consumer.setConsumerGroup("image-db"); try { consumer.subscribe("image-message", ""); consumer.start(); } catch (Exception e) { log.info("******Exception:{}", e.getMessage()); } return consumer; } @Bean public DefaultMQPushConsumer plainTextMessageDBBatchConsumer() { DefaultMQPushConsumer consumer = getDefaultBatchConsumer(); consumer.registerMessageListener(plainTextMessageDBConsumer); consumer.setConsumerGroup("plain-text-db"); try { consumer.subscribe("plain-text-message", ""); consumer.start(); } catch (Exception e) { log.info("******Exception:{}", e.getMessage()); } return consumer; } @Bean public DefaultMQPushConsumer unhandledGroupMessageConsumer() { DefaultMQPushConsumer consumer = getDefaultBatchConsumer(); consumer.registerMessageListener(unhandledSendMessageSorter); consumer.setConsumerGroup("group-message"); try { consumer.subscribe("unhandled-group-message", ""); consumer.start(); } catch (Exception e) { log.info("******Exception:{}", e.getMessage()); } return consumer; } private DefaultMQPushConsumer getDefaultBatchConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setNamesrvAddr(nameServer); consumer.setPullInterval(2000); // 设置拉取消息的线程最大,最小值 consumer.setConsumeThreadMax(2); consumer.setConsumeThreadMin(1); consumer.setPullBatchSize(16); consumer.setConsumeMessageBatchMaxSize(16); return consumer; } }
LogSendCallbackService
MQ我喜欢用异步发送,因此需要自己编写一个回调方法,这里贴上一个LogSendCallbackService,但是异步发送时只有消费成功才会给相关消息的信息。
@Slf4j @Service public class LogSendCallbackService implements SendCallback { @Override public void onSuccess(SendResult sendResult) { log.info("******发送成功!,消息ID:{}", sendResult.getMsgId()); } @Override public void onException(Throwable e) { log.info("******发送失败!,错误信息:{}", e.getMessage()); } }
GroupMessageHandler
GroupMessageHandler,单纯的producer。获取所有的群消息,封装后发送至MQ,交给后续的Sorter分拣。
rocketMQTemplate.asyncSend()方法中可以直接传入一个Bean,但是会自动生成消息Id,故我选择手动封装一个消息,自定义消息Id更适合业务场景。@Component @RequiredArgsConstructor public class GroupMessageHandler { @Resource private ReactiveRedisOperations<String, MessageChainDto> reactiveRedisOperations; private final RocketMQTemplate rocketMQTemplate; private final LogSendCallbackService logSendCallbackService; public void handle(@NotNull GroupMessageEvent event) { String miraiCode = MiraiCode.serializeToMiraiCode(event.getMessage().iterator()); StringBuffer stringBuffer = new StringBuffer(); int[] ids = event.getSource().getIds(); Arrays.stream(ids) .forEach(stringBuffer::append); String key = stringBuffer.toString(); MessageChainDto messageChainDto = new MessageChainDto(); messageChainDto.setIds(ids); messageChainDto.setBotAccount(event.getBot().getId()); messageChainDto.setGroupNumber(event.getGroup().getId()); messageChainDto.setMiraiCode(miraiCode); Boolean nonExist = reactiveRedisOperations.opsForValue() .setIfAbsent(key, messageChainDto) .share() .block(); if (!nonExist) return; Duration ttl = Duration.ofSeconds(5); reactiveRedisOperations.expire(key, ttl); Message<MessageChainDto> message = MessageBuilder.withPayload(messageChainDto) .setHeader("KEYS", "GroupMessage_" + key) .build(); rocketMQTemplate.asyncSend("group-message", message, logSendCallbackService); } }
ImageMessageSorter
ImageMessageSorter,监听GroupMessageEvent中发送的QQ群消息,从中获取Image相关的,封装处理后发送至DBConsumer。
@Component @RequiredArgsConstructor @RocketMQMessageListener(topic = "group-message", consumerGroup = "image-message") public class ImageMessageSorter implements RocketMQListener<String> { private final RocketMQTemplate rocketMQTemplate; private final LogSendCallbackService logSendCallbackService; @Override public void onMessage(String message) { MessageChainDto messageChainDto = JSON.parseObject(message, MessageChainDto.class); int[] ids = messageChainDto.getIds(); MessageChain messageChain = MiraiCode.deserializeMiraiCode(messageChainDto.getMiraiCode()); ImageDto imageDto = new ImageDto(); imageDto.setBotAccount(messageChainDto.getBotAccount()); imageDto.setGroupNumber(messageChainDto.getGroupNumber()); List<ImageDto> imageDtoList = new ArrayList<>(); for (int i = 0; i < messageChain.size(); i++) { SingleMessage singleMessage = messageChain.get(i); if (singleMessage instanceof Image) imageDtoList.add(saveImage(imageDto, (Image) singleMessage, ids[i])); } if (imageDtoList.isEmpty()) return; send(imageDtoList); } // 存储图片 private ImageDto saveImage(ImageDto imageDto, Image image, Integer miraiId) { imageDto.setMiraiId(miraiId); String url = Image.queryUrl(image); return imageDto; } @SneakyThrows private void send(List<ImageDto> imageDtoList) { List<Message<ImageDto>> messageList = imageDtoList.parallelStream() .map(v -> MessageBuilder.withPayload(v) .setHeader("KEYS", "ImageDto_" + v.getId()) .build()) .collect(Collectors.toList()); rocketMQTemplate.asyncSend("image-message", messageList, logSendCallbackService); } }
PlainTextMessageSorter
PlainTextMessageSorter,监听GroupMessageEvent中发送的QQ群消息,从中获取Plain Text相关的,封装处理后发送至DBConsumer。
@Component @RequiredArgsConstructor @RocketMQMessageListener(topic = "group-message", consumerGroup = "plain-text-message") public class PlainTextMessageSorter implements RocketMQListener<String> { private final RocketMQTemplate rocketMQTemplate; private final LogSendCallbackService logSendCallbackService; @Override public void onMessage(String message) { MessageChainDto messageChainDto = JSON.parseObject(message, MessageChainDto.class); int[] ids = messageChainDto.getIds(); MessageChain messageChain = MiraiCode.deserializeMiraiCode(messageChainDto.getMiraiCode()); PlainTextDto plainTextDto = new PlainTextDto(); plainTextDto.setBotAccount(messageChainDto.getBotAccount()); plainTextDto.setGroupNumber(messageChainDto.getGroupNumber()); List<PlainTextDto> plainTextDtoList = new ArrayList<>(); for (int i = 0; i < messageChain.size(); i++) { SingleMessage singleMessage = messageChain.get(i); if (singleMessage instanceof PlainText) plainTextDtoList.add(box(plainTextDto, (PlainText) singleMessage, ids[i])); } if (plainTextDtoList.isEmpty()) return; send(plainTextDtoList); } private PlainTextDto box(PlainTextDto plainTextDto, PlainText plainText, Integer miraiCode) { plainTextDto.setMiraiId(miraiCode); plainTextDto.setText(plainText.getContent()); return plainTextDto; } @SneakyThrows private void send(List<PlainTextDto> plainTextDto) { List<Message<PlainTextDto>> messageList = plainTextDto.parallelStream() .map(v -> MessageBuilder.withPayload(v) .setHeader("KEYS", "PlainTextDto_" + v.getId()) .build()) .collect(Collectors.toList()); rocketMQTemplate.asyncSend("plain-text-message", messageList, logSendCallbackService); } }
UnhandledSendMessageSorter
UnhandledSendMessageSorter,处理由于Sentinel流控导致未能及时消费Http请求的信息。获取UnhandledHttpRequestDto,封装后发送至UnhandledPlainTextMessageConsumer。
@Component @RequiredArgsConstructor public class UnhandledSendMessageSorter implements MessageListenerConcurrently { private final RocketMQTemplate rocketMQTemplate; private final LogSendCallbackService logSendCallbackService; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) { List<Message<UnhandledHttpRequestDto>> messageList = messageExtList.parallelStream() .map(v -> JSON.parseObject(new String(v.getBody()), UnhandledHttpRequest.class)) .map(this::toUnhandledHttpRequestDto) .filter(Objects::nonNull) .map(v -> MessageBuilder.withPayload(v) .setHeader("KEYS", "UnhandledHttpRequestDto_" + v.getId()) .build()) .collect(Collectors.toList()); rocketMQTemplate.asyncSend("unhandled-group-plain-text-message", messageList, logSendCallbackService); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } private UnhandledHttpRequestDto toUnhandledHttpRequestDto(UnhandledHttpRequest unhandledHttpRequest) { if (!unhandledHttpRequest.getMethod().equals("/send/msg") || unhandledHttpRequest.getGroupId() == null) return null; MessageChainBuilder append = new MessageChainBuilder() .append(new PlainText(unhandledHttpRequest.getMsg())) .append(new At(unhandledHttpRequest.getQQ())); return new UnhandledHttpRequestDto(unhandledHttpRequest.getId(), MiraiCode.serializeToMiraiCode(append), unhandledHttpRequest.getGroupId()); } }
ImageMessageDBConsumer
ImageMessageDBConsumer,批量接收来自ImageMessageSorter的信息,批量存入数据库
@Component @RequiredArgsConstructor public class ImageMessageDBConsumer implements MessageListenerConcurrently { private final TImageService tImageService; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) { List<TImage> imageList = messageExtList.parallelStream() .map(v -> JSON.parseObject(new String(v.getBody()), ImageDto.class)) .map(this::toTImage) .collect(Collectors.toList()); tImageService.saveBatch(imageList) .share() .collectList() .block(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } private TImage toTImage(ImageDto imageDto) { TImage image = TImage.builder().build(); BeanUtil.copyProperties(imageDto, image); return image; } }
PlainTextMessageDBConsumer
PlainTextMessageDBConsumer,批量接收来自PlainTextMessageSorter的信息,批量存入数据库
@Component @RequiredArgsConstructor public class PlainTextMessageDBConsumer implements MessageListenerConcurrently { private final TPlainTextService tPlainTextService; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) { List<TPlainText> plainTextList = messageExtList.parallelStream() .map(v -> JSON.parseObject(new String(v.getBody()), PlainTextDto.class)) .map(this::toTPlainText) .collect(Collectors.toList()); tPlainTextService.saveBatch(plainTextList) .share() .collectList() .block(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } private TPlainText toTPlainText(PlainTextDto plainTextDto) { TPlainText plainText = TPlainText.builder().build(); BeanUtil.copyProperties(plainTextDto, plainText); return plainText; } }
UnhandledPlainTextMessageConsumer
UnhandledPlainTextMessageConsumer,通过翻看上层源码,发现只有抛异常才会触发消费失败,然后触发重复投递。
@Component @RequiredArgsConstructor @RocketMQMessageListener(topic = "unhandled-group-plain-text-message", consumerGroup = "plain-text-message") public class UnhandledPlainTextMessageConsumer implements RocketMQListener<String> { @Resource private ReactiveRedisOperations<String, Long> reactiveRedisOperations; @Override @SneakyThrows public void onMessage(String message) { if (Bot.getInstances().isEmpty()) return; UnhandledHttpRequestDto unhandledHttpRequestDto = JSON.parseObject(message, UnhandledHttpRequestDto.class); Bot bot = Bot.getInstances() .stream() .filter(v -> v.getGroup(unhandledHttpRequestDto.getGroupId()) != null) .findFirst() .orElse(null); if (bot == null) throw new RuntimeException("无Bot在目标群"); String msgId = unhandledHttpRequestDto.getId(); Boolean nonExist = reactiveRedisOperations.opsForValue() .setIfAbsent(msgId, bot.getId()) .share() .block(); if (!nonExist) throw new RuntimeException("重复消费"); Duration ttl = Duration.ofSeconds(5); reactiveRedisOperations.expire(msgId, ttl); MessageChain messageChain = MiraiCode.deserializeMiraiCode(unhandledHttpRequestDto.getMiraiCode()); bot.getGroup(unhandledHttpRequestDto.getGroupId()) .sendMessage(messageChain); Thread.sleep(500); } }
RocketMQ相关的配置到此结束
DTO
这里主要是贴一下上文所用到的DTO
BaseMessageDto@Data public abstract class BaseMessageDto { private Long id; private Integer miraiId; private Long botAccount; private Long groupNumber; private Long friendNumber; }
ImageDto
@Data @AllArgsConstructor @NoArgsConstructor @EqualsAndHashCode(callSuper = true) public class ImageDto extends BaseMessageDto implements Serializable { static final long serialVersionUID = 1L; private String url; private String path; }
MessageChainDto
@Data @AllArgsConstructor @NoArgsConstructor @EqualsAndHashCode(callSuper = true) public class MessageChainDto extends BaseMessageDto implements Serializable { static final long serialVersionUID = 1L; private int[] ids; private String miraiCode; }
PlainTextDto
@Data @AllArgsConstructor @NoArgsConstructor @EqualsAndHashCode(callSuper = true) public class PlainTextDto extends BaseMessageDto implements Serializable { static final long serialVersionUID = 1L; private String text; }
UnhandledHttpRequestDto
@Data @AllArgsConstructor @NoArgsConstructor public class UnhandledHttpRequestDto implements Serializable { private static final long serialVersionUID = 1L; private String id; private String miraiCode; private Long groupId; }
其他
RobotService#handleStoredHttpRequest
private void handleStoredHttpRequest() { List<Message<UnhandledHttpRequest>> messageList = unhandledHttpRequestService.getAll() .filter(Objects::nonNull) .map(v -> MessageBuilder.withPayload(v).build()) .share() .collectList() .block(); if (messageList == null) return; rocketMQTemplate.asyncSend("unhandled-group-message", messageList, logSendCallbackService); }
参考配置
MybatisGeneratorTest
@Slf4j class MybatisGeneratorTest { @Test void mybatisPlusGenerator() { FastAutoGenerator.create(getDataSourceConfig()) // 全局配置 .globalConfig(getGlobalConfig()) // 包配置 .packageConfig(getPackageConfig()) // 策略配置 .strategyConfig(getStrategyConfig()) .execute(); } private DataSourceConfig.Builder getDataSourceConfig() { return new DataSourceConfig.Builder("jdbc:mysql://localhost:3306/test?useUnicode=true" + "&useSSL=false&autoReconnect=true&characterEncoding=utf-8&serverTimezone=GMT%2B8" + "&rewriteBatchedStatements=true", "root", "root") .typeConvert(new MySqlTypeConvert()) .keyWordsHandler(new MySqlKeyWordsHandler()); } private Consumer<StrategyConfig.Builder> getStrategyConfig() { return builder -> builder // 添加表名 .addInclude( "t_image", "t_plain_text") .entityBuilder() .disableSerialVersionUID() .enableChainModel() .enableLombok() .enableRemoveIsPrefix() .enableTableFieldAnnotation() .enableActiveRecord() // 乐观锁 .versionColumnName("version") .versionPropertyName("version") // 逻辑删除 .logicDeleteColumnName("enable") .logicDeletePropertyName("enable") .naming(NamingStrategy.underline_to_camel) .columnNaming(NamingStrategy.underline_to_camel) .addTableFills(new Column("create_time", FieldFill.INSERT)) .addTableFills(new Column("update_time", FieldFill.INSERT_UPDATE)) .idType(IdType.AUTO) .serviceBuilder() .formatServiceFileName("%sService") .formatServiceImplFileName("%sServiceImp") .mapperBuilder() .formatMapperFileName("%sDAO"); } private BiConsumer<Function<String, String>, PackageConfig.Builder> getPackageConfig() { return (scanner, builder) -> builder.parent("org.test") .moduleName("test") .entity("entity") .service("service") .serviceImpl("service.impl") .mapper("dao") .controller("controller") .other("other"); } private BiConsumer<Function<String, String>, GlobalConfig.Builder> getGlobalConfig() { return (scanner, builder) -> builder.author("yourName") .disableOpenDir() .dateType(DateType.ONLY_DATE) .outputDir(System.getProperty("user.dir") + "/src/main/java") // Swagger 2 API Doc相关注解,Knife4j适用 // .enableSwagger() .fileOverride(); } }
-
@840670339 项目公开了,帖子最前面有地址。代码仅供参考