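Preface
I wrote this tutorial mainly for readers who need material for a graduation project; in real-world development, probably few people would bring mirai into a microservice stack.
In this article, Mirai Core acts as the user-facing terminal: it forwards user input to the system and delivers system messages back to the user. The project has basic support for distributed environments, uses Nacos for dynamic configuration updates, uses Sentinel for dynamic flow control on the endpoints, and uses RocketMQ.
The project uses WebFlux by default, but Servlet is also supported: branches whose names contain -servlet are built entirely on Servlet.
I will explain in detail the logic of the essential methods in each layer.
Project address
Link
PS: For the actual business logic, treat the project as authoritative; the documentation sometimes lags behind!
Version overview
If the latest version changes a lot of the previous version's sample code, the complete sample code is included with the latest version; otherwise only the business methods that need updating are explained in detail.
Unless necessary, the notes left between versions will not be deleted!
Version 1: Link
PS: I have not tested the code on real machines (specifically in a cluster environment). If you find a bug, please let me know!
Version 2: Link
PS: Update in progress. The MySQL version is done and the MongoDB version is being debugged. Looking back, some of the project code has pitfalls, which I am fixing with the help of the source code... Update complete: custom message IDs are solved for now, but the asynchronous callback of RocketMQ Spring Boot Starter is indeed imperfect, so take care in real use.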
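Contents
- Main component versions
- Other related component versions
- Parent POM
- Bot
4.1 Multi-Bot pitfalls
4.2 Multi-Bot pitfalls (cluster)
4.3 Adapting other business logic to multiple Bots
- Nacos
5.1 Integrating Nacos
5.2 Connecting to the config center
5.3 Setting up the configuration file
5.4 Listening for Nacos config refresh events
- Sentinel
6.1 Integrating Sentinel
6.2 Configuring the Sentinel client
6.3 Customizing the flow-control response
6.4 Adapting other classes to Sentinel
6.5 Persisting the configuration
- Configuration reference
7.1 Redis configuration reference
7.2 Redisson configuration reference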
Main component versions:
Adopt OpenJDK | Spring Boot | Spring Cloud | Spring Cloud Alibaba | Mirai Core |
---|---|---|---|---|
11.0.10 | 2.4.2 | 2020.0.1 | 2021.1 | 2.6.8 |
Other related component versions:
Nacos | Sentinel | Hutool |
---|---|---|
1.4.1 | 1.8.0 | 5.7.15 |
Parent POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.2</version>
<relativePath/>
</parent>
<groupId>cyou.wssy001.cloud</groupId>
<artifactId>bot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>bot</name>
<packaging>pom</packaging>
<properties>
<java.version>11</java.version>
<spring.cloud.version>2020.0.1</spring.cloud.version>
<spring.cloud.alibaba.version>2021.1</spring.cloud.alibaba.version>
<lombok.version>1.18.20</lombok.version>
<redisson.version>3.16.4</redisson.version>
<hutool.version>5.7.15</hutool.version>
<fastjson.version>1.2.78</fastjson.version>
<commons.pool2.version>2.11.1</commons.pool2.version>
<mirai.core.version>2.6.8</mirai.core.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- Spring Cloud Alibaba-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring.cloud.alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- Spring Cloud-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<!-- Redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>${redisson.version}</version>
</dependency>
<!-- Hutool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
<version>${hutool.version}</version>
</dependency>
<!-- Hutool-http-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-http</artifactId>
<version>${hutool.version}</version>
</dependency>
<!-- FastJSON-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- Commons Pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>${commons.pool2.version}</version>
</dependency>
<!-- Mirai-->
<dependency>
<groupId>net.mamoe</groupId>
<artifactId>mirai-core-jvm</artifactId>
<version>${mirai.core.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
Bot
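Add the Mirai dependency. Because the Kotlin dependency versions bundled with Spring Boot 2.4.2 are
<kotlin.version>1.4.21</kotlin.version>
<kotlin-coroutines.version>1.4.2</kotlin-coroutines.version>
you can add mirai-core-jvm directly, without specifying versions for the Kotlin-related dependencies: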
<dependency>
<groupId>net.mamoe</groupId>
<artifactId>mirai-core-jvm</artifactId>
</dependency>
Define a class that stores a Bot account:
@Data
public class BotAccount implements Serializable {
private static final long serialVersionUID = 1L;
private Long account;
private String password;
}
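Next comes a Service that manages the bots. After a Bot logs in, it does not have to be placed in the IoC container; use Bot.getInstanceOrNull(botQQNumber) to obtain a specific Bot instance.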
@Slf4j
@Service
@RequiredArgsConstructor
public class RobotService {
private final DynamicProperty dynamicProperty;
private final BotOnlineHandler botOnlineHandler;
private final BotOfflineHandler botOfflineHandler;
private final GroupMessageHandler groupMessageHandler;
private final MemberJoinHandler memberJoinHandler;
private final MemberLeaveHandler memberLeaveHandler;
@PostConstruct
private void init() {
login(stringToBotAccountList(dynamicProperty.getAccounts()));
}
public void refreshBot() {
List<BotAccount> botAccounts = stringToBotAccountList(dynamicProperty.getAccounts())
.stream()
.filter(v -> Bot.getInstanceOrNull(v.getAccount()) == null)
.collect(Collectors.toList());
login(botAccounts);
}
private void login(List<BotAccount> botAccounts) {
BotConfiguration botConfiguration = new BotConfiguration();
botConfiguration.fileBasedDeviceInfo("device.json");
// botConfiguration.noNetworkLog();
// botConfiguration.noBotLog();
for (BotAccount botAccountInfo : botAccounts) {
Bot bot = BotFactory.INSTANCE.newBot(botAccountInfo.getAccount(), botAccountInfo.getPassword(), botConfiguration);
try {
bot.getEventChannel().subscribeAlways(BotOnlineEvent.class, botOnlineHandler::handle);
bot.getEventChannel().subscribeAlways(BotOfflineEvent.class, botOfflineHandler::handle);
bot.getEventChannel().subscribeAlways(GroupMessageEvent.class, groupMessageHandler::handle);
bot.getEventChannel().subscribeAlways(MemberJoinEvent.class, memberJoinHandler::handle);
bot.getEventChannel().subscribeAlways(MemberLeaveEvent.class, memberLeaveHandler::handle);
bot.login();
log.info("******RobotService QQ:{} logged in successfully", botAccountInfo.getAccount());
// Remove this break to keep multiple Bots online
break;
} catch (LoginFailedException e) {
bot.close();
log.error("******Bot LoginFailedException:{} ,QQ:{}", e.getMessage(), botAccountInfo.getAccount());
}
}
}
private List<BotAccount> stringToBotAccountList(String s) {
return JSON.parseArray(s, BotAccount.class);
}
}
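Multi-Bot pitfalls
When your project runs several Bots at once, more than one of them may be listening to the same group's messages. In that case we need a lock that forces group-message handling to be "single-threaded". Since only one instance is deployed, a JVM-level lock is enough.
Logic
Using synchronized ensures that only one bot can invoke the method at a time, and checking whether the message IDs already exist in Redis avoids duplicate consumption. (For this I use a Redis sorted set: the score is set to the expiry time to implement expiration, and because adding an already-stored member to a sorted set only updates it and returns 0, a single add both stores the message IDs and tells whether all of them already existed.) As an optimization, only groups monitored by several bots at once are forced into "single-threaded" processing.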
@Component
@RequiredArgsConstructor
public class GroupMessageHandler2 {
private final RepetitiveGroupService repetitiveGroupService;
@Resource
private ReactiveZSetOperations<Long, Integer> reactiveZSetOperations;
public void handle(@NotNull GroupMessageEvent event) {
long groupId = event.getGroup().getId();
long botId = event.getBot().getId();
RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(groupId)
.share()
.block();
if (hasBeenConsumed(event, groupId)) return;
if (containMultiBots(repetitiveGroup, botId)) {
synchronized (this) {
consume();
}
} else {
consume();
}
}
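// Check whether the event has already been consumed
private boolean hasBeenConsumed(@NotNull GroupMessageEvent event, long groupId) {
// Scores are expiry times in epoch seconds, so "+ 10.0" below means a 10-second lifetime
long second = System.currentTimeMillis() / 1000;
Range<Double> range = Range.closed(0.0, second + 0.0);
// A reactive call does nothing until subscribed; block so expired IDs are removed before adding
reactiveZSetOperations.removeRangeByScore(groupId, range).block();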
int[] ids = event.getSource().getIds();
List<DefaultTypedTuple<Integer>> list = new ArrayList<>();
for (int id : ids) {
list.add(new DefaultTypedTuple<>(id, second + 10.0));
}
Long unExist = reactiveZSetOperations.addAll(groupId, list)
.share()
.block();
return unExist == null || unExist == 0;
}
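// Check whether several bots are listening to this group's messages
private boolean containMultiBots(RepetitiveGroup repetitiveGroup, Long botId) {
// An empty RepetitiveGroup (id == null) means the group is not shared by several bots
return repetitiveGroup.getId() != null && repetitiveGroup.getBotIds().contains(botId);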
}
// Consume the event
private void consume() {
}
}
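To make the sorted-set trick described above easier to follow, here is a minimal JDK-only sketch of the same idea. It is an in-memory stand-in, not the Redis implementation: the class ExpiringIdSet and its methods are hypothetical names, the score is modelled as an expiry time in epoch seconds, and the caller passes the current time in so the behaviour is easy to reproduce.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in (hypothetical) for the Redis sorted set used for de-duplication. */
final class ExpiringIdSet {
    // member -> score; the score is the expiry time in epoch seconds
    private final Map<Integer, Long> scores = new HashMap<>();

    /** Like removing the score range [0, now]: drops every member whose expiry has passed. */
    synchronized void evictExpired(long nowSeconds) {
        scores.values().removeIf(expiry -> expiry <= nowSeconds);
    }

    /** Like a sorted-set add: counts only newly added members; existing ones just get a new score. */
    synchronized long addAll(int[] ids, long expirySeconds) {
        long added = 0;
        for (int id : ids) {
            if (scores.put(id, expirySeconds) == null) added++;
        }
        return added;
    }

    /** True when no id was new, i.e. the event was already seen within its lifetime. */
    synchronized boolean hasBeenConsumed(int[] ids, long nowSeconds, long ttlSeconds) {
        evictExpired(nowSeconds);
        return addAll(ids, nowSeconds + ttlSeconds) == 0;
    }

    public static void main(String[] args) {
        ExpiringIdSet seen = new ExpiringIdSet();
        int[] ids = {101, 102};
        System.out.println(seen.hasBeenConsumed(ids, 1000, 10)); // first delivery: false
        System.out.println(seen.hasBeenConsumed(ids, 1003, 10)); // same message via another bot: true
        System.out.println(seen.hasBeenConsumed(ids, 1020, 10)); // entries have expired: false
    }
}
```

Note that the duplicate at second 1003 also pushes the expiry forward, because re-adding an existing member updates its score. The Redis-based code behaves the same way.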
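Multi-Bot pitfalls (cluster)
In a cluster, JVM-level locks no longer work. Since Redis is already in use, Redisson is brought in to implement a distributed lock.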
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
</dependency>
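The logic is largely the same as in the non-cluster version. A lock obtained from Redisson has a lease time and is released automatically when it expires. Suppose event A is received by both Bot A and Bot B, and Bot A acquires the lock but crashes while consuming the event, so it cannot release the lock normally. If the lock's lease time is longer than the lifetime of the event IDs, Bot B can still consume the event; otherwise the system treats Bot B's attempt as duplicate consumption.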
@Component
@RequiredArgsConstructor
public class GroupMessageHandler {
private final RedissonClient redissonClient;
private final RepetitiveGroupService repetitiveGroupService;
@Resource
private ReactiveZSetOperations<Long, Integer> reactiveZSetOperations;
public void handle(@NotNull GroupMessageEvent event) {
long groupId = event.getGroup().getId();
long botId = event.getBot().getId();
RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(groupId)
.share()
.block();
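if (hasBeenConsumed(event, groupId)) return;
if (!containMultiBots(repetitiveGroup, botId)) {
consume();
return;
}
// Every instance must use the same lock name for a group (the key prefix is illustrative);
// a random UUID would create a new lock on each call and never contend
RLock lock = redissonClient.getLock("group-message-lock:" + groupId);
lock.lock(20, TimeUnit.SECONDS);
try {
consume();
} finally {
// The lease may already have expired, in which case unlock() would throw
if (lock.isHeldByCurrentThread()) lock.unlock();
}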
}
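// Check whether the event has already been consumed
private boolean hasBeenConsumed(@NotNull GroupMessageEvent event, long groupId) {
// Scores are expiry times in epoch seconds, so "+ 10.0" below means a 10-second lifetime
long second = System.currentTimeMillis() / 1000;
Range<Double> range = Range.closed(0.0, second + 0.0);
// A reactive call does nothing until subscribed; block so expired IDs are removed before adding
reactiveZSetOperations.removeRangeByScore(groupId, range).block();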
int[] ids = event.getSource().getIds();
List<DefaultTypedTuple<Integer>> list = new ArrayList<>();
for (int id : ids) {
list.add(new DefaultTypedTuple<>(id, second + 10.0));
}
Long unExist = reactiveZSetOperations.addAll(groupId, list)
.share()
.block();
return unExist == null || unExist == 0;
}
// Check whether several bots are listening to this group's messages
private boolean containMultiBots(RepetitiveGroup repetitiveGroup, Long botId) {
return repetitiveGroup.getId() != null && repetitiveGroup.getBotIds().contains(botId);
}
// Consume the event
private void consume() {
}
}
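A lock only excludes anyone if every caller asks for it under the same name, and it must be released on every exit path. The sketch below shows both points with plain JDK locks instead of Redisson. GroupLocks and its methods are hypothetical names, and ReentrantLock has no lease time, so this illustrates the locking pattern only, not the distributed lock itself.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** One lock per group id (hypothetical helper, JDK-only analogue of a named distributed lock). */
final class GroupLocks {
    private final ConcurrentHashMap<Long, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** The same group id always maps to the same lock; that is what makes it exclusive. */
    ReentrantLock forGroup(long groupId) {
        return locks.computeIfAbsent(groupId, id -> new ReentrantLock());
    }

    /** Runs the task while holding the group's lock and releases it even if the task throws. */
    void withGroupLock(long groupId, Runnable task) {
        ReentrantLock lock = forGroup(groupId);
        lock.lock();
        try {
            task.run();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GroupLocks groupLocks = new GroupLocks();
        int[] counter = {0}; // deliberately unsynchronized; only the group lock protects it
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            pool.submit(() -> groupLocks.withGroupLock(42L, () -> counter[0]++));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(counter[0]); // 1000, because all tasks contend for the same lock
    }
}
```

If each task created its own lock, for example one named after a fresh UUID, the increments would race, because no two tasks would ever contend for the same lock.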
Adapting other business logic to multiple Bots
RepetitiveGroup stores the groups that more than one bot has joined:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RepetitiveGroup implements Serializable {
private static final long serialVersionUID = 1L;
@Id
private Long id;
private List<Long> botIds;
}
RepetitiveGroupService implements the CRUD operations:
@Service
public class RepetitiveGroupService {
@Resource
private ReactiveRedisOperations<String, RepetitiveGroup> reactiveRedisOperations;
public static final String HASH_KEY = RepetitiveGroup.class.getSimpleName();
public Mono<RepetitiveGroup> get(Long groupId) {
return reactiveRedisOperations.opsForHash()
.get(HASH_KEY, groupId)
.map(v -> (RepetitiveGroup) v)
.switchIfEmpty(Mono.defer(() -> Mono.just(new RepetitiveGroup())));
}
public Flux<RepetitiveGroup> getAll() {
return reactiveRedisOperations.opsForHash()
.values(HASH_KEY)
.map(v -> (RepetitiveGroup) v);
}
public Mono<Boolean> upset(RepetitiveGroup repetitiveGroup) {
return reactiveRedisOperations.opsForHash()
.put(HASH_KEY, repetitiveGroup.getId(), repetitiveGroup);
}
public Mono<Boolean> delete(RepetitiveGroup repetitiveGroup) {
return reactiveRedisOperations.opsForHash()
.remove(HASH_KEY, repetitiveGroup.getId())
.map(v -> v.equals(1L));
}
}
BotOnlineHandler:
@Slf4j
@Component
@RequiredArgsConstructor
public class BotOnlineHandler {
private final SentinelService sentinelService;
private final RepetitiveGroupService repetitiveGroupService;
@Resource
private ReactiveSetOperations<String, Long> reactiveSetOperations;
public void handle(@NotNull BotOnlineEvent event) {
Bot bot = event.getBot();
long id = bot.getId();
log.info("******BotOnlineHandler:QQ:{}", id);
List<Bot> instances = Bot.getInstances();
Set<Long> groupIds = bot.getGroups()
.parallelStream()
.map(Group::getId)
.collect(Collectors.toSet());
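// Reactive calls run only when subscribed; block so the set is complete before intersecting
groupIds.forEach(v -> reactiveSetOperations.add(id + "", v).block());
// Bot.getInstances() may return an unmodifiable list, so filter instead of calling removeIf
instances.stream()
.filter(v -> v.getId() != id)
.forEach(v -> sinter(id, v.getId()));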
}
private void sinter(Long self, Long target) {
ArrayList<Long> list = new ArrayList<>();
list.add(self);
list.add(target);
reactiveSetOperations.intersect(self + "", target + "")
.subscribe(v -> insert(v, list));
}
private void insert(Long groupId, List<Long> botIds) {
RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(groupId)
.share()
.block();
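if (repetitiveGroup.getId() == null) {
// A new record needs its id, otherwise it would be stored under a null hash key
repetitiveGroup.setId(groupId);
repetitiveGroup.setBotIds(new ArrayList<>());
}
Set<Long> set = new HashSet<>(botIds);
set.addAll(repetitiveGroup.getBotIds());
repetitiveGroup.setBotIds(new ArrayList<>(set));
// Subscribe so the write is actually executed
repetitiveGroupService.upset(repetitiveGroup).subscribe();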
}
}
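The bookkeeping in BotOnlineHandler comes down to set intersection: for every other online bot, intersect its group set with the new bot's group set and record both bot IDs under each shared group. Here is a JDK-only sketch of that step, with in-memory maps standing in for the Redis sets and the RepetitiveGroup hash. SharedGroups and onBotOnline are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class SharedGroups {
    /**
     * For every group that `self` shares with another bot, records both bot ids in botsByGroup.
     * groupsByBot maps a bot id to the ids of the groups it has joined.
     */
    static void onBotOnline(long self, Map<Long, Set<Long>> groupsByBot, Map<Long, Set<Long>> botsByGroup) {
        Set<Long> own = groupsByBot.getOrDefault(self, Collections.emptySet());
        for (Map.Entry<Long, Set<Long>> other : groupsByBot.entrySet()) {
            if (other.getKey() == self) continue; // numeric comparison: Long is unboxed
            Set<Long> common = new TreeSet<>(own);
            common.retainAll(other.getValue()); // same result as intersecting the two Redis sets
            for (Long groupId : common) {
                Set<Long> bots = botsByGroup.computeIfAbsent(groupId, g -> new TreeSet<>());
                bots.add(self);
                bots.add(other.getKey());
            }
        }
    }

    public static void main(String[] args) {
        Map<Long, Set<Long>> groupsByBot = new HashMap<>();
        groupsByBot.put(1L, new HashSet<>(Arrays.asList(10L, 20L, 30L)));
        groupsByBot.put(2L, new HashSet<>(Arrays.asList(20L, 30L, 40L)));
        groupsByBot.put(3L, new HashSet<>(Arrays.asList(30L)));
        Map<Long, Set<Long>> botsByGroup = new TreeMap<>();
        onBotOnline(1L, groupsByBot, botsByGroup);
        System.out.println(botsByGroup); // {20=[1, 2], 30=[1, 2, 3]}
    }
}
```

Group 10 is absent from the result because only bot 1 has joined it, so it never needs the lock.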
BotOfflineHandler:
@Slf4j
@Component
@RequiredArgsConstructor
public class BotOfflineHandler {
private final RepetitiveGroupService repetitiveGroupService;
public void handle(@NotNull BotOfflineEvent event) {
Bot bot = event.getBot();
long id = bot.getId();
String reason = "unknown";
if (event instanceof BotOfflineEvent.Active)
reason = "went offline actively";
if (event instanceof BotOfflineEvent.Force)
reason = "forced offline by another login";
if (event instanceof BotOfflineEvent.Dropped)
reason = "disconnected by the server or dropped due to network problems";
if (event instanceof BotOfflineEvent.RequireReconnect)
reason = "the server requested a switch to another server";
log.info("******BotOfflineHandler:QQ:{},Reason:{}", id, reason);
repetitiveGroupService.getAll()
.parallel()
.runOn(Schedulers.parallel())
.sequential()
.filter(v -> v.getBotIds().contains(id))
.map(v -> updateBotIdList(id, v))
.subscribe(this::updateOrDeleteRepetitiveGroup);
bot.close();
}
@NotNull
private RepetitiveGroup updateBotIdList(long id, RepetitiveGroup repetitiveGroup) {
List<Long> botIds = repetitiveGroup.getBotIds();
botIds.remove(id);
repetitiveGroup.setBotIds(botIds);
return repetitiveGroup;
}
private void updateOrDeleteRepetitiveGroup(RepetitiveGroup repetitiveGroup) {
// Subscribe so the reactive write is actually executed
if (repetitiveGroup.getBotIds().size() < 2) {
repetitiveGroupService.delete(repetitiveGroup).subscribe();
} else {
repetitiveGroupService.upset(repetitiveGroup).subscribe();
}
}
}
MemberJoinHandler:
@Component
@RequiredArgsConstructor
public class MemberJoinHandler {
private final RedissonClient redissonClient;
private final RepetitiveGroupService repetitiveGroupService;
public void handle(@NotNull MemberJoinEvent event) {
long memberId = event.getMember().getId();
Group group = event.getGroup();
if (event.getBot().getId() == memberId) return;
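// Use one lock name per group (the key prefix is illustrative) so all instances contend for the same lock
RLock lock = redissonClient.getLock("repetitive-group-lock:" + group.getId());
lock.lock(30, TimeUnit.SECONDS);
try {
List<Long> idList = Bot.getInstances()
.parallelStream()
.map(Bot::getId)
.collect(Collectors.toList());
if (!idList.contains(memberId)) return;
RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(group.getId())
.share()
.block();
List<Long> botIds = idList.parallelStream()
.filter(group::contains)
.collect(Collectors.toList());
if (repetitiveGroup.getId() != null && botIds.size() == repetitiveGroup.getBotIds().size()) return;
repetitiveGroup.setId(group.getId());
// Store the recomputed bot list as well; setting only the id would lose it
repetitiveGroup.setBotIds(botIds);
// block() subscribes, so the write is executed before the lock is released
repetitiveGroupService.upset(repetitiveGroup).block();
} finally {
// Release on every exit path, including the early returns above
if (lock.isHeldByCurrentThread()) lock.unlock();
}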
}
}
MemberLeaveHandler:
@Component
@RequiredArgsConstructor
public class MemberLeaveHandler {
private final RedissonClient redissonClient;
private final RepetitiveGroupService repetitiveGroupService;
public void handle(@NotNull MemberLeaveEvent event) {
long memberId = event.getMember().getId();
Group group = event.getGroup();
if (event.getBot().getId() == memberId) return;
// Use one lock name per group (the key prefix is illustrative) so all instances contend for the same lock
RLock lock = redissonClient.getLock("repetitive-group-lock:" + group.getId());
lock.lock(30, TimeUnit.SECONDS);
try {
List<Long> idList = Bot.getInstances()
.parallelStream()
.map(Bot::getId)
.collect(Collectors.toList());
if (!idList.contains(memberId)) return;
RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(group.getId())
.share()
.block();
if (repetitiveGroup.getId() == null) return;
List<Long> botIds = idList.parallelStream()
.filter(group::contains)
.collect(Collectors.toList());
if (botIds.size() > 1) {
repetitiveGroup.setId(group.getId());
repetitiveGroup.setBotIds(botIds);
// block() subscribes, so the write is executed before the lock is released
repetitiveGroupService.upset(repetitiveGroup).block();
}
if (botIds.size() == 1)
repetitiveGroupService.delete(repetitiveGroup).block();
} finally {
// Release on every exit path, including the early returns above
if (lock.isHeldByCurrentThread()) lock.unlock();
}
}
}
That concludes the Bot-related configuration. Nacos is covered next.
Nacos
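Integrating Nacos
Only the Nacos client configuration is covered here; for the Nacos server, see the official Nacos documentation.
Dependencies (note: to work with OpenFeign, Ribbon is replaced with LoadBalancer):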
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</exclusion>
</exclusions>
</dependency>
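Connecting to the config center
The main reason for using Nacos in this project is the convenience of pushing configuration dynamically.
First set up bootstrap.yml. Newer versions (Spring Boot 2.4 with Spring Cloud 2020.0 and later) no longer load bootstrap.yml by default, so add the following dependency: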
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
The bootstrap.yml configuration is as follows:
spring:
  application:
    name: qqrobot
  cloud:
    nacos:
      server-addr: localhost:8848
      config:
        # Delete this if no namespace is set in the Nacos web console
        namespace: 3e4a06ab-3139-46e8-bc82-888154383de0
        # The config file name can be specified, or omitted to use the default rule
        name: qqrobot-dev.yml
  main:
    # Allow bean definition overriding
    allow-bean-definition-overriding: true
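Adding @EnableDiscoveryClient registers the module with Nacos.
Setting up the configuration file
I will skip how to create the configuration file in the Nacos web console and focus on the backend.
Values can be read with @Value or @ConfigurationProperties(prefix = "xxx"); adding the @RefreshScope annotation enables automatic refresh.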
@Data
@Component
@RefreshScope
public class DynamicProperty {
@Value("${accounts}")
private String accounts;
}
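A few pitfalls here. I tried the following List notations several times, and automatic binding on the backend (with the config fetched from Nacos) did not succeed.
List<Bean>
test:
  list:
    - a:1
      b:1
test:
  list:
    - {a:1,b:1}
List<Map<String,String>>
test:
  list:
    - a:1
      b:1
So I recommend storing such values as JSON.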
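Listening for Nacos config refresh events
After Nacos refreshes the local project's configuration, the refresh is picked up by RefreshEventListener (org.springframework.cloud.endpoint.event.RefreshEventListener). Using its onApplicationEvent() method as a reference, we can implement our own listener.
Here robotService is the Bot control class. In the example, the bot account information is stored in Nacos; when it is refreshed, refreshBot() is called to update the state of the affected bots (online or offline).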
@Configuration
@RequiredArgsConstructor
public class RefreshEventListenerConfig {
private final RobotService robotService;
private AtomicBoolean ready = new AtomicBoolean(false);
@EventListener
public void listen(ApplicationEvent event) {
if (event instanceof ApplicationReadyEvent)
this.ready.compareAndSet(false, true);
if (event instanceof RefreshEvent && this.ready.get())
robotService.refreshBot();
}
}
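The listener above is a small readiness gate: refresh events that arrive before the application is ready are ignored, and later ones trigger the refresh action. The same pattern without Spring, as a sketch, looks like this. RefreshGate and its method names are hypothetical and stand in for the event listener.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Readiness gate (hypothetical): drops refresh signals until startup has finished. */
final class RefreshGate {
    private final AtomicBoolean ready = new AtomicBoolean(false);
    private final Runnable onRefresh;

    RefreshGate(Runnable onRefresh) {
        this.onRefresh = onRefresh;
    }

    /** Called once startup has finished; later calls are no-ops. */
    void markReady() {
        ready.compareAndSet(false, true);
    }

    /** Runs the refresh action only after startup; returns whether it ran. */
    boolean onRefreshEvent() {
        if (!ready.get()) return false;
        onRefresh.run();
        return true;
    }

    public static void main(String[] args) {
        int[] refreshes = {0};
        RefreshGate gate = new RefreshGate(() -> refreshes[0]++);
        System.out.println(gate.onRefreshEvent()); // false: application not ready yet
        gate.markReady();
        System.out.println(gate.onRefreshEvent()); // true: the refresh action runs
        System.out.println(refreshes[0]);          // 1
    }
}
```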
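That concludes the Nacos adaptation.
Sentinel
Sentinel is integrated for dynamic flow control. When all bots are offline and a request comes in, the fallback method runs immediately and the request parameters are stored for delayed processing. When at least one bot is online, the endpoints are opened and the stored requests are consumed. This feature will keep being improved.
Integrating Sentinel
We need the Sentinel component, Sentinel's adapter for the web framework (WebFlux here), and a persistence mechanism for the configuration: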
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-spring-webflux-adapter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
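Configuring the Sentinel client
Set the Sentinel Dashboard address in bootstrap.yml:
spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:38081
        port: 38719
Only the Sentinel client is covered here; for server-side configuration, see the official Sentinel documentation.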
@Configuration
@RequiredArgsConstructor
public class SentinelConfig {
private final List<ViewResolver> viewResolvers;
private final ServerCodecConfigurer serverCodecConfigurer;
@Bean
@Order(-1)
public SentinelBlockExceptionHandler sentinelBlockExceptionHandler() {
return new SentinelBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
}
@Bean
@Order(-1)
public SentinelWebFluxFilter sentinelWebFluxFilter() {
return new SentinelWebFluxFilter();
}
}
Customizing the flow-control response
First write a class that stores request parameters, UnhandledHttpRequest:
@Data
@AllArgsConstructor
public class UnhandledHttpRequest implements Serializable {
private static final long serialVersionUID = 1L;
@Id
private String id;
private String body;
private String method;
public UnhandledHttpRequest() {
id = IdUtil.fastSimpleUUID();
}
public UnhandledHttpRequest(String body) {
id = IdUtil.fastSimpleUUID();
this.body = body;
}
}
The corresponding service class, UnhandledHttpRequestService:
@Service
public class UnhandledHttpRequestService {
@Resource
private ReactiveRedisOperations<String, UnhandledHttpRequest> reactiveRedisOperations;
// Use this class's own hash; sharing RepetitiveGroup's key would mix the two record types
public static final String HASH_KEY = UnhandledHttpRequest.class.getSimpleName();
public Mono<UnhandledHttpRequest> get(String id) {
return reactiveRedisOperations.opsForHash()
.get(HASH_KEY, id)
.map(v -> (UnhandledHttpRequest) v)
.switchIfEmpty(Mono.defer(() -> Mono.just(new UnhandledHttpRequest())));
}
public Flux<UnhandledHttpRequest> getAll() {
return reactiveRedisOperations.opsForHash()
.values(HASH_KEY)
.map(v -> (UnhandledHttpRequest) v);
}
public Mono<Boolean> upset(UnhandledHttpRequest unhandledHttpRequest) {
return reactiveRedisOperations.opsForHash()
.put(HASH_KEY, unhandledHttpRequest.getId(), unhandledHttpRequest);
}
public Mono<Boolean> delete(UnhandledHttpRequest unhandledHttpRequest) {
return reactiveRedisOperations.opsForHash()
.remove(HASH_KEY, unhandledHttpRequest.getId())
.map(v -> v.equals(1L));
}
}
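Finally, define a handler for the response. Reading the request body in WebFlux is harder than in Servlet; I tried several approaches and all failed, so the sample code only captures the request parameters in the URL.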
@Component
@RequiredArgsConstructor
public class RobotBlockExceptionHandler implements BlockRequestHandler {
private final UnhandledHttpRequestService unhandledHttpRequestService;
@SneakyThrows
@Override
public Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable t) {
URI uri = exchange.getRequest().getURI();
JSONObject jsonObject = new JSONObject();
jsonObject.putAll(decodeBody(uri.getRawQuery()));
UnhandledHttpRequest unhandledHttpRequest = new UnhandledHttpRequest(jsonObject.toJSONString());
if (uri.getPath().contains("/send/msg")) {
unhandledHttpRequest.setMethod("/send/msg");
} else {
unhandledHttpRequest.setMethod("/collect/msg");
}
jsonObject.clear();
jsonObject.put("msg", "The bot service is busy, please try again later");
jsonObject.put("code", HttpStatus.TOO_MANY_REQUESTS.value());
// Chain the save into the returned Mono; an unsubscribed Mono would never execute
return unhandledHttpRequestService.upset(unhandledHttpRequest)
.then(ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(jsonObject.toJSONString()));
}
private Map<String, Object> decodeBody(String body) {
if (StrUtil.isBlank(body)) return new HashMap<>();
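// A single "key=value" pair has no '&', so test for '=' only
if (body.contains("="))
return Arrays.stream(body.split("&"))
// Limit 2 keeps '=' inside values; a missing value becomes an empty string
.map(s -> s.split("=", 2))
.collect(Collectors.toMap(arr -> arr[0], arr -> arr.length > 1 ? arr[1] : ""));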
return JSON.parseObject(body)
.getInnerMap();
}
}
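Since the handler stores only the URL parameters, the query-string parsing deserves some care: values can be percent-encoded, can be empty, and can contain '='. The following JDK-only sketch handles those cases. QueryStrings.parse is a hypothetical helper, not part of the project, and it keeps the last value when a key is repeated.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

final class QueryStrings {
    /** Parses a raw query such as "a=1&b=x%20y" into an insertion-ordered map. */
    static Map<String, String> parse(String rawQuery) {
        Map<String, String> params = new LinkedHashMap<>();
        if (rawQuery == null || rawQuery.trim().isEmpty()) return params;
        for (String pair : rawQuery.split("&")) {
            if (pair.isEmpty()) continue; // tolerate "a=1&&b=2"
            String[] kv = pair.split("=", 2); // limit 2 keeps '=' inside the value
            String key = decode(kv[0]);
            String value = kv.length > 1 ? decode(kv[1]) : "";
            params.put(key, value); // a repeated key keeps its last value
        }
        return params;
    }

    private static String decode(String s) {
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("msg=hello%20world&to=123&empty=&flag&sig=a=b"));
    }
}
```

URLDecoder follows form encoding, so a literal '+' in a value is decoded as a space. That is the usual convention for query strings, but it is worth knowing if message text can contain '+'.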
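Adapting other classes to Sentinel
Logic
I only implemented a simple version: when all bots are offline, every related endpoint is blocked, and when a bot comes online the endpoints are restored. This approach is not suitable for real-world use; a better approach is to rate-limit based on the request.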
BotOfflineHandler
private final SentinelService sentinelService;
if (Bot.getInstances().isEmpty()) {
FlowRule rule = new FlowRule("/send/msg");
rule.setCount(0);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setLimitApp("default");
sentinelService.saveOrUpdateFlowRule(rule);
rule.setResource("/collect/msg");
sentinelService.saveOrUpdateFlowRule(rule);
}
BotOnlineHandler
private final SentinelService sentinelService;
// A bot has just come online, so reopen the endpoints as soon as any instance exists
if (!Bot.getInstances().isEmpty()) {
FlowRule rule = new FlowRule("/send/msg");
rule.setCount(10);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setLimitApp("default");
sentinelService.saveOrUpdateFlowRule(rule);
rule.setResource("/collect/msg");
sentinelService.saveOrUpdateFlowRule(rule);
}
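The two snippets above implement one rule: with no bot online, the QPS threshold of each endpoint is 0, which blocks every request; otherwise it is a positive value. The sketch below expresses that decision as a pure function. The Rule class is a hypothetical stand-in, not Sentinel's FlowRule, and it builds a separate rule object per resource rather than reusing one mutable object.

```java
import java.util.ArrayList;
import java.util.List;

final class EndpointGate {
    /** Hypothetical stand-in for a flow rule: a resource name plus a QPS threshold. */
    static final class Rule {
        final String resource;
        final double qps;

        Rule(String resource, double qps) {
            this.resource = resource;
            this.qps = qps;
        }

        @Override
        public String toString() {
            return resource + "@" + qps;
        }
    }

    /** One independent rule per resource: threshold 0 blocks everything, openQps reopens. */
    static List<Rule> rulesFor(int onlineBots, double openQps, String... resources) {
        double qps = onlineBots > 0 ? openQps : 0;
        List<Rule> rules = new ArrayList<>();
        for (String resource : resources) {
            rules.add(new Rule(resource, qps));
        }
        return rules;
    }

    public static void main(String[] args) {
        System.out.println(rulesFor(0, 10, "/send/msg", "/collect/msg")); // all bots offline
        System.out.println(rulesFor(2, 10, "/send/msg", "/collect/msg")); // two bots online
    }
}
```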
Persisting the configuration
Modify bootstrap.yml as follows. This is for reference only; see the official site for the exact parameters.
spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:38081
        port: 38719
      datasource:
        flow-rule:
          nacos:
            server-addr: localhost:8848
            username: nacos
            password: nacos
            namespace: 3e4a06ab-3139-46e8-bc82-888154383de0
            data-id: qqrobot-sentinel-flow-dev.json
            rule-type: flow
That concludes the Sentinel adaptation.
Configuration reference
Redis configuration reference
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
return new ReactiveRedisTemplate<>(connectionFactory, RedisSerializationContext.string());
}
@Bean
public ReactiveRedisConnection connection(ReactiveRedisConnectionFactory connectionFactory) {
return connectionFactory.getReactiveConnection();
}
@Bean
ReactiveRedisOperations<String, Object> reactiveRedisOperations(ReactiveRedisConnectionFactory factory) {
return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext());
}
@Bean
ReactiveSetOperations<String, Object> reactiveSetOperations(ReactiveRedisConnectionFactory factory) {
return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext()).opsForSet();
}
@Bean
ReactiveZSetOperations<String, Object> reactiveZSetOperations(ReactiveRedisConnectionFactory factory) {
return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext()).opsForZSet();
}
private static RedisSerializationContext<String, Object> getObjectRedisSerializationContext() {
return RedisSerializationContext
.<String, Object>newSerializationContext(new GenericFastJsonRedisSerializer())
.key(new FastJsonRedisSerializer<>(String.class))
.value(new FastJsonRedisSerializer<>(Object.class))
.hashKey(new FastJsonRedisSerializer<>(String.class))
.hashValue(new FastJsonRedisSerializer<>(Object.class))
.build();
}
}
Redisson configuration reference
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.setLockWatchdogTimeout(10000L);
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setPassword("password");
singleServerConfig.setAddress("redis://localhost:6379");
singleServerConfig.setDatabase(1);
return Redisson.create(config);
}
}