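Preface
I wrote this tutorial for readers who need material for a graduation project; in real-world development, probably few people would pull mirai into the microservice world, right?
In this article, Mirai Core acts as the user terminal: it passes user input out to the system and delivers system messages to the user. The project has an initial adaptation for distributed environments, uses Nacos for dynamic configuration updates and Sentinel for dynamic flow control on the endpoints, and uses RocketMQ.
The project uses WebFlux by default, but I also support Servlet: branches whose names contain -servlet are built entirely on Servlet.
I will explain the logic of the essential methods in each layer in detail.
Project repository
Link
PS: For the actual business logic, treat the project as authoritative; the documentation sometimes lags behind!
Version overview
If the latest version changes a lot of the previous version's sample code, the full sample code will be included with the latest version; otherwise only the business methods that need updating will be explained in detail.
Unless necessary, the notes left between versions will not be removed!
Version 1: Link
PS: I have not tested the code on real machines (specifically, in a cluster environment). If you find a bug, please let me know!
Version 2: Link
PS: Being updated; the MySQL version is done and the MongoDB version is being debugged. Looking back, some of the code in the project has pitfalls, which I am fixing while reading the source... Update complete: custom message IDs are solved for now, but the asynchronous callback of RocketMQ Spring Boot Starter is indeed imperfect, so take care in real use.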
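Contents
1. Main component versions
2. Other component versions
3. Parent POM
4. Bot
4.1 The multi-Bot pitfall
4.2 The multi-Bot pitfall (cluster)
4.3 Adapting other business code to multiple Bots
5. Nacos
5.1 Integrating Nacos
5.2 Connecting to the configuration center
5.3 Integrating Nacos
5.4 Setting up the configuration file
5.5 Listening for Nacos configuration refresh events
6. Sentinel
6.1 Integrating Sentinel
6.2 Configuring the Sentinel client
6.3 Customizing the flow-control response
6.4 Adapting other classes to Sentinel
6.5 Persisting the configuration
7. Configuration reference
7.1 Redis configuration reference
7.2 Redisson configuration reference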
Main component versions:
Adopt OpenJDK: 11.0.10
Spring Boot: 2.4.2
Spring Cloud: 2020.0.1
Spring Cloud Alibaba: 2021.1
Mirai Core: 2.6.8
Other component versions:
Nacos: 1.4.1
Sentinel: 1.8.0
Hutool: 5.7.15
Parent POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.2</version>
<relativePath/>
</parent>
<groupId>cyou.wssy001.cloud</groupId>
<artifactId>bot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>bot</name>
<packaging>pom</packaging>
<properties>
<java.version>11</java.version>
<spring.cloud.version>2020.0.1</spring.cloud.version>
<spring.cloud.alibaba.version>2021.1</spring.cloud.alibaba.version>
<lombok.version>1.18.20</lombok.version>
<redisson.version>3.16.4</redisson.version>
<hutool.version>5.7.15</hutool.version>
<fastjson.version>1.2.78</fastjson.version>
<commons.pool2.version>2.11.1</commons.pool2.version>
<mirai.core.version>2.6.8</mirai.core.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- Spring Cloud Alibaba-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring.cloud.alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- Spring Cloud-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<!-- Redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>${redisson.version}</version>
</dependency>
<!-- Hutool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
<version>${hutool.version}</version>
</dependency>
<!-- Hutool-http-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-http</artifactId>
<version>${hutool.version}</version>
</dependency>
<!-- FastJSON-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- Commons Pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>${commons.pool2.version}</version>
</dependency>
<!-- Mirai-->
<dependency>
<groupId>net.mamoe</groupId>
<artifactId>mirai-core-jvm</artifactId>
<version>${mirai.core.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
Bot
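Add the Mirai dependency. The Kotlin versions managed by Spring Boot 2.4.2 are
<kotlin.version>1.4.21</kotlin.version>
<kotlin-coroutines.version>1.4.2</kotlin-coroutines.version>
so mirai-core-jvm can be added directly, with no need to specify versions for the Kotlin-related dependencies.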
<dependency>
<groupId>net.mamoe</groupId>
<artifactId>mirai-core-jvm</artifactId>
</dependency>
Define a class that stores a Bot account:
@Data
public class BotAccount implements Serializable {
private static final long serialVersionUID = 1L;
private Long account;
private String password;
}
Next comes a Service for managing the bots. After a Bot logs in it does not have to be put into the IoC container; Bot.getInstanceOrNull(botQQNumber) returns the given Bot instance.
@Slf4j
@Service
@RequiredArgsConstructor
public class RobotService {
private final DynamicProperty dynamicProperty;
private final BotOnlineHandler botOnlineHandler;
private final BotOfflineHandler botOfflineHandler;
private final GroupMessageHandler groupMessageHandler;
private final MemberJoinHandler memberJoinHandler;
private final MemberLeaveHandler memberLeaveHandler;
@PostConstruct
private void init() {
login(stringToBotAccountList(dynamicProperty.getAccounts()));
}
public void refreshBot() {
List<BotAccount> botAccounts = stringToBotAccountList(dynamicProperty.getAccounts())
.stream()
.filter(v -> Bot.getInstanceOrNull(v.getAccount()) == null)
.collect(Collectors.toList());
login(botAccounts);
}
private void login(List<BotAccount> botAccounts) {
BotConfiguration botConfiguration = new BotConfiguration();
botConfiguration.fileBasedDeviceInfo("device.json");
// botConfiguration.noNetworkLog();
// botConfiguration.noBotLog();
for (BotAccount botAccountInfo : botAccounts) {
Bot bot = BotFactory.INSTANCE.newBot(botAccountInfo.getAccount(), botAccountInfo.getPassword(), botConfiguration);
try {
bot.getEventChannel().subscribeAlways(BotOnlineEvent.class, botOnlineHandler::handle);
bot.getEventChannel().subscribeAlways(BotOfflineEvent.class, botOfflineHandler::handle);
bot.getEventChannel().subscribeAlways(GroupMessageEvent.class, groupMessageHandler::handle);
bot.getEventChannel().subscribeAlways(MemberJoinEvent.class, memberJoinHandler::handle);
bot.getEventChannel().subscribeAlways(MemberLeaveEvent.class, memberLeaveHandler::handle);
bot.login();
log.info("******RobotService QQ:{} logged in", botAccountInfo.getAccount());
// Remove this break to keep multiple Bots online
break;
} catch (LoginFailedException e) {
bot.close();
log.error("******Bot LoginFailedException:{} ,QQ:{}", e.getMessage(), botAccountInfo.getAccount());
}
}
}
private List<BotAccount> stringToBotAccountList(String s) {
return JSON.parseArray(s, BotAccount.class);
}
}
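The multi-Bot pitfall
When your project runs several Bots at once, more than one of them may be listening to messages from the same group. In that case we need a lock to force group-message handling to be "single-threaded". Since only one instance is deployed here, a JVM-level lock is enough.
Logic
synchronized ensures that only one bot can run the method at a time, and checking whether the message ID already exists in Redis avoids consuming the same message twice. For this I use a Redis sorted set: the score of each member is set to its expiry time, which implements expiration, and because adding a member that already exists only updates it and is not counted in the returned number, a single add both stores the message IDs and tells whether all of them were already present. As an optimization, only groups that several bots listen to at once are forced to be "single-threaded".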
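The sorted-set trick described above can be tried out without Redis. The sketch below is only an in-memory illustration under stated assumptions: the class SeenMessageIds and its method are hypothetical names, a HashMap stands in for one sorted set, and the TTL is passed in by the caller.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for one Redis sorted set (one set per group).
final class SeenMessageIds {
    // member (message ID) -> score (expiry time in epoch milliseconds)
    private final Map<Integer, Double> scores = new HashMap<>();
    private final long ttlMillis;

    SeenMessageIds(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns true when every ID was already recorded and has not expired yet.
    synchronized boolean hasBeenConsumed(int[] ids, long nowMillis) {
        // Like removing the score range [0, now]: drop members whose expiry has passed.
        scores.values().removeIf(expiry -> expiry <= nowMillis);
        int added = 0;
        for (int id : ids) {
            // Like a sorted-set add: an existing member only gets a new score and is not counted.
            if (scores.put(id, (double) (nowMillis + ttlMillis)) == null) {
                added++;
            }
        }
        return added == 0;
    }
}
```

With a TTL of 10 seconds, the first delivery of a message reports "not consumed", a second delivery a few seconds later reports "consumed", and a delivery after the TTL has passed is treated as new again.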
@Component
@RequiredArgsConstructor
public class GroupMessageHandler2 {
    // How long a message ID is remembered, in milliseconds
    private static final long ID_TTL_MILLIS = 10_000L;
    private final RepetitiveGroupService repetitiveGroupService;
    @Resource
    private ReactiveZSetOperations<Long, Integer> reactiveZSetOperations;

    public void handle(@NotNull GroupMessageEvent event) {
        long groupId = event.getGroup().getId();
        long botId = event.getBot().getId();
        RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(groupId)
                .share()
                .block();
        if (hasBeenConsumed(event, groupId)) return;
        if (containMultiBots(repetitiveGroup, botId)) {
            synchronized (this) {
                consume();
            }
        } else {
            consume();
        }
    }

    // Check whether the event has already been consumed
    private boolean hasBeenConsumed(@NotNull GroupMessageEvent event, long groupId) {
        // Scores are expiry timestamps in milliseconds
        long now = System.currentTimeMillis();
        // Remove expired IDs first; a reactive call only runs once it is subscribed to
        reactiveZSetOperations.removeRangeByScore(groupId, Range.closed(0.0, (double) now))
                .share()
                .block();
        int[] ids = event.getSource().getIds();
        List<DefaultTypedTuple<Integer>> list = new ArrayList<>();
        for (int id : ids) {
            list.add(new DefaultTypedTuple<>(id, (double) (now + ID_TTL_MILLIS)));
        }
        // addAll returns the number of newly added members; 0 means every ID was already present
        Long unExist = reactiveZSetOperations.addAll(groupId, list)
                .share()
                .block();
        return unExist == null || unExist == 0;
    }

    // Check whether several bots are listening to this group's messages
    private boolean containMultiBots(RepetitiveGroup repetitiveGroup, Long botId) {
        return repetitiveGroup.getId() != null && repetitiveGroup.getBotIds().contains(botId);
    }

    // Consume the event
    private void consume() {
    }
}
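Note that synchronized (this) serializes the shared groups against each other as well. If that ever becomes a bottleneck, one possible refinement is a lock per group ID. The following is a minimal sketch, not part of the project: GroupLocks and withGroupLock are hypothetical names, and the map of locks is never cleaned up, which is acceptable only for a small, stable set of groups.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical helper: one JVM-level lock per group ID.
final class GroupLocks {
    private final ConcurrentMap<Long, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Runs the task while holding the lock that belongs to the given group.
    <T> T withGroupLock(long groupId, Supplier<T> task) {
        ReentrantLock lock = locks.computeIfAbsent(groupId, id -> new ReentrantLock());
        lock.lock();
        try {
            return task.get();
        } finally {
            // Always release, even if the task throws.
            lock.unlock();
        }
    }

    // For illustration only: how many groups have a lock so far.
    int trackedGroups() {
        return locks.size();
    }
}
```

Handlers for different groups then no longer wait for each other, while two bots handling the same group still run one after the other.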
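The multi-Bot pitfall (cluster)
In a cluster, JVM-level locks no longer work. Since Redis is already in use, Redisson is brought in to provide a distributed lock.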
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
</dependency>
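The code logic is roughly the same as in the non-cluster case. A lock obtained from Redisson has a lease time and is released automatically when that time is up. Suppose event A is heard by both Bot A and Bot B, and Bot A gets the lock but crashes while consuming the event, so it cannot release the lock normally. If the lock's lease time is longer than the validity period of the event IDs, Bot B can consume the event normally; otherwise the system will judge Bot B's attempt to be a duplicate consumption.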
@Component
@RequiredArgsConstructor
public class GroupMessageHandler {
    // How long a message ID is remembered, in milliseconds
    private static final long ID_TTL_MILLIS = 10_000L;
    private final RedissonClient redissonClient;
    private final RepetitiveGroupService repetitiveGroupService;
    @Resource
    private ReactiveZSetOperations<Long, Integer> reactiveZSetOperations;

    public void handle(@NotNull GroupMessageEvent event) {
        long groupId = event.getGroup().getId();
        long botId = event.getBot().getId();
        RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(groupId)
                .share()
                .block();
        RLock lock = null;
        if (hasBeenConsumed(event, groupId)) return;
        if (containMultiBots(repetitiveGroup, botId)) {
            // Every instance must ask for the same lock name, so derive it from the group ID
            // (a random UUID would give each caller its own lock); the prefix is just an example
            lock = redissonClient.getLock("group-message-lock:" + groupId);
            lock.lock(20, TimeUnit.SECONDS);
        }
        try {
            consume();
        } finally {
            // The lease may already have expired, so only unlock a lock we still hold
            if (lock != null && lock.isHeldByCurrentThread()) lock.unlock();
        }
    }

    // Check whether the event has already been consumed
    private boolean hasBeenConsumed(@NotNull GroupMessageEvent event, long groupId) {
        // Scores are expiry timestamps in milliseconds
        long now = System.currentTimeMillis();
        // Remove expired IDs first; a reactive call only runs once it is subscribed to
        reactiveZSetOperations.removeRangeByScore(groupId, Range.closed(0.0, (double) now))
                .share()
                .block();
        int[] ids = event.getSource().getIds();
        List<DefaultTypedTuple<Integer>> list = new ArrayList<>();
        for (int id : ids) {
            list.add(new DefaultTypedTuple<>(id, (double) (now + ID_TTL_MILLIS)));
        }
        // addAll returns the number of newly added members; 0 means every ID was already present
        Long unExist = reactiveZSetOperations.addAll(groupId, list)
                .share()
                .block();
        return unExist == null || unExist == 0;
    }

    // Check whether several bots are listening to this group's messages
    private boolean containMultiBots(RepetitiveGroup repetitiveGroup, Long botId) {
        return repetitiveGroup.getId() != null && repetitiveGroup.getBotIds().contains(botId);
    }

    // Consume the event
    private void consume() {
    }
}
Adapting other business code to multiple Bots
RepetitiveGroup stores the groups that several bots have joined
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RepetitiveGroup implements Serializable {
private static final long serialVersionUID = 1L;
@Id
private Long id;
private List<Long> botIds;
}
RepetitiveGroupService implements CRUD
@Service
public class RepetitiveGroupService {
@Resource
private ReactiveRedisOperations<String, RepetitiveGroup> reactiveRedisOperations;
public static final String HASH_KEY = RepetitiveGroup.class.getSimpleName();
public Mono<RepetitiveGroup> get(Long groupId) {
return reactiveRedisOperations.opsForHash()
.get(HASH_KEY, groupId)
.map(v -> (RepetitiveGroup) v)
.switchIfEmpty(Mono.defer(() -> Mono.just(new RepetitiveGroup())));
}
public Flux<RepetitiveGroup> getAll() {
return reactiveRedisOperations.opsForHash()
.values(HASH_KEY)
.map(v -> (RepetitiveGroup) v);
}
public Mono<Boolean> upset(RepetitiveGroup repetitiveGroup) {
return reactiveRedisOperations.opsForHash()
.put(HASH_KEY, repetitiveGroup.getId(), repetitiveGroup);
}
public Mono<Boolean> delete(RepetitiveGroup repetitiveGroup) {
return reactiveRedisOperations.opsForHash()
.remove(HASH_KEY, repetitiveGroup.getId())
.map(v -> v.equals(1L));
}
}
BotOnlineHandler:
@Slf4j
@Component
@RequiredArgsConstructor
public class BotOnlineHandler {
    private final SentinelService sentinelService;
    private final RepetitiveGroupService repetitiveGroupService;
    @Resource
    private ReactiveSetOperations<String, Long> reactiveSetOperations;

    public void handle(@NotNull BotOnlineEvent event) {
        Bot bot = event.getBot();
        long id = bot.getId();
        log.info("******BotOnlineHandler:QQ:{}", id);
        Long[] groupIds = bot.getGroups()
                .stream()
                .map(Group::getId)
                .toArray(Long[]::new);
        // Nothing to store or compare if the bot has not joined any group
        if (groupIds.length == 0) return;
        // Store this bot's groups first, then intersect them with every other online bot.
        // Filtering a stream avoids modifying the list returned by Bot.getInstances()
        reactiveSetOperations.add(id + "", groupIds)
                .thenMany(Flux.fromIterable(Bot.getInstances()))
                .filter(v -> v.getId() != id)
                .subscribe(v -> sinter(id, v.getId()));
    }

    private void sinter(Long self, Long target) {
        ArrayList<Long> list = new ArrayList<>();
        list.add(self);
        list.add(target);
        reactiveSetOperations.intersect(self + "", target + "")
                .subscribe(v -> insert(v, list));
    }

    private void insert(Long groupId, List<Long> botIds) {
        repetitiveGroupService.get(groupId)
                .flatMap(repetitiveGroup -> {
                    // A missing record comes back as an empty RepetitiveGroup, so fill in its ID
                    repetitiveGroup.setId(groupId);
                    Set<Long> set = new HashSet<>(botIds);
                    if (repetitiveGroup.getBotIds() != null)
                        set.addAll(repetitiveGroup.getBotIds());
                    repetitiveGroup.setBotIds(new ArrayList<>(set));
                    return repetitiveGroupService.upset(repetitiveGroup);
                })
                // Without a subscriber the write would never be executed
                .subscribe();
    }
}
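What BotOnlineHandler works out through Redis set intersections is, in essence, "which groups have been joined by more than one bot". As a plain-Java illustration of that result (the class SharedGroups and its method are hypothetical and not part of the project):

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper: finds the groups that more than one bot has joined.
final class SharedGroups {
    private SharedGroups() {
    }

    // groupsByBot maps a bot ID to the IDs of the groups that bot has joined.
    static Map<Long, Set<Long>> find(Map<Long, Set<Long>> groupsByBot) {
        Map<Long, Set<Long>> botsByGroup = new TreeMap<>();
        groupsByBot.forEach((botId, groupIds) -> {
            for (Long groupId : groupIds) {
                botsByGroup.computeIfAbsent(groupId, id -> new TreeSet<>()).add(botId);
            }
        });
        // Keep only the groups watched by at least two bots.
        botsByGroup.values().removeIf(botIds -> botIds.size() < 2);
        return botsByGroup;
    }
}
```

Each remaining entry corresponds to one RepetitiveGroup record: the key is the group ID and the value is its botIds list.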
BotOfflineHandler:
@Slf4j
@Component
@RequiredArgsConstructor
public class BotOfflineHandler {
private final RepetitiveGroupService repetitiveGroupService;
public void handle(@NotNull BotOfflineEvent event) {
Bot bot = event.getBot();
long id = bot.getId();
String reason = "unknown";
if (event instanceof BotOfflineEvent.Active)
reason = "logged out actively";
if (event instanceof BotOfflineEvent.Force)
reason = "forced offline by another login";
if (event instanceof BotOfflineEvent.Dropped)
reason = "disconnected by the server or dropped because of network problems";
if (event instanceof BotOfflineEvent.RequireReconnect)
reason = "the server asked the client to switch to another server";
log.info("******BotOfflineHandler:QQ:{},Reason:{}", id, reason);
repetitiveGroupService.getAll()
.parallel()
.runOn(Schedulers.parallel())
.sequential()
.filter(v -> v.getBotIds().contains(id))
.map(v -> updateBotIdList(id, v))
.subscribe(this::updateOrDeleteRepetitiveGroup);
bot.close();
}
@NotNull
private RepetitiveGroup updateBotIdList(long id, RepetitiveGroup repetitiveGroup) {
List<Long> botIds = repetitiveGroup.getBotIds();
botIds.remove(id);
repetitiveGroup.setBotIds(botIds);
return repetitiveGroup;
}
private void updateOrDeleteRepetitiveGroup(RepetitiveGroup repetitiveGroup) {
// The service returns a Mono, which does nothing until it is subscribed to
if (repetitiveGroup.getBotIds().size() < 2) {
repetitiveGroupService.delete(repetitiveGroup).subscribe();
} else {
repetitiveGroupService.upset(repetitiveGroup).subscribe();
}
}
}
MemberJoinHandler:
@Component
@RequiredArgsConstructor
public class MemberJoinHandler {
    private final RedissonClient redissonClient;
    private final RepetitiveGroupService repetitiveGroupService;

    public void handle(@NotNull MemberJoinEvent event) {
        long memberId = event.getMember().getId();
        Group group = event.getGroup();
        if (event.getBot().getId() == memberId) return;
        List<Long> idList = Bot.getInstances()
                .stream()
                .map(Bot::getId)
                .collect(Collectors.toList());
        // Only members that are bots managed by this application matter here
        if (!idList.contains(memberId)) return;
        // Derive the lock name from the group ID so that every caller competes for the same lock
        // (a random UUID would give each caller its own lock); the prefix is just an example
        RLock lock = redissonClient.getLock("repetitive-group-lock:" + group.getId());
        lock.lock(30, TimeUnit.SECONDS);
        try {
            RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(group.getId())
                    .share()
                    .block();
            List<Long> botIds = idList.stream()
                    .filter(group::contains)
                    .collect(Collectors.toList());
            if (repetitiveGroup.getId() != null && botIds.size() == repetitiveGroup.getBotIds().size()) return;
            repetitiveGroup.setId(group.getId());
            // Record which bots are now in the group; without this the stored list is never updated
            repetitiveGroup.setBotIds(botIds);
            repetitiveGroupService.upset(repetitiveGroup).block();
        } finally {
            // Release on every path, including the early return above
            if (lock.isHeldByCurrentThread()) lock.unlock();
        }
    }
}
MemberLeaveHandler:
@Component
@RequiredArgsConstructor
public class MemberLeaveHandler {
    private final RedissonClient redissonClient;
    private final RepetitiveGroupService repetitiveGroupService;

    public void handle(@NotNull MemberLeaveEvent event) {
        long memberId = event.getMember().getId();
        Group group = event.getGroup();
        if (event.getBot().getId() == memberId) return;
        List<Long> idList = Bot.getInstances()
                .stream()
                .map(Bot::getId)
                .collect(Collectors.toList());
        // Only members that are bots managed by this application matter here
        if (!idList.contains(memberId)) return;
        // Derive the lock name from the group ID so that every caller competes for the same lock
        // (a random UUID would give each caller its own lock); the prefix is just an example
        RLock lock = redissonClient.getLock("repetitive-group-lock:" + group.getId());
        lock.lock(30, TimeUnit.SECONDS);
        try {
            RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(group.getId())
                    .share()
                    .block();
            if (repetitiveGroup.getId() == null) return;
            List<Long> botIds = idList.stream()
                    .filter(group::contains)
                    .collect(Collectors.toList());
            if (botIds.size() > 1) {
                repetitiveGroup.setId(group.getId());
                repetitiveGroup.setBotIds(botIds);
                repetitiveGroupService.upset(repetitiveGroup).block();
            }
            if (botIds.size() == 1)
                repetitiveGroupService.delete(repetitiveGroup).block();
        } finally {
            // Release on every path, including the early return above
            if (lock.isHeldByCurrentThread()) lock.unlock();
        }
    }
}
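That concludes the Bot-related configuration. Next up is Nacos ↓↓↓
Nacos
Integrating Nacos
Only the Nacos client configuration is covered here; for the Nacos server, see the official Nacos documentation.
Dependencies (note: to work with OpenFeign, ribbon is replaced with LoadBalancer):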
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</exclusion>
</exclusions>
</dependency>
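Connecting to the configuration center
The main reason for using Nacos in this project is the convenience of pushing configuration dynamically.
Start with bootstrap.yml. Newer versions no longer load bootstrap.yml by default (bootstrap is disabled by default from Spring Cloud 2020.0 on), so add the following dependency: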
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
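bootstrap.yml is configured as follows:
spring:
  application:
    name: qqrobot
  cloud:
    nacos:
      server-addr: localhost:8848
      config:
        # Remove this if no namespace was created in the Nacos web console
        namespace: 3e4a06ab-3139-46e8-bc82-888154383de0
        # Optional: the configuration file name; the default naming rule applies when omitted
        name: qqrobot-dev.yml
  main:
    # Allow bean definition overriding
    allow-bean-definition-overriding: true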
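Adding @EnableDiscoveryClient is enough to register the module with Nacos.
Setting up the configuration file
I will skip creating the configuration file in the Nacos web console and talk about the backend side.
Values can be read with @Value or @ConfigurationProperties(prefix = "xxx"), and the @RefreshScope annotation turns on automatic refresh.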
@Data
@Component
@RefreshScope
public class DynamicProperty {
@Value("${accounts}")
private String accounts;
}
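A few pitfalls here. I tried the following List notations several times, and the backend never managed to bind them (with the configuration fetched from Nacos):
List<Bean>
test:
  list:
    - a:1
      b:1
test:
  list:
    - {a:1,b:1}
List<Map<String,String>>
test:
  list:
    - a:1
      b:1
So I recommend JSON. (If the entries were written exactly as shown, one possible cause is that YAML needs a space after the colon, as in a: 1; a:1 is read as a single string rather than a key-value pair.)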
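Listening for Nacos configuration refresh events
After Nacos refreshes the local project's configuration, the change is picked up by RefreshEventListener (org.springframework.cloud.endpoint.event.RefreshEventListener). Using its onApplicationEvent() method as a reference, we can implement a listener of our own.
Here robotService is the class that controls the Bots. In the example I store the bot account information in Nacos; when it is refreshed, refreshBot() is called to update the state of the affected bots (online, offline).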
@Configuration
@RequiredArgsConstructor
public class RefreshEventListenerConfig {
private final RobotService robotService;
private AtomicBoolean ready = new AtomicBoolean(false);
@EventListener
public void listen(ApplicationEvent event) {
if (event instanceof ApplicationReadyEvent)
this.ready.compareAndSet(false, true);
if (event instanceof RefreshEvent && this.ready.get())
robotService.refreshBot();
}
}
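refreshBot() as shown earlier only logs in accounts that are not online yet. If bots removed from the configuration should also be taken offline, the two sets can be derived as in the sketch below. It is an illustration under assumptions: BotAccountDiff is a hypothetical helper, and the caller is expected to pass in the configured account IDs and the IDs of the currently online bots.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper that compares the configured accounts with the bots currently online.
final class BotAccountDiff {
    final Set<Long> toLogin;
    final Set<Long> toLogout;

    BotAccountDiff(Set<Long> configured, Set<Long> online) {
        // Configured but not online: these accounts need to log in.
        toLogin = new TreeSet<>(configured);
        toLogin.removeAll(online);
        // Online but no longer configured: these bots should be closed.
        toLogout = new TreeSet<>(online);
        toLogout.removeAll(configured);
    }
}
```

For configured accounts {1, 2} and online bots {2, 3}, account 1 has to log in and bot 3 has to be closed, while bot 2 is left alone.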
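That concludes the Nacos adaptation.
Sentinel
Sentinel is integrated for dynamic flow control. When all bots are offline and a request comes in, the fallback method runs immediately and the request parameters are stored for delayed processing. When at least one bot is online, the endpoints are opened again and the stored requests are consumed. This feature will keep being improved.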
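The "fall back now, replay later" idea can be sketched independently of Sentinel and Redis. Everything in the block below is hypothetical (the class DeferredRequestGate, its methods, and the "busy" reply); an in-memory queue stands in for the stored requests.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical sketch: answer with a fallback while offline and replay stored requests later.
final class DeferredRequestGate {
    private final Queue<String> pending = new ArrayDeque<>();
    private boolean botOnline;

    // Handles the request right away when a bot is online; otherwise stores it and returns the fallback.
    synchronized String handle(String request, Function<String, String> handler) {
        if (botOnline) {
            return handler.apply(request);
        }
        pending.add(request);
        return "busy";
    }

    // Marks a bot as online and replays everything stored while offline, in arrival order.
    synchronized List<String> onBotOnline(Function<String, String> handler) {
        botOnline = true;
        List<String> results = new ArrayList<>();
        String request;
        while ((request = pending.poll()) != null) {
            results.add(handler.apply(request));
        }
        return results;
    }

    // Marks all bots as offline, so later requests are stored again.
    synchronized void onAllBotsOffline() {
        botOnline = false;
    }
}
```

In the project the same roles are played by the Sentinel block handler (fallback), UnhandledHttpRequestService (storage) and the Bot online/offline handlers (switching).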
Integrating Sentinel
We need the sentinel component, sentinel's adapter for the web framework (I chose webflux here), and a way to persist the configuration.
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-spring-webflux-adapter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
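Configuring the Sentinel client
Set the address of the Sentinel Dashboard in bootstrap.yml:
spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:38081
        port: 38719
Only the Sentinel client is adapted here; for server-side configuration, see the official Sentinel documentation.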
@Configuration
@RequiredArgsConstructor
public class SentinelConfig {
private final List<ViewResolver> viewResolvers;
private final ServerCodecConfigurer serverCodecConfigurer;
@Bean
@Order(-1)
public SentinelBlockExceptionHandler sentinelBlockExceptionHandler() {
return new SentinelBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
}
@Bean
@Order(-1)
public SentinelWebFluxFilter sentinelWebFluxFilter() {
return new SentinelWebFluxFilter();
}
}
Customizing the flow-control response
First write UnhandledHttpRequest, a class that stores the request parameters:
@Data
@AllArgsConstructor
public class UnhandledHttpRequest implements Serializable {
private static final long serialVersionUID = 1L;
@Id
private String id;
private String body;
private String method;
public UnhandledHttpRequest() {
id = IdUtil.fastSimpleUUID();
}
public UnhandledHttpRequest(String body) {
id = IdUtil.fastSimpleUUID();
this.body = body;
}
}
The corresponding service class, UnhandledHttpRequestService:
@Service
public class UnhandledHttpRequestService {
@Resource
private ReactiveRedisOperations<String, UnhandledHttpRequest> reactiveRedisOperations;
// Use this class's own hash so that stored requests do not share a key with RepetitiveGroup
public static final String HASH_KEY = UnhandledHttpRequest.class.getSimpleName();
public Mono<UnhandledHttpRequest> get(String id) {
return reactiveRedisOperations.opsForHash()
.get(HASH_KEY, id)
.map(v -> (UnhandledHttpRequest) v)
.switchIfEmpty(Mono.defer(() -> Mono.just(new UnhandledHttpRequest())));
}
public Flux<UnhandledHttpRequest> getAll() {
return reactiveRedisOperations.opsForHash()
.values(HASH_KEY)
.map(v -> (UnhandledHttpRequest) v);
}
public Mono<Boolean> upset(UnhandledHttpRequest unhandledHttpRequest) {
return reactiveRedisOperations.opsForHash()
.put(HASH_KEY, unhandledHttpRequest.getId(), unhandledHttpRequest);
}
public Mono<Boolean> delete(UnhandledHttpRequest unhandledHttpRequest) {
return reactiveRedisOperations.opsForHash()
.remove(HASH_KEY, unhandledHttpRequest.getId())
.map(v -> v.equals(1L));
}
}
Finally, define a class that handles the response content. Reading the request body is harder in WebFlux than in Servlet, and the several approaches I tried all failed, so the sample code only shows how to get the parameters carried in the URL.
@Component
@RequiredArgsConstructor
public class RobotBlockExceptionHandler implements BlockRequestHandler {
    private final UnhandledHttpRequestService unhandledHttpRequestService;

    @SneakyThrows
    @Override
    public Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable t) {
        URI uri = exchange.getRequest().getURI();
        JSONObject jsonObject = new JSONObject();
        jsonObject.putAll(decodeBody(uri.getRawQuery()));
        UnhandledHttpRequest unhandledHttpRequest = new UnhandledHttpRequest(jsonObject.toJSONString());
        if (uri.getPath().contains("/send/msg")) {
            unhandledHttpRequest.setMethod("/send/msg");
        } else {
            unhandledHttpRequest.setMethod("/collect/msg");
        }
        // The reactive write only happens once it is subscribed to
        unhandledHttpRequestService.upset(unhandledHttpRequest).subscribe();
        jsonObject.clear();
        jsonObject.put("msg", "The bot service is busy, please try again later");
        jsonObject.put("code", HttpStatus.TOO_MANY_REQUESTS.value());
        return ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(jsonObject.toJSONString());
    }

    private Map<String, Object> decodeBody(String body) {
        if (StrUtil.isBlank(body)) return new HashMap<>();
        // Anything containing '=' is treated as key=value pairs, including a single pair without '&'
        if (body.contains("="))
            return Arrays.stream(body.split("&"))
                    // Limit 2 keeps further '=' characters inside the value
                    .map(s -> s.split("=", 2))
                    // A missing value becomes ""; a repeated key keeps its last value
                    .collect(Collectors.toMap(arr -> arr[0], arr -> arr.length > 1 ? arr[1] : "", (a, b) -> b));
        return JSON.parseObject(body)
                .getInnerMap();
    }
}
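getRawQuery() returns the query string still percent-encoded, so values such as hello%20world are stored as-is by decodeBody. If decoded values are wanted, a stand-alone helper along the following lines could be used. This is a sketch with hypothetical names (QueryStrings, decode) that relies only on the JDK (Java 11, as in this project).

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for decoding a raw URL query string such as "a=1&b=hello%20world".
final class QueryStrings {
    private QueryStrings() {
    }

    static Map<String, String> decode(String rawQuery) {
        Map<String, String> params = new LinkedHashMap<>();
        if (rawQuery == null || rawQuery.isBlank()) {
            return params;
        }
        for (String pair : rawQuery.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            // Limit 2 keeps any further '=' inside the value; a missing value becomes "".
            String[] kv = pair.split("=", 2);
            String key = URLDecoder.decode(kv[0], StandardCharsets.UTF_8);
            String value = kv.length > 1 ? URLDecoder.decode(kv[1], StandardCharsets.UTF_8) : "";
            // Later occurrences of the same key overwrite earlier ones.
            params.put(key, value);
        }
        return params;
    }
}
```

Note that URLDecoder follows form encoding, so a literal + in the query is turned into a space.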
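Adapting other classes to Sentinel
Logic
I only implemented the simple case: when all Bots are offline, the related endpoints are all blocked, and they are restored when a Bot comes online. This approach is not fit for real use; a better one is to limit traffic based on the request.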
BotOfflineHandler
private final SentinelService sentinelService;
if (Bot.getInstances().isEmpty()) {
FlowRule rule = new FlowRule("/send/msg");
rule.setCount(0);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setLimitApp("default");
sentinelService.saveOrUpdateFlowRule(rule);
rule.setResource("/collect/msg");
sentinelService.saveOrUpdateFlowRule(rule);
}
BotOnlineHandler
private final SentinelService sentinelService;
// Restore the endpoints once at least one Bot is online
if (!Bot.getInstances().isEmpty()) {
FlowRule rule = new FlowRule("/send/msg");
rule.setCount(10);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setLimitApp("default");
sentinelService.saveOrUpdateFlowRule(rule);
rule.setResource("/collect/msg");
sentinelService.saveOrUpdateFlowRule(rule);
}
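The two fragments above differ only in the threshold they set. One way to keep that decision in a single place is a small helper that maps the number of online bots to a threshold per resource. The sketch below is hypothetical (FlowThresholds and forOnlineBots are not part of the project); the resource names and the value 10 are taken from the fragments above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: derives the QPS threshold per resource from the number of online bots.
final class FlowThresholds {
    private static final String[] RESOURCES = {"/send/msg", "/collect/msg"};

    private FlowThresholds() {
    }

    static Map<String, Double> forOnlineBots(int onlineBots, double qpsWhenOnline) {
        // No bot online: a threshold of 0 blocks every request to the resource.
        double count = onlineBots > 0 ? qpsWhenOnline : 0;
        Map<String, Double> thresholds = new LinkedHashMap<>();
        for (String resource : RESOURCES) {
            thresholds.put(resource, count);
        }
        return thresholds;
    }
}
```

Both handlers could then call forOnlineBots(Bot.getInstances().size(), 10) and turn each entry into a FlowRule with setResource and setCount, as in the fragments above.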
Persisting the configuration
Modify bootstrap.yml. This is for reference only; see the official site for the exact parameters.
spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:38081
        port: 38719
      datasource:
        flow-rule:
          nacos:
            server-addr: localhost:8848
            username: nacos
            password: nacos
            namespace: 3e4a06ab-3139-46e8-bc82-888154383de0
            data-id: qqrobot-sentinel-flow-dev.json
            rule-type: flow
That concludes the Sentinel adaptation.
Configuration reference
Redis configuration reference
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
return new ReactiveRedisTemplate<>(connectionFactory, RedisSerializationContext.string());
}
@Bean
public ReactiveRedisConnection connection(ReactiveRedisConnectionFactory connectionFactory) {
return connectionFactory.getReactiveConnection();
}
@Bean
ReactiveRedisOperations<String, Object> reactiveRedisOperations(ReactiveRedisConnectionFactory factory) {
return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext());
}
@Bean
ReactiveSetOperations<String, Object> reactiveSetOperations(ReactiveRedisConnectionFactory factory) {
return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext()).opsForSet();
}
@Bean
ReactiveZSetOperations<String, Object> reactiveZSetOperations(ReactiveRedisConnectionFactory factory) {
return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext()).opsForZSet();
}
private static RedisSerializationContext<String, Object> getObjectRedisSerializationContext() {
return RedisSerializationContext
.<String, Object>newSerializationContext(new GenericFastJsonRedisSerializer())
.key(new FastJsonRedisSerializer<>(String.class))
.value(new FastJsonRedisSerializer<>(Object.class))
.hashKey(new FastJsonRedisSerializer<>(String.class))
.hashValue(new FastJsonRedisSerializer<>(Object.class))
.build();
}
}
Redisson configuration reference
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.setLockWatchdogTimeout(10000L);
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setPassword("password");
singleServerConfig.setAddress("redis://localhost:6379");
singleServerConfig.setDatabase(1);
return Redisson.create(config);
}
}