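Preface
This tutorial is written for readers who need it for a graduation project; in real-world development, probably few people would pull Mirai into a microservice stack.
In this article, Mirai Core acts as the user terminal: it passes user input on and delivers system messages to the user. The project has a first-pass adaptation for distributed environments, uses Nacos for dynamic configuration updates, uses Sentinel for dynamic flow control on the endpoints, and uses RocketMQ.
The project uses WebFlux by default, but Servlet is also supported: branches whose name contains -servlet use Servlet throughout.
I will explain the logic of the essential methods in each layer in detail.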
Project repository
Link
PS: For the actual business logic, treat the project as authoritative; the documentation sometimes lags behind!
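Version overview
If the latest version changes a large part of the previous version's sample code, the complete sample code is attached to the latest version; otherwise only the business methods that need updating are explained in detail.
Unless necessary, notes from earlier versions will not be deleted!
Version 1: Link
PS: I have not tested the code on real machines (specifically in a cluster environment). If you find a bug, please let me know!
Version 2: Link
PS: Update in progress; the MySQL version has been updated and the MongoDB version is being debugged. Looking back, some code in the project has pitfalls, and I am fixing them with the help of the source code... Update finished: custom message IDs are solved for now, but the asynchronous callback of RocketMQ Spring Boot Starter is indeed imperfect, so take care in real use.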
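Contents
- Main component versions
- Other component versions
- Main POM
- Bot
  - 4.1 Pitfalls with multiple Bots
  - 4.2 Pitfalls with multiple Bots (cluster)
  - 4.3 Adapting other business logic to multiple Bots
- Nacos
  - 5.1 Integrating Nacos
  - 5.2 Connecting to the configuration center
  - 5.3 Integrating Nacos
  - 5.4 Setting up the configuration file
  - 5.5 Listening for Nacos configuration refresh events
- Sentinel
  - 6.1 Integrating Sentinel
  - 6.2 Configuring the Sentinel client
  - 6.3 Customizing the flow-control response
  - 6.4 Adapting other classes to Sentinel
  - 6.5 Persisting the configuration
- Configuration reference
  - 7.1 Redis configuration reference
  - 7.2 Redisson configuration reference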
Main component versions:
| Adopt OpenJDK | Spring Boot | Spring Cloud | Spring Cloud Alibaba | Mirai Core | 
|---|---|---|---|---|
| 11.0.10 | 2.4.2 | 2020.0.1 | 2021.1 | 2.6.8 | 
Other component versions:
| Nacos | Sentinel | Hutool | 
|---|---|---|
| 1.4.1 | 1.8.0 | 5.7.15 | 
Main POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
        <relativePath/>
    </parent>
    <groupId>cyou.wssy001.cloud</groupId>
    <artifactId>bot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>bot</name>
    <packaging>pom</packaging>
    <properties>
        <java.version>11</java.version>
        <spring.cloud.version>2020.0.1</spring.cloud.version>
        <spring.cloud.alibaba.version>2021.1</spring.cloud.alibaba.version>
        <lombok.version>1.18.20</lombok.version>
        <redisson.version>3.16.4</redisson.version>
        <hutool.version>5.7.15</hutool.version>
        <fastjson.version>1.2.78</fastjson.version>
        <commons.pool2.version>2.11.1</commons.pool2.version>
        <mirai.core.version>2.6.8</mirai.core.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <!--        Spring Cloud Alibaba-->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring.cloud.alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!--            Spring Cloud-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!--            Lombok-->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok.version}</version>
            </dependency>
            <!--            Redisson-->
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>${redisson.version}</version>
            </dependency>
            <!--            Hutool-->
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-core</artifactId>
                <version>${hutool.version}</version>
            </dependency>
            <!--            Hutool-http-->
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-http</artifactId>
                <version>${hutool.version}</version>
            </dependency>
            <!--            FastJSON-->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson.version}</version>
            </dependency>
            <!--            Commons Pool2-->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>${commons.pool2.version}</version>
            </dependency>
            <!--            Mirai-->
            <dependency>
                <groupId>net.mamoe</groupId>
                <artifactId>mirai-core-jvm</artifactId>
                <version>${mirai.core.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>
Bot
Add the Mirai dependency. The Kotlin dependency versions managed by Spring Boot 2.4.2 are:
<kotlin.version>1.4.21</kotlin.version>
<kotlin-coroutines.version>1.4.2</kotlin-coroutines.version>
so mirai-core-jvm can be added directly, without specifying versions for the Kotlin dependencies:
<dependency>
    <groupId>net.mamoe</groupId>
    <artifactId>mirai-core-jvm</artifactId>
</dependency>
Define a class that stores a Bot account:
@Data
public class BotAccount implements Serializable {
    private static final long serialVersionUID = 1L;
    private Long account;
    private String password;
}
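Next comes a Service that manages the bots. After a Bot logs in it does not have to be placed in the IoC container; use Bot.getInstanceOrNull(botQQNumber) to obtain the Bot instance you need.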
@Slf4j
@Service
@RequiredArgsConstructor
public class RobotService {
    private final DynamicProperty dynamicProperty;
    private final BotOnlineHandler botOnlineHandler;
    private final BotOfflineHandler botOfflineHandler;
    private final GroupMessageHandler groupMessageHandler;
    private final MemberJoinHandler memberJoinHandler;
    private final MemberLeaveHandler memberLeaveHandler;
    @PostConstruct
    private void init() {
        login(stringToBotAccountList(dynamicProperty.getAccounts()));
    }
    public void refreshBot() {
        List<BotAccount> botAccounts = stringToBotAccountList(dynamicProperty.getAccounts())
                .stream()
                .filter(v -> Bot.getInstanceOrNull(v.getAccount()) == null)
                .collect(Collectors.toList());
        login(botAccounts);
    }
    private void login(List<BotAccount> botAccounts) {
        BotConfiguration botConfiguration = new BotConfiguration();
        botConfiguration.fileBasedDeviceInfo("device.json");
//        botConfiguration.noNetworkLog();
//        botConfiguration.noBotLog();
        for (BotAccount botAccountInfo : botAccounts) {
            Bot bot = BotFactory.INSTANCE.newBot(botAccountInfo.getAccount(), botAccountInfo.getPassword(), botConfiguration);
            try {
                bot.getEventChannel().subscribeAlways(BotOnlineEvent.class, botOnlineHandler::handle);
                bot.getEventChannel().subscribeAlways(BotOfflineEvent.class, botOfflineHandler::handle);
                bot.getEventChannel().subscribeAlways(GroupMessageEvent.class, groupMessageHandler::handle);
                bot.getEventChannel().subscribeAlways(MemberJoinEvent.class, memberJoinHandler::handle);
                bot.getEventChannel().subscribeAlways(MemberLeaveEvent.class, memberLeaveHandler::handle);
                bot.login();
                log.info("******RobotService QQ:{} login succeeded", botAccountInfo.getAccount());
//                Remove this break to keep multiple Bots online
                break;
            } catch (LoginFailedException e) {
                bot.close();
                log.error("******Bot LoginFailedException:{} ,QQ:{}", e.getMessage(), botAccountInfo.getAccount());
            }
        }
    }
    private List<BotAccount> stringToBotAccountList(String s) {
        return JSON.parseArray(s, BotAccount.class);
    }
}
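The filter in refreshBot() boils down to a set difference: only accounts that are configured but have no online Bot yet are logged in. A minimal sketch of that step in plain Java, assuming account IDs are already extracted as Long values; the class and method names here are hypothetical and not part of the project:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: mirrors the filter used in refreshBot() without Mirai.
class AccountDiff {
    // Returns the configured account IDs that have no online Bot yet, in configuration order.
    static List<Long> accountsToLogin(List<Long> configured, Set<Long> online) {
        return configured.stream()
                .filter(id -> !online.contains(id))
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> configured = List.of(10001L, 10002L, 10003L);
        Set<Long> online = Set.of(10002L);
        // Only the accounts that are not online yet are returned
        System.out.println(accountsToLogin(configured, online));
    }
}
```

Note that this only covers logging in new accounts; accounts removed from the configuration are not logged out by this filter.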
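Pitfalls with multiple Bots
When your project runs several Bots at once, more than one of them may listen to the messages of the same group. In that case we need a lock that forces group-message handling to be "single-threaded". Since only one instance is deployed, a JVM-level lock is enough.
Logic
Using synchronized ensures that only one bot can call the method at a time, and checking whether the message ID already exists in Redis avoids duplicate consumption. (I use a Redis sorted set here: the score is set to the expiry time to implement expiration, and because adding a member that already exists only updates it and returns 0, a single call both stores the message IDs and tells whether all of them already existed.) As an optimization, only groups that several bots listen to at the same time are forced to be "single-threaded".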
@Component
@RequiredArgsConstructor
public class GroupMessageHandler2 {
    private final RepetitiveGroupService repetitiveGroupService;
    @Resource
    private ReactiveZSetOperations<Long, Integer> reactiveZSetOperations;
    public void handle(@NotNull GroupMessageEvent event) {
        long groupId = event.getGroup().getId();
        long botId = event.getBot().getId();
        RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(groupId)
                .share()
                .block();
        if (hasBeenConsumed(event, groupId)) return;
        if (containMultiBots(repetitiveGroup, botId)) {
            synchronized (this) {
                consume();
            }
        } else {
            consume();
        }
    }
    //    Check whether the event has already been consumed
    private boolean hasBeenConsumed(@NotNull GroupMessageEvent event, long groupId) {
        long now = System.currentTimeMillis();
        // Drop message IDs whose expiry time (stored as the score) has passed.
        // block() subscribes to the Mono; without a subscriber the command is never sent.
        reactiveZSetOperations.removeRangeByScore(groupId, Range.closed(0.0, (double) now))
                .block();
        int[] ids = event.getSource().getIds();
        List<DefaultTypedTuple<Integer>> list = new ArrayList<>();
        for (int id : ids) {
            // now is in milliseconds, so 10_000 keeps the ID for 10 seconds (assumed intent)
            list.add(new DefaultTypedTuple<>(id, now + 10_000.0));
        }
        Long unExist = reactiveZSetOperations.addAll(groupId, list)
                .share()
                .block();
        return unExist == null || unExist == 0;
    }
    //    Check whether more than one bot listens to this group's messages
    private boolean containMultiBots(RepetitiveGroup repetitiveGroup, Long botId) {
        return repetitiveGroup.getId() != null && repetitiveGroup.getBotIds().contains(botId);
    }
    //    Consume the event
    private void consume() {
    }
}
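The sorted-set trick in hasBeenConsumed can be modelled without Redis, which makes the return-value logic easier to follow. The sketch below is an in-memory stand-in, not the project's code: the class name and the explicit nowMillis parameter are assumptions made for illustration, and a HashMap plays the role of the sorted set (member to score).

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the Redis sorted set: message ID -> expiry time in ms (the "score").
class SeenMessageIds {
    private final Map<Integer, Long> expiryById = new HashMap<>();
    private final long ttlMillis;

    SeenMessageIds(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns true when every ID was already present, i.e. the event is a duplicate.
    synchronized boolean hasBeenConsumed(int[] ids, long nowMillis) {
        // Counterpart of removing the score range [0, now]: drop expired entries.
        expiryById.values().removeIf(expiry -> expiry <= nowMillis);
        int added = 0;
        for (int id : ids) {
            // Counterpart of adding to the sorted set: only members that did not
            // exist before are counted; existing ones just get a new score.
            if (expiryById.put(id, nowMillis + ttlMillis) == null) {
                added++;
            }
        }
        return added == 0;
    }

    public static void main(String[] args) {
        SeenMessageIds seen = new SeenMessageIds(10_000);
        System.out.println(seen.hasBeenConsumed(new int[]{1, 2}, 0));      // first delivery
        System.out.println(seen.hasBeenConsumed(new int[]{1, 2}, 5_000));  // second bot, same event
        System.out.println(seen.hasBeenConsumed(new int[]{1, 2}, 20_000)); // after the IDs expired
    }
}
```

Because the add operation itself reports how many members were new, the check and the store happen in one step, so two bots racing on the same event cannot both see it as unconsumed.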
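Pitfalls with multiple Bots (cluster)
In a cluster, JVM-level locks no longer work. Since Redis is already in use, Redisson is introduced to provide a distributed lock.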
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
</dependency>
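The code logic is roughly the same as in the non-cluster version. A lock obtained from Redisson has a timeout and is released automatically when the time is up. Suppose event A is received by both Bot A and Bot B, and Bot A acquires the lock but crashes while consuming the event, so it cannot release the lock normally. If the lock timeout is longer than the validity period of the event IDs, Bot B can consume the event normally. Otherwise the system treats Bot B as consuming a duplicate event.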
@Component
@RequiredArgsConstructor
public class GroupMessageHandler {
    private final RedissonClient redissonClient;
    private final RepetitiveGroupService repetitiveGroupService;
    @Resource
    private ReactiveZSetOperations<Long, Integer> reactiveZSetOperations;
    public void handle(@NotNull GroupMessageEvent event) {
        long groupId = event.getGroup().getId();
        long botId = event.getBot().getId();
        RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(groupId)
                .share()
                .block();
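        if (hasBeenConsumed(event, groupId)) return;
        if (!containMultiBots(repetitiveGroup, botId)) {
            consume();
            return;
        }
        // Derive the lock name from the group ID so that every instance contends for the
        // same lock; a random UUID would give each call its own lock. The prefix is arbitrary.
        RLock lock = redissonClient.getLock("group-message:" + groupId);
        lock.lock(20, TimeUnit.SECONDS);
        try {
            consume();
        } finally {
            // The 20-second lease may already have expired, so only unlock if still held
            if (lock.isHeldByCurrentThread()) lock.unlock();
        }
    }
    //    Check whether the event has already been consumed
    private boolean hasBeenConsumed(@NotNull GroupMessageEvent event, long groupId) {
        long now = System.currentTimeMillis();
        // Drop message IDs whose expiry time (stored as the score) has passed.
        // block() subscribes to the Mono; without a subscriber the command is never sent.
        reactiveZSetOperations.removeRangeByScore(groupId, Range.closed(0.0, (double) now))
                .block();
        int[] ids = event.getSource().getIds();
        List<DefaultTypedTuple<Integer>> list = new ArrayList<>();
        for (int id : ids) {
            // now is in milliseconds, so 10_000 keeps the ID for 10 seconds (assumed intent)
            list.add(new DefaultTypedTuple<>(id, now + 10_000.0));
        }
        Long unExist = reactiveZSetOperations.addAll(groupId, list)
                .share()
                .block();
        return unExist == null || unExist == 0;
    }
    //    Check whether more than one bot listens to this group's messages
    private boolean containMultiBots(RepetitiveGroup repetitiveGroup, Long botId) {
        return repetitiveGroup.getId() != null && repetitiveGroup.getBotIds().contains(botId);
    }
    //    Consume the event
    private void consume() {
    }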
}
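A lock only serializes callers that ask for the same lock name, which is why the name should be derived from the group ID. The sketch below shows the idea inside a single JVM with one ReentrantLock per group; it is a stand-in for a named distributed lock, and the GroupLocks class is hypothetical:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Single-JVM stand-in for a named distributed lock: one lock per group ID.
class GroupLocks {
    private final ConcurrentHashMap<Long, ReentrantLock> locks = new ConcurrentHashMap<>();

    // The same group ID always maps to the same lock instance.
    ReentrantLock lockFor(long groupId) {
        return locks.computeIfAbsent(groupId, id -> new ReentrantLock());
    }

    // Runs the task while holding the group's lock and always releases it afterwards.
    void runExclusively(long groupId, Runnable task) {
        ReentrantLock lock = lockFor(groupId);
        lock.lock();
        try {
            task.run();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GroupLocks groupLocks = new GroupLocks();
        int[] counter = {0};
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                groupLocks.runExclusively(42L, () -> counter[0]++);
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        a.join();
        b.join();
        // Both threads used the lock of group 42, so no increment is lost
        System.out.println(counter[0]);
    }
}
```

The try/finally around the task matters as much as the lock name: if the task throws, the lock is still released instead of staying held until a timeout.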
Adapting other business logic to multiple Bots
RepetitiveGroup stores the groups that more than one bot has joined:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RepetitiveGroup implements Serializable {
    private static final long serialVersionUID = 1L;
    @Id
    private Long id;
    private List<Long> botIds;
}
RepetitiveGroupService implements the CRUD operations:
@Service
public class RepetitiveGroupService {
    @Resource
    private ReactiveRedisOperations<String, RepetitiveGroup> reactiveRedisOperations;
    public static final String HASH_KEY = RepetitiveGroup.class.getSimpleName();
    public Mono<RepetitiveGroup> get(Long groupId) {
        return reactiveRedisOperations.opsForHash()
                .get(HASH_KEY, groupId)
                .map(v -> (RepetitiveGroup) v)
                .switchIfEmpty(Mono.defer(() -> Mono.just(new RepetitiveGroup())));
    }
    public Flux<RepetitiveGroup> getAll() {
        return reactiveRedisOperations.opsForHash()
                .values(HASH_KEY)
                .map(v -> (RepetitiveGroup) v);
    }
    public Mono<Boolean> upset(RepetitiveGroup repetitiveGroup) {
        return reactiveRedisOperations.opsForHash()
                .put(HASH_KEY, repetitiveGroup.getId(), repetitiveGroup);
    }
    public Mono<Boolean> delete(RepetitiveGroup repetitiveGroup) {
        return reactiveRedisOperations.opsForHash()
                .remove(HASH_KEY, repetitiveGroup.getId())
                .map(v -> v.equals(1L));
    }
}
BotOnlineHandler:
@Slf4j
@Component
@RequiredArgsConstructor
public class BotOnlineHandler {
    private final SentinelService sentinelService;
    private final RepetitiveGroupService repetitiveGroupService;
    @Resource
    private ReactiveSetOperations<String, Long> reactiveSetOperations;
    public void handle(@NotNull BotOnlineEvent event) {
        Bot bot = event.getBot();
        long id = bot.getId();
        log.info("******BotOnlineHandler:QQ:{}", id);
        Set<Long> groupIds = bot.getGroups()
                .stream()
                .map(Group::getId)
                .collect(Collectors.toSet());
        // Copy the IDs of the other online Bots instead of mutating Bot.getInstances(),
        // which may return a read-only list
        List<Long> otherBotIds = Bot.getInstances()
                .stream()
                .map(Bot::getId)
                .filter(v -> v != id)
                .collect(Collectors.toList());
        // Store this Bot's groups first, then intersect with every other Bot's set.
        // Reactive calls do nothing until they are subscribed.
        Flux.fromIterable(groupIds)
                .flatMap(v -> reactiveSetOperations.add(id + "", v))
                .thenMany(Flux.fromIterable(otherBotIds))
                .subscribe(v -> sinter(id, v));
    }
    private void sinter(Long self, Long target) {
        ArrayList<Long> list = new ArrayList<>();
        list.add(self);
        list.add(target);
        reactiveSetOperations.intersect(self + "", target + "")
                .subscribe(v -> insert(v, list));
    }
    private void insert(Long groupId, List<Long> botIds) {
        RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(groupId)
                .share()
                .block();
        if (repetitiveGroup.getId() == null) {
            // New record: set the ID as well, otherwise it is saved under a null hash key
            repetitiveGroup.setId(groupId);
            repetitiveGroup.setBotIds(new ArrayList<>());
        }
        Set<Long> set = new HashSet<>(botIds);
        set.addAll(repetitiveGroup.getBotIds());
        repetitiveGroup.setBotIds(new ArrayList<>(set));
        repetitiveGroupService.upset(repetitiveGroup).subscribe();
    }
}
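The handler finds shared groups by intersecting the group set of the Bot that just came online with the set of each other Bot, then merges the two Bot IDs into the stored record. A minimal sketch of those two steps with plain Java collections; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: the set logic behind sinter() and insert(), without Redis.
class SharedGroups {
    // Groups that both Bots have joined (pairwise set intersection).
    static Set<Long> between(Set<Long> groupsOfA, Set<Long> groupsOfB) {
        Set<Long> shared = new TreeSet<>(groupsOfA);
        shared.retainAll(groupsOfB);
        return shared;
    }

    // Merges newly found Bot IDs into the IDs already stored for a group, without duplicates.
    static List<Long> mergeBots(List<Long> existing, List<Long> incoming) {
        Set<Long> merged = new TreeSet<>(existing);
        merged.addAll(incoming);
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        Set<Long> groupsOfA = Set.of(100L, 200L, 300L);
        Set<Long> groupsOfB = Set.of(200L, 300L, 400L);
        System.out.println(between(groupsOfA, groupsOfB));
        System.out.println(mergeBots(List.of(1L, 2L), List.of(2L, 3L)));
    }
}
```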
BotOfflineHandler:
@Slf4j
@Component
@RequiredArgsConstructor
public class BotOfflineHandler {
    private final RepetitiveGroupService repetitiveGroupService;
    public void handle(@NotNull BotOfflineEvent event) {
        Bot bot = event.getBot();
        long id = bot.getId();
        String reason = "unknown";
        if (event instanceof BotOfflineEvent.Active)
            reason = "logged out actively";
        if (event instanceof BotOfflineEvent.Force)
            reason = "forced offline by another login";
        if (event instanceof BotOfflineEvent.Dropped)
            reason = "disconnected by the server or dropped because of network problems";
        if (event instanceof BotOfflineEvent.RequireReconnect)
            reason = "the server asked to switch to another server";
        log.info("******BotOfflineHandler:QQ:{},Reason:{}", id, reason);
        repetitiveGroupService.getAll()
                .parallel()
                .runOn(Schedulers.parallel())
                .sequential()
                .filter(v -> v.getBotIds().contains(id))
                .map(v -> updateBotIdList(id, v))
                .subscribe(this::updateOrDeleteRepetitiveGroup);
        bot.close();
    }
    @NotNull
    private RepetitiveGroup updateBotIdList(long id, RepetitiveGroup repetitiveGroup) {
        List<Long> botIds = repetitiveGroup.getBotIds();
        botIds.remove(id);
        repetitiveGroup.setBotIds(botIds);
        return repetitiveGroup;
    }
    private void updateOrDeleteRepetitiveGroup(RepetitiveGroup repetitiveGroup) {
        // subscribe() is required; a Mono without a subscriber never executes
        if (repetitiveGroup.getBotIds().size() < 2) {
            repetitiveGroupService.delete(repetitiveGroup).subscribe();
        } else {
            repetitiveGroupService.upset(repetitiveGroup).subscribe();
        }
    }
}
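The offline handling reduces to: remove the Bot's ID from every record that contains it, and drop a record once fewer than two Bots remain. A minimal sketch over an in-memory map, assuming group ID to Bot ID list as the data shape; the OfflineCleanup class is hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper: the record maintenance done when a Bot goes offline.
class OfflineCleanup {
    // botsByGroup maps a group ID to the IDs of the Bots in it; it is modified in place.
    static void removeBot(Map<Long, List<Long>> botsByGroup, long botId) {
        Iterator<Map.Entry<Long, List<Long>>> it = botsByGroup.entrySet().iterator();
        while (it.hasNext()) {
            List<Long> botIds = it.next().getValue();
            // Long.valueOf makes it explicit that we remove by value, not by index
            if (!botIds.remove(Long.valueOf(botId))) {
                continue; // this group did not contain the Bot
            }
            if (botIds.size() < 2) {
                it.remove(); // no longer shared by several Bots
            }
        }
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> botsByGroup = new HashMap<>();
        botsByGroup.put(1L, new ArrayList<>(List.of(10L, 20L)));
        botsByGroup.put(2L, new ArrayList<>(List.of(10L, 20L, 30L)));
        removeBot(botsByGroup, 10L);
        System.out.println(botsByGroup);
    }
}
```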
MemberJoinHandler:
@Component
@RequiredArgsConstructor
public class MemberJoinHandler {
    private final RedissonClient redissonClient;
    private final RepetitiveGroupService repetitiveGroupService;
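    public void handle(@NotNull MemberJoinEvent event) {
        long memberId = event.getMember().getId();
        Group group = event.getGroup();
        if (event.getBot().getId() == memberId) return;
        List<Long> idList = Bot.getInstances()
                .stream()
                .map(Bot::getId)
                .collect(Collectors.toList());
        // Only react when the new member is one of our own Bots
        if (!idList.contains(memberId)) return;
        // Derive the lock name from the group ID so that all callers share one lock;
        // a random UUID would give each call its own lock. The prefix is arbitrary.
        RLock lock = redissonClient.getLock("repetitive-group:" + group.getId());
        lock.lock(30, TimeUnit.SECONDS);
        try {
            RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(group.getId())
                    .share()
                    .block();
            List<Long> botIds = idList.stream()
                    .filter(group::contains)
                    .collect(Collectors.toList());
            if (repetitiveGroup.getId() != null && botIds.size() == repetitiveGroup.getBotIds().size()) return;
            repetitiveGroup.setId(group.getId());
            repetitiveGroup.setBotIds(botIds);
            // block() subscribes, so the write completes before the lock is released
            repetitiveGroupService.upset(repetitiveGroup).block();
        } finally {
            // The lease may already have expired, so only unlock if still held
            if (lock.isHeldByCurrentThread()) lock.unlock();
        }
    }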
}
MemberLeaveHandler:
@Component
@RequiredArgsConstructor
public class MemberLeaveHandler {
    private final RedissonClient redissonClient;
    private final RepetitiveGroupService repetitiveGroupService;
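    public void handle(@NotNull MemberLeaveEvent event) {
        long memberId = event.getMember().getId();
        Group group = event.getGroup();
        if (event.getBot().getId() == memberId) return;
        List<Long> idList = Bot.getInstances()
                .stream()
                .map(Bot::getId)
                .collect(Collectors.toList());
        // Only react when the member that left is one of our own Bots
        if (!idList.contains(memberId)) return;
        // Derive the lock name from the group ID so that all callers share one lock;
        // a random UUID would give each call its own lock. The prefix is arbitrary.
        RLock lock = redissonClient.getLock("repetitive-group:" + group.getId());
        lock.lock(30, TimeUnit.SECONDS);
        try {
            RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(group.getId())
                    .share()
                    .block();
            if (repetitiveGroup.getId() == null) return;
            List<Long> botIds = idList.stream()
                    .filter(group::contains)
                    .collect(Collectors.toList());
            if (botIds.size() > 1) {
                repetitiveGroup.setId(group.getId());
                repetitiveGroup.setBotIds(botIds);
                // block() subscribes, so the write completes before the lock is released
                repetitiveGroupService.upset(repetitiveGroup).block();
            } else {
                // Fewer than two Bots remain, so the group is no longer shared
                repetitiveGroupService.delete(repetitiveGroup).block();
            }
        } finally {
            // The lease may already have expired, so only unlock if still held
            if (lock.isHeldByCurrentThread()) lock.unlock();
        }
    }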
}
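That concludes the Bot-related configuration. Next up is Nacos.
Nacos
Integrating Nacos
Only the Nacos client configuration is covered here; for the Nacos server, see the official Nacos documentation.
Dependencies (note: because OpenFeign is used, Ribbon is replaced with LoadBalancer):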
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
        </exclusion>
    </exclusions>
</dependency>
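Connecting to the configuration center
The main reason for using Nacos in this project is the convenience of pushing configuration dynamically.
Start with bootstrap.yml. Newer versions (Spring Cloud 2020.0 and later) no longer load bootstrap.yml by default; adding the following dependency turns it back on: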
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
bootstrap.yml is configured as follows:
spring:
  application:
    name: qqrobot
  cloud:
    nacos:
      server-addr: localhost:8848
      config:
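#       Can be removed if no namespace is set in the Nacos web console
        namespace: 3e4a06ab-3139-46e8-bc82-888154383de0
#       The configuration file name can be given explicitly, or omitted to use the default rule
        name: qqrobot-dev.yml
  main:
#    Allow bean definition overriding
    allow-bean-definition-overriding: true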
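Adding @EnableDiscoveryClient is enough to register the module with Nacos.
Setting up the configuration file
I will skip the steps for creating the configuration file in the Nacos web console and go straight to the backend.
Values can be read with @Value or @ConfigurationProperties(prefix = "xxx"), and adding the @RefreshScope annotation turns on automatic refresh: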
@Data
@Component
@RefreshScope
public class DynamicProperty {
    @Value("${accounts}")
    private String accounts;
}
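A few pitfalls. I tried the following ways of writing a list several times, and the backend failed to bind them (with the configuration loaded from Nacos).
List<Bean>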
test:
  list:
    - a:1
      b:1
test:
  list:
    - {a:1,b:1}
List<Map<String,String>>
test:
  list:
    - a:1
      b:1
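So I recommend using JSON. (Note that YAML requires a space after the colon, as in a: 1; written as a:1 it is read as a plain string, which may be why the binding above failed.)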
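Listening for Nacos configuration refresh events
After Nacos refreshes the local project's configuration, the event is picked up by RefreshEventListener (org.springframework.cloud.endpoint.event.RefreshEventListener). Using its onApplicationEvent() method as a reference, we can implement a listener of our own.
Here robotService is the Bot control class. In the example I store the bot account information in Nacos; when it is refreshed, refreshBot() is called to update the state of the affected bots (online, offline).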
@Configuration
@RequiredArgsConstructor
public class RefreshEventListenerConfig {
    private final RobotService robotService;
    private AtomicBoolean ready = new AtomicBoolean(false);
    @EventListener
    public void listen(ApplicationEvent event) {
        if (event instanceof ApplicationReadyEvent)
            this.ready.compareAndSet(false, true);
        if (event instanceof RefreshEvent && this.ready.get())
            robotService.refreshBot();
    }
}
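The listener ignores refresh events until the application has reported that it is ready. That gating can be tried out without Spring; the sketch below is a stand-in in which plain method calls replace the two event types and a counter replaces the call to refreshBot(). The RefreshGate class is hypothetical:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for the listener: refreshes are acted on only after the ready signal.
class RefreshGate {
    private final AtomicBoolean ready = new AtomicBoolean(false);
    private final AtomicInteger refreshCount = new AtomicInteger();

    // Counterpart of receiving ApplicationReadyEvent
    void onApplicationReady() {
        ready.compareAndSet(false, true);
    }

    // Counterpart of receiving RefreshEvent
    void onRefresh() {
        if (ready.get()) {
            refreshCount.incrementAndGet(); // stands in for robotService.refreshBot()
        }
    }

    int refreshCount() {
        return refreshCount.get();
    }

    public static void main(String[] args) {
        RefreshGate gate = new RefreshGate();
        gate.onRefresh();          // ignored: the application is not ready yet
        gate.onApplicationReady();
        gate.onRefresh();          // handled
        System.out.println(gate.refreshCount());
    }
}
```

Without the gate, a refresh that arrives during startup could trigger a second login attempt while the initial login is still running.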
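That concludes the Nacos adaptation.
Sentinel
Sentinel is used for dynamic flow control. When all bots are offline, any incoming request immediately goes to the fallback method, and its parameters are stored for delayed processing. Once at least one bot is online, the endpoints are opened again and the stored requests are consumed. This feature will keep being improved.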
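The park-and-replay behaviour described above can be sketched independently of Sentinel and Redis. In this minimal model requests are plain strings, an in-memory queue replaces the persisted UnhandledHttpRequest records, and the DeferredRequests class with all its method names is an assumption made for illustration:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model: park requests while no Bot is online, replay them when one returns.
class DeferredRequests {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> handled = new ArrayList<>();
    private int onlineBots = 0;

    // Returns true if the request was handled right away, false if it was parked.
    synchronized boolean submit(String request) {
        if (onlineBots == 0) {
            pending.add(request);
            return false;
        }
        handled.add(request);
        return true;
    }

    // A Bot came online: replay everything that was parked, in arrival order.
    synchronized void botOnline() {
        onlineBots++;
        while (!pending.isEmpty()) {
            handled.add(pending.poll());
        }
    }

    synchronized void botOffline() {
        if (onlineBots > 0) {
            onlineBots--;
        }
    }

    synchronized List<String> handled() {
        return new ArrayList<>(handled);
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        DeferredRequests requests = new DeferredRequests();
        System.out.println(requests.submit("first"));  // parked, no Bot online
        requests.botOnline();
        System.out.println(requests.submit("second")); // handled directly
        System.out.println(requests.handled());
    }
}
```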
Integrating Sentinel
We need the Sentinel component itself, Sentinel's adapter for the web framework (I chose WebFlux here), and a way to persist the rules:
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-spring-webflux-adapter</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
Configuring the Sentinel client
Set the address of the Sentinel Dashboard in bootstrap.yml:
spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:38081
        port: 38719
Only the Sentinel client adaptation is covered here; for the server configuration, see the official Sentinel documentation.
@Configuration
@RequiredArgsConstructor
public class SentinelConfig {
    private final List<ViewResolver> viewResolvers;
    private final ServerCodecConfigurer serverCodecConfigurer;
    @Bean
    @Order(-1)
    public SentinelBlockExceptionHandler sentinelBlockExceptionHandler() {
        return new SentinelBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
    }
    @Bean
    @Order(-1)
    public SentinelWebFluxFilter sentinelWebFluxFilter() {
        return new SentinelWebFluxFilter();
    }
}
Customizing the flow-control response
First, write a class that stores the request parameters, UnhandledHttpRequest:
@Data
@AllArgsConstructor
public class UnhandledHttpRequest implements Serializable {
    private static final long serialVersionUID = 1L;
    @Id
    private String id;
    private String body;
    private String method;
    public UnhandledHttpRequest() {
        id = IdUtil.fastSimpleUUID();
    }
    public UnhandledHttpRequest(String body) {
        id = IdUtil.fastSimpleUUID();
        this.body = body;
    }
}
The corresponding service class, UnhandledHttpRequestService:
@Service
public class UnhandledHttpRequestService {
    @Resource
    private ReactiveRedisOperations<String, UnhandledHttpRequest> reactiveRedisOperations;
    // Use this class's own hash key; sharing RepetitiveGroup's key would mix both record types
    public static final String HASH_KEY = UnhandledHttpRequest.class.getSimpleName();
    public Mono<UnhandledHttpRequest> get(String id) {
        return reactiveRedisOperations.opsForHash()
                .get(HASH_KEY, id)
                .map(v -> (UnhandledHttpRequest) v)
                .switchIfEmpty(Mono.defer(() -> Mono.just(new UnhandledHttpRequest())));
    }
    public Flux<UnhandledHttpRequest> getAll() {
        return reactiveRedisOperations.opsForHash()
                .values(HASH_KEY)
                .map(v -> (UnhandledHttpRequest) v);
    }
    public Mono<Boolean> upset(UnhandledHttpRequest unhandledHttpRequest) {
        return reactiveRedisOperations.opsForHash()
                .put(HASH_KEY, unhandledHttpRequest.getId(), unhandledHttpRequest);
    }
    public Mono<Boolean> delete(UnhandledHttpRequest unhandledHttpRequest) {
        return reactiveRedisOperations.opsForHash()
                .remove(HASH_KEY, unhandledHttpRequest.getId())
                .map(v -> v.equals(1L));
    }
}
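Finally, define a class that builds the response. Reading the request body in WebFlux is not as easy as with Servlet; the approaches I tried all failed, so the sample code only shows how to read the request parameters carried in the URL.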
@Component
@RequiredArgsConstructor
public class RobotBlockExceptionHandler implements BlockRequestHandler {
    private final UnhandledHttpRequestService unhandledHttpRequestService;
    @SneakyThrows
    @Override
    public Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable t) {
        URI uri = exchange.getRequest().getURI();
        JSONObject jsonObject = new JSONObject();
        jsonObject.putAll(decodeBody(uri.getRawQuery()));
        UnhandledHttpRequest unhandledHttpRequest = new UnhandledHttpRequest(jsonObject.toJSONString());
        if (uri.getPath().contains("/send/msg")) {
            unhandledHttpRequest.setMethod("/send/msg");
        } else {
            unhandledHttpRequest.setMethod("/collect/msg");
        }
        JSONObject result = new JSONObject();
        result.put("msg", "The bot service is busy, please try again later");
        result.put("code", HttpStatus.TOO_MANY_REQUESTS.value());
        // Chain the save into the response; a Mono that is never subscribed would not run
        return unhandledHttpRequestService.upset(unhandledHttpRequest)
                .then(ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(result.toJSONString()));
    }
    private Map<String, Object> decodeBody(String body) {
        if (StrUtil.isBlank(body)) return new HashMap<>();
        // Query-string form: key=value pairs joined by '&' (a single pair has no '&')
        if (body.contains("="))
            return Arrays.stream(body.split("&"))
                    .map(s -> s.split("=", 2))
                    // A key without a value maps to an empty string; a repeated key keeps the last value
                    .collect(Collectors.toMap(arr -> arr[0], arr -> arr.length > 1 ? arr[1] : "", (a, b) -> b));
        return JSON.parseObject(body)
                .getInnerMap();
    }
}
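Since only the URL parameters are captured, the query-string parsing deserves a closer look. The handler above works on the raw query, so percent-encoded values are stored still encoded. The sketch below is a standalone parser that also decodes them, using only the JDK (URLDecoder.decode with a Charset argument needs Java 10 or later); the QueryStrings class is hypothetical and not part of the project:

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: parses a raw query string into decoded key/value pairs.
class QueryStrings {
    static Map<String, String> parse(String rawQuery) {
        Map<String, String> params = new LinkedHashMap<>();
        if (rawQuery == null || rawQuery.trim().isEmpty()) {
            return params;
        }
        for (String pair : rawQuery.split("&")) {
            if (pair.isEmpty()) {
                continue; // tolerate "a=1&&b=2"
            }
            // Limit 2 keeps '=' characters inside the value intact
            String[] kv = pair.split("=", 2);
            String key = URLDecoder.decode(kv[0], StandardCharsets.UTF_8);
            String value = kv.length > 1 ? URLDecoder.decode(kv[1], StandardCharsets.UTF_8) : "";
            params.put(key, value); // a repeated key keeps the last value
        }
        return params;
    }

    public static void main(String[] args) {
        System.out.println(parse("groupId=1&msg=hello%20world&flag"));
    }
}
```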
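Adapting other classes to Sentinel
Logic
I only implemented the simple case: when all Bots are offline, the related endpoints are all blocked, and they are restored once a Bot is online. This approach is not suitable for real use; a better one is to limit traffic based on the request.
BotOfflineHandler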
private final SentinelService sentinelService;
if (Bot.getInstances().isEmpty()) {
    FlowRule rule = new FlowRule("/send/msg");
    rule.setCount(0);
    rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule.setLimitApp("default");
    sentinelService.saveOrUpdateFlowRule(rule);
    rule.setResource("/collect/msg");
    sentinelService.saveOrUpdateFlowRule(rule);
}
BotOnlineHandler
private final SentinelService sentinelService;
// Restore the threshold once at least one Bot is online
if (!Bot.getInstances().isEmpty()) {
    FlowRule rule = new FlowRule("/send/msg");
    rule.setCount(10);
    rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule.setLimitApp("default");
    sentinelService.saveOrUpdateFlowRule(rule);
    rule.setResource("/collect/msg");
    sentinelService.saveOrUpdateFlowRule(rule);
}
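The two snippets differ only in the threshold they write: 0 when no Bot is online, 10 otherwise, for both protected resources. A minimal sketch of that decision as a pure function, with a plain map standing in for the Sentinel rule objects; the EndpointThresholds class is hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: derives the QPS threshold of each protected resource
// from the number of Bots that are currently online.
class EndpointThresholds {
    static final String[] RESOURCES = {"/send/msg", "/collect/msg"};

    // 0 blocks every request; 10 is the value used when a Bot is online.
    static Map<String, Integer> forOnlineBots(int onlineBots) {
        int qps = onlineBots > 0 ? 10 : 0;
        Map<String, Integer> thresholds = new LinkedHashMap<>();
        for (String resource : RESOURCES) {
            // One entry per resource, rather than one mutable rule reused for both
            thresholds.put(resource, qps);
        }
        return thresholds;
    }

    public static void main(String[] args) {
        System.out.println(forOnlineBots(0));
        System.out.println(forOnlineBots(2));
    }
}
```

Building one value per resource avoids relying on how the rule store treats a single rule object whose resource name is changed after it has been saved.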
Persisting the configuration
Modify bootstrap.yml. This is for reference only; see the official site for the exact parameters.
spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:38081
        port: 38719
      datasource:
        flow-rule:
          nacos:
            server-addr: localhost:8848
            username: nacos
            password: nacos
            namespace: 3e4a06ab-3139-46e8-bc82-888154383de0
            data-id: qqrobot-sentinel-flow-dev.json
            rule-type: flow
That concludes the Sentinel adaptation.
Configuration reference
Redis configuration reference
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
    @Bean
    public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
        return new ReactiveRedisTemplate<>(connectionFactory, RedisSerializationContext.string());
    }
    @Bean
    public ReactiveRedisConnection connection(ReactiveRedisConnectionFactory connectionFactory) {
        return connectionFactory.getReactiveConnection();
    }
    @Bean
    ReactiveRedisOperations<String, Object> reactiveRedisOperations(ReactiveRedisConnectionFactory factory) {
        return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext());
    }
    @Bean
    ReactiveSetOperations<String, Object> reactiveSetOperations(ReactiveRedisConnectionFactory factory) {
        return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext()).opsForSet();
    }
    @Bean
    ReactiveZSetOperations<String, Object> reactiveZSetOperations(ReactiveRedisConnectionFactory factory) {
        return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext()).opsForZSet();
    }
    private static RedisSerializationContext<String, Object> getObjectRedisSerializationContext() {
        return RedisSerializationContext
                .<String, Object>newSerializationContext(new GenericFastJsonRedisSerializer())
                .key(new FastJsonRedisSerializer<>(String.class))
                .value(new FastJsonRedisSerializer<>(Object.class))
                .hashKey(new FastJsonRedisSerializer<>(String.class))
                .hashValue(new FastJsonRedisSerializer<>(Object.class))
                .build();
    }
}
Redisson configuration reference
@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.setLockWatchdogTimeout(10000L);
        SingleServerConfig singleServerConfig = config.useSingleServer();
        singleServerConfig.setPassword("password");
        singleServerConfig.setAddress("redis://localhost:6379");
        singleServerConfig.setDatabase(1);
        return Redisson.create(config);
    }
}