MiraiForum


    Spring Cloud Alibaba + Mirai Core: Usage Notes

      wssy001 ⭐2021⭐ (last edited by wssy001)

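      Preface

            I wrote this tutorial for people who need material for a graduation project; in real development, few people are likely to pull Mirai into a microservice stack.
            In this post, Mirai Core acts as the user terminal: it forwards user input to the system and delivers system messages to the user. The project has preliminary support for distributed deployment, uses Nacos for dynamic configuration updates, uses Sentinel for dynamic flow control on the endpoints, and uses RocketMQ.
            The project uses WebFlux by default, but Servlet is supported as well: branches whose names contain -servlet use Servlet exclusively.
            I will explain the logic of the essential methods in each layer in detail.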


      Project address

            Link

            PS: For the actual business logic, the project is authoritative; this document sometimes lags behind!


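      Version overview

            If the latest version changes the previous version's sample code substantially, the complete sample code will be included with the latest version; otherwise only the business methods that need updating will be explained in detail.
            Comments left between versions will not be deleted unless necessary!

            First version: link
            PS: I have not tested the code on real machines (specifically in a clustered environment). If you find bugs, please let me know!

            Second version: link
            PS: Update in progress. The MySQL version is done and the MongoDB version is being debugged. Looking back, some of the project code has pitfalls, which I am fixing with reference to the source code... Update finished: custom message IDs are solved for now, but the asynchronous callback in RocketMQ Spring Boot Starter is admittedly imperfect, so take care when using it in practice.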


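      Contents

      1. Main component versions
      2. Other related component versions
      3. Parent POM
      4. Bot
        4.1 Multi-Bot pitfalls
        4.2 Multi-Bot pitfalls (cluster)
        4.3 Adapting other components for multiple Bots
      5. Nacos
        5.1 Integrating Nacos
        5.2 Connecting to the configuration center
        5.3 Setting up the configuration file
        5.4 Listening for Nacos configuration refresh events
      6. Sentinel
        6.1 Integrating Sentinel
        6.2 Configuring the Sentinel client
        6.3 Custom flow-control response
        6.4 Adapting other classes for Sentinel
        6.5 Persisting the configuration
      7. Configuration reference
        7.1 Redis configuration reference
        7.2 Redisson configuration reference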

      Main component versions:

      Component             Version
      AdoptOpenJDK          11.0.10
      Spring Boot           2.4.2
      Spring Cloud          2020.0.1
      Spring Cloud Alibaba  2021.1
      Mirai Core            2.6.8

      Other related component versions:

      Component  Version
      Nacos      1.4.1
      Sentinel   1.8.0
      Hutool     5.7.15

      Parent POM:

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
      
          <parent>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-parent</artifactId>
              <version>2.4.2</version>
              <relativePath/>
          </parent>
      
          <groupId>cyou.wssy001.cloud</groupId>
          <artifactId>bot</artifactId>
          <version>0.0.1-SNAPSHOT</version>
          <name>bot</name>
          <packaging>pom</packaging>
      
          <properties>
              <java.version>11</java.version>
              <spring.cloud.version>2020.0.1</spring.cloud.version>
              <spring.cloud.alibaba.version>2021.1</spring.cloud.alibaba.version>
              <lombok.version>1.18.20</lombok.version>
              <redisson.version>3.16.4</redisson.version>
              <hutool.version>5.7.15</hutool.version>
              <fastjson.version>1.2.78</fastjson.version>
              <commons.pool2.version>2.11.1</commons.pool2.version>
              <mirai.core.version>2.6.8</mirai.core.version>
          </properties>
      
          <dependencyManagement>
              <dependencies>
                  <!--        Spring Cloud Alibaba-->
                  <dependency>
                      <groupId>com.alibaba.cloud</groupId>
                      <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                      <version>${spring.cloud.alibaba.version}</version>
                      <type>pom</type>
                      <scope>import</scope>
                  </dependency>
      
                  <!--            Spring Cloud-->
                  <dependency>
                      <groupId>org.springframework.cloud</groupId>
                      <artifactId>spring-cloud-dependencies</artifactId>
                      <version>${spring.cloud.version}</version>
                      <type>pom</type>
                      <scope>import</scope>
                  </dependency>
      
                  <!--            Lombok-->
                  <dependency>
                      <groupId>org.projectlombok</groupId>
                      <artifactId>lombok</artifactId>
                      <version>${lombok.version}</version>
                  </dependency>
      
                  <!--            Redisson-->
                  <dependency>
                      <groupId>org.redisson</groupId>
                      <artifactId>redisson</artifactId>
                      <version>${redisson.version}</version>
                  </dependency>
      
                  <!--            Hutool-->
                  <dependency>
                      <groupId>cn.hutool</groupId>
                      <artifactId>hutool-core</artifactId>
                      <version>${hutool.version}</version>
                  </dependency>
      
                  <!--            Hutool-http-->
                  <dependency>
                      <groupId>cn.hutool</groupId>
                      <artifactId>hutool-http</artifactId>
                      <version>${hutool.version}</version>
                  </dependency>
      
                  <!--            FastJSON-->
                  <dependency>
                      <groupId>com.alibaba</groupId>
                      <artifactId>fastjson</artifactId>
                      <version>${fastjson.version}</version>
                  </dependency>
      
                  <!--            Commons Pool2-->
                  <dependency>
                      <groupId>org.apache.commons</groupId>
                      <artifactId>commons-pool2</artifactId>
                      <version>${commons.pool2.version}</version>
                  </dependency>
      
                  <!--            Mirai-->
                  <dependency>
                      <groupId>net.mamoe</groupId>
                      <artifactId>mirai-core-jvm</artifactId>
                      <version>${mirai.core.version}</version>
                  </dependency>
      
              </dependencies>
          </dependencyManagement>
      
      </project>
      

      Bot

            Add the Mirai dependency. Spring Boot 2.4.2 already manages the following Kotlin versions:

      <kotlin.version>1.4.21</kotlin.version>
      <kotlin-coroutines.version>1.4.2</kotlin-coroutines.version>
      

            so mirai-core-jvm can be added directly, with no need to pin the versions of the Kotlin-related dependencies:

      <dependency>
          <groupId>net.mamoe</groupId>
          <artifactId>mirai-core-jvm</artifactId>
      </dependency>
      

            Define a class that stores a Bot account:

      @Data
      public class BotAccount implements Serializable {
          private static final long serialVersionUID = 1L;
      
          private Long account;
          private String password;
      }
      

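            Next comes a Service that manages the bots. After a Bot logs in it does not have to be registered in the IoC container; use Bot.getInstanceOrNull(botQQNumber) to obtain a specific Bot instance.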

      @Slf4j
      @Service
      @RequiredArgsConstructor
      public class RobotService {
          private final DynamicProperty dynamicProperty;
      
          private final BotOnlineHandler botOnlineHandler;
          private final BotOfflineHandler botOfflineHandler;
          private final GroupMessageHandler groupMessageHandler;
          private final MemberJoinHandler memberJoinHandler;
          private final MemberLeaveHandler memberLeaveHandler;
      
          @PostConstruct
          private void init() {
              login(stringToBotAccountList(dynamicProperty.getAccounts()));
          }
      
          public void refreshBot() {
              List<BotAccount> botAccounts = stringToBotAccountList(dynamicProperty.getAccounts())
                      .stream()
                      .filter(v -> Bot.getInstanceOrNull(v.getAccount()) == null)
                      .collect(Collectors.toList());
      
              login(botAccounts);
          }
      
          private void login(List<BotAccount> botAccounts) {
      
              BotConfiguration botConfiguration = new BotConfiguration();
              botConfiguration.fileBasedDeviceInfo("device.json");
      //        botConfiguration.noNetworkLog();
      //        botConfiguration.noBotLog();
      
              for (BotAccount botAccountInfo : botAccounts) {
      
                  Bot bot = BotFactory.INSTANCE.newBot(botAccountInfo.getAccount(), botAccountInfo.getPassword(), botConfiguration);
                  try {
                      bot.getEventChannel().subscribeAlways(BotOnlineEvent.class, botOnlineHandler::handle);
                      bot.getEventChannel().subscribeAlways(BotOfflineEvent.class, botOfflineHandler::handle);
                      bot.getEventChannel().subscribeAlways(GroupMessageEvent.class, groupMessageHandler::handle);
                      bot.getEventChannel().subscribeAlways(MemberJoinEvent.class, memberJoinHandler::handle);
                      bot.getEventChannel().subscribeAlways(MemberLeaveEvent.class, memberLeaveHandler::handle);
                      bot.login();
                      log.info("******RobotService QQ:{} login succeeded", botAccountInfo.getAccount());
      
      //                Remove this break to keep multiple Bots online
                      break;
                  } catch (LoginFailedException e) {
                      bot.close();
                      log.error("******Bot LoginFailedException:{} ,QQ:{}", e.getMessage(), botAccountInfo.getAccount());
                  }
              }
      
          }
      
          private List<BotAccount> stringToBotAccountList(String s) {
              return JSON.parseArray(s, BotAccount.class);
          }
      }
      

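      Multi-Bot pitfalls

            When your project runs several Bots at once, more than one of them may be listening to messages from the same group. In that case we need a lock that forces group-message handling to be "single-threaded". Since only one instance is deployed, a JVM-level lock is enough.

      Logic

            synchronized ensures that only one bot can run the method at a time, and checking whether the message ID already exists in Redis prevents duplicate consumption. For this I use a Redis sorted set: the score holds the expiry time, which implements expiration, and because adding an existing member to a sorted set updates it and returns 0, a single add both stores the message IDs and tells us whether all of them were already present. As an optimization, only groups that several bots listen to at once are forced into this "single-threaded" mode.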
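
            To make the sorted-set trick easier to follow, here is a minimal in-memory sketch of the same idea, assuming one map per group in place of the Redis sorted set. The class name MessageDeduplicator and its method signature are hypothetical and not part of the project; the real handler talks to Redis.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for one Redis sorted set per group: member = message ID, score = expiry time. */
final class MessageDeduplicator {
    private final long ttlMillis;
    // groupId -> (messageId -> expiry timestamp in milliseconds)
    private final Map<Long, Map<Integer, Long>> expiryByGroup = new HashMap<>();

    MessageDeduplicator(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns true when every ID of the message was already recorded and has not expired yet. */
    synchronized boolean hasBeenConsumed(long groupId, int[] messageIds, long nowMillis) {
        Map<Integer, Long> expiries = expiryByGroup.computeIfAbsent(groupId, k -> new HashMap<>());

        // Counterpart of ZREMRANGEBYSCORE key 0 now: drop entries whose expiry has passed
        expiries.values().removeIf(expiry -> expiry <= nowMillis);

        // Counterpart of ZADD: count only members that did not exist before, refresh the rest
        int newlyAdded = 0;
        for (int id : messageIds) {
            if (expiries.put(id, nowMillis + ttlMillis) == null) {
                newlyAdded++;
            }
        }
        return newlyAdded == 0;
    }

    public static void main(String[] args) {
        MessageDeduplicator dedup = new MessageDeduplicator(10_000);
        int[] ids = {42};
        System.out.println(dedup.hasBeenConsumed(1L, ids, 0));      // false: first bot to see the message
        System.out.println(dedup.hasBeenConsumed(1L, ids, 5));      // true: second bot, same message
        System.out.println(dedup.hasBeenConsumed(1L, ids, 20_000)); // false: the earlier record has expired
    }
}
```

            The point of the design is that one write answers the question "was this already seen?", so no separate read is needed before the insert.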

      @Component
      @RequiredArgsConstructor
      public class GroupMessageHandler2 {
          private final RepetitiveGroupService repetitiveGroupService;
          @Resource
          private ReactiveZSetOperations<Long, Integer> reactiveZSetOperations;
      
          public void handle(@NotNull GroupMessageEvent event) {
              long groupId = event.getGroup().getId();
              long botId = event.getBot().getId();
              RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(groupId)
                      .share()
                      .block();
      
              if (hasBeenConsumed(event, groupId)) return;
      
              if (containMultiBots(repetitiveGroup, botId)) {
                  synchronized (this) {
                      consume();
                  }
              } else {
                  consume();
              }
          }
      
          //    Check whether the event has already been consumed
          private boolean hasBeenConsumed(@NotNull GroupMessageEvent event, long groupId) {
      
              // Scores are expiry timestamps in milliseconds
              long now = System.currentTimeMillis();
              Range<Double> range = Range.closed(0.0, now + 0.0);
              // A reactive call does nothing until subscribed; block() runs the cleanup now
              reactiveZSetOperations.removeRangeByScore(groupId, range).block();
      
              int[] ids = event.getSource().getIds();
              List<DefaultTypedTuple<Integer>> list = new ArrayList<>();
              for (int id : ids) {
                  // Keep each message ID for 10 seconds
                  list.add(new DefaultTypedTuple<>(id, now + 10_000.0));
              }
      
              // ZADD returns the number of newly added members; 0 means every ID already existed
              Long unExist = reactiveZSetOperations.addAll(groupId, list)
                      .share()
                      .block();
      
              return unExist == null || unExist == 0;
          }
      
          //    Check whether several bots listen to this group's messages
          private boolean containMultiBots(RepetitiveGroup repetitiveGroup, Long botId) {
              return repetitiveGroup.getId() != null && repetitiveGroup.getBotIds().contains(botId);
          }
      
          //    Consume the event
          private void consume() {
      
          }
      }
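
            The handler above serializes all multi-bot groups on one monitor (this). One possible refinement, which is my own assumption and not something the project does, is to keep one monitor per group so that unrelated groups do not wait on each other. GroupLocks and withGroupLock are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/** Hypothetical helper: one monitor object per group ID instead of a single handler-wide lock. */
final class GroupLocks {
    private final ConcurrentMap<Long, Object> monitors = new ConcurrentHashMap<>();

    /** Runs the action while holding only the monitor that belongs to the given group. */
    <T> T withGroupLock(long groupId, Supplier<T> action) {
        Object monitor = monitors.computeIfAbsent(groupId, id -> new Object());
        synchronized (monitor) {
            return action.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GroupLocks locks = new GroupLocks();
        int[] counter = {0};
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                // Both threads use the same group ID, so the increments never interleave
                locks.withGroupLock(123L, () -> ++counter[0]);
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(counter[0]);
    }
}
```

            Note that this sketch never removes monitors from the map, which is acceptable for a bounded set of groups but would need cleanup otherwise.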
      

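      Multi-Bot pitfalls (cluster)

            In a cluster, a JVM-level lock no longer works. Since Redis is already in use, Redisson is brought in to provide a distributed lock.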

      <dependency>
          <groupId>org.redisson</groupId>
          <artifactId>redisson</artifactId>
      </dependency>
      

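            The logic is largely the same as in the single-instance version. A lock obtained from Redisson has a lease time and is released automatically once it elapses. Suppose event A is observed by both Bot A and Bot B, and Bot A acquires the lock but crashes while consuming the event, so it never releases the lock normally. If the lock's lease time is longer than the validity period of the event IDs, Bot B can consume the event normally; otherwise the system treats Bot B as a duplicate consumer of the event.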

      @Component
      @RequiredArgsConstructor
      public class GroupMessageHandler {
          private final RedissonClient redissonClient;
          private final RepetitiveGroupService repetitiveGroupService;
          @Resource
          private ReactiveZSetOperations<Long, Integer> reactiveZSetOperations;
      
          public void handle(@NotNull GroupMessageEvent event) {
              long groupId = event.getGroup().getId();
              long botId = event.getBot().getId();
              RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(groupId)
                      .share()
                      .block();
      
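              if (hasBeenConsumed(event, groupId)) return;
      
              RLock lock = null;
              if (containMultiBots(repetitiveGroup, botId)) {
                  // Derive the lock name from the group ID so every bot and instance contends
                  // for the same lock; a random UUID would create a new lock on each call.
                  // The "group-message-lock:" prefix is only an illustrative choice.
                  lock = redissonClient.getLock("group-message-lock:" + groupId);
                  lock.lock(20, TimeUnit.SECONDS);
              }
      
              try {
                  consume();
              } finally {
                  // The lease may already have expired, so unlock only if we still hold it
                  if (lock != null && lock.isHeldByCurrentThread()) lock.unlock();
              }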
          }
      
          //    Check whether the event has already been consumed
          private boolean hasBeenConsumed(@NotNull GroupMessageEvent event, long groupId) {
      
              // Scores are expiry timestamps in milliseconds
              long now = System.currentTimeMillis();
              Range<Double> range = Range.closed(0.0, now + 0.0);
              // A reactive call does nothing until subscribed; block() runs the cleanup now
              reactiveZSetOperations.removeRangeByScore(groupId, range).block();
      
              int[] ids = event.getSource().getIds();
              List<DefaultTypedTuple<Integer>> list = new ArrayList<>();
              for (int id : ids) {
                  // Keep each message ID for 10 seconds
                  list.add(new DefaultTypedTuple<>(id, now + 10_000.0));
              }
      
              // ZADD returns the number of newly added members; 0 means every ID already existed
              Long unExist = reactiveZSetOperations.addAll(groupId, list)
                      .share()
                      .block();
      
              return unExist == null || unExist == 0;
          }
      
          //    Check whether several bots listen to this group's messages
          private boolean containMultiBots(RepetitiveGroup repetitiveGroup, Long botId) {
              return repetitiveGroup.getId() != null && repetitiveGroup.getBotIds().contains(botId);
          }
      
          //    Consume the event
          private void consume() {
      
          }
      }
      

      Adapting other components for multiple Bots

            RepetitiveGroup stores the groups that more than one bot has joined:

      @Data
      @AllArgsConstructor
      @NoArgsConstructor
      public class RepetitiveGroup implements Serializable {
          private static final long serialVersionUID = 1L;
      
          @Id
          private Long id;
          private List<Long> botIds;
      }
      

            RepetitiveGroupService implements the CRUD operations:

      @Service
      public class RepetitiveGroupService {
          @Resource
          private ReactiveRedisOperations<String, RepetitiveGroup> reactiveRedisOperations;
          public static final String HASH_KEY = RepetitiveGroup.class.getSimpleName();
      
          public Mono<RepetitiveGroup> get(Long groupId) {
              return reactiveRedisOperations.opsForHash()
                      .get(HASH_KEY, groupId)
                      .map(v -> (RepetitiveGroup) v)
                      .switchIfEmpty(Mono.defer(() -> Mono.just(new RepetitiveGroup())));
          }
      
          public Flux<RepetitiveGroup> getAll() {
              return reactiveRedisOperations.opsForHash()
                      .values(HASH_KEY)
                      .map(v -> (RepetitiveGroup) v);
          }
      
          public Mono<Boolean> upset(RepetitiveGroup repetitiveGroup) {
              return reactiveRedisOperations.opsForHash()
                      .put(HASH_KEY, repetitiveGroup.getId(), repetitiveGroup);
          }
      
          public Mono<Boolean> delete(RepetitiveGroup repetitiveGroup) {
              return reactiveRedisOperations.opsForHash()
                      .remove(HASH_KEY, repetitiveGroup.getId())
                      .map(v -> v.equals(1L));
          }
      }
      

            BotOnlineHandler:

      @Slf4j
      @Component
      @RequiredArgsConstructor
      public class BotOnlineHandler {
          private final SentinelService sentinelService;
          private final RepetitiveGroupService repetitiveGroupService;
          @Resource
          private ReactiveSetOperations<String, Long> reactiveSetOperations;
      
          public void handle(@NotNull BotOnlineEvent event) {
              Bot bot = event.getBot();
              long id = bot.getId();
      
              log.info("******BotOnlineHandler:QQ:{}", id);
      
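              // Bot.getInstances() may return an unmodifiable list, so filter into a new one
              List<Bot> others = Bot.getInstances()
                      .stream()
                      .filter(v -> v.getId() != id)
                      .collect(Collectors.toList());
      
              Set<Long> groupIds = bot.getGroups()
                      .stream()
                      .map(Group::getId)
                      .collect(Collectors.toSet());
      
              // Reactive operations only run once subscribed; block() so the set is complete before intersecting
              groupIds.forEach(v -> reactiveSetOperations.add(id + "", v).block());
      
              others.forEach(v -> sinter(id, v.getId()));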
      
          }
      
          private void sinter(Long self, Long target) {
              ArrayList<Long> list = new ArrayList<>();
              list.add(self);
              list.add(target);
      
              reactiveSetOperations.intersect(self + "", target + "")
                      .subscribe(v -> insert(v, list));
          }
      
          private void insert(Long groupId, List<Long> botIds) {
              repetitiveGroupService.get(groupId)
                      .flatMap(repetitiveGroup -> {
                          // A record without an ID is a fresh placeholder: initialise it for this group
                          if (repetitiveGroup.getId() == null) {
                              repetitiveGroup.setId(groupId);
                              repetitiveGroup.setBotIds(new ArrayList<>());
                          }
      
                          Set<Long> set = new HashSet<>(botIds);
                          set.addAll(repetitiveGroup.getBotIds());
                          repetitiveGroup.setBotIds(new ArrayList<>(set));
                          return repetitiveGroupService.upset(repetitiveGroup);
                      })
                      // Nothing is written to Redis until the chain is subscribed
                      .subscribe();
          }
      }
      

            BotOfflineHandler:

      @Slf4j
      @Component
      @RequiredArgsConstructor
      public class BotOfflineHandler {
          private final RepetitiveGroupService repetitiveGroupService;
      
          public void handle(@NotNull BotOfflineEvent event) {
              Bot bot = event.getBot();
              long id = bot.getId();
              String reason = "unknown";
      
              if (event instanceof BotOfflineEvent.Active)
                  reason = "logged out actively";
      
              if (event instanceof BotOfflineEvent.Force)
                  reason = "forced offline by another login";
      
              if (event instanceof BotOfflineEvent.Dropped)
                  reason = "dropped by the server or lost due to network problems";
      
              if (event instanceof BotOfflineEvent.RequireReconnect)
                  reason = "the server asked to switch to another server";
      
              log.info("******BotOfflineHandler:QQ:{},Reason:{}", id, reason);
      
              repetitiveGroupService.getAll()
                      .parallel()
                      .runOn(Schedulers.parallel())
                      .sequential()
                      .filter(v -> v.getBotIds().contains(id))
                      .map(v -> updateBotIdList(id, v))
                      .subscribe(this::updateOrDeleteRepetitiveGroup);
      
              bot.close();
          }
      
          @NotNull
          private RepetitiveGroup updateBotIdList(long id, RepetitiveGroup repetitiveGroup) {
              List<Long> botIds = repetitiveGroup.getBotIds();
              botIds.remove(id);
              repetitiveGroup.setBotIds(botIds);
              return repetitiveGroup;
          }
      
          private void updateOrDeleteRepetitiveGroup(RepetitiveGroup repetitiveGroup) {
              // Both calls return a cold Mono; subscribe() is what actually sends the command
              if (repetitiveGroup.getBotIds().size() < 2) {
                  repetitiveGroupService.delete(repetitiveGroup).subscribe();
              } else {
                  repetitiveGroupService.upset(repetitiveGroup).subscribe();
              }
          }
      }
      

            MemberJoinHandler:

      @Component
      @RequiredArgsConstructor
      public class MemberJoinHandler {
          private final RedissonClient redissonClient;
          private final RepetitiveGroupService repetitiveGroupService;
      
          public void handle(@NotNull MemberJoinEvent event) {
              long memberId = event.getMember().getId();
              Group group = event.getGroup();
              if (event.getBot().getId() == memberId) return;
      
              List<Long> idList = Bot.getInstances()
                      .stream()
                      .map(Bot::getId)
                      .collect(Collectors.toList());
      
              // Only react when the new member is one of our own bots
              if (!idList.contains(memberId)) return;
      
              // Lock per group (the key prefix is illustrative) and always release in finally
              RLock lock = redissonClient.getLock("repetitive-group-lock:" + group.getId());
              lock.lock(30, TimeUnit.SECONDS);
              try {
                  RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(group.getId())
                          .share()
                          .block();
      
                  List<Long> botIds = idList.stream()
                          .filter(group::contains)
                          .collect(Collectors.toList());
      
                  if (repetitiveGroup.getId() != null && botIds.size() == repetitiveGroup.getBotIds().size()) return;
      
                  repetitiveGroup.setId(group.getId());
                  repetitiveGroup.setBotIds(botIds);
                  // block() subscribes, so the write actually happens
                  repetitiveGroupService.upset(repetitiveGroup).block();
              } finally {
                  if (lock.isHeldByCurrentThread()) lock.unlock();
              }
          }
      }
      

            MemberLeaveHandler:

      @Component
      @RequiredArgsConstructor
      public class MemberLeaveHandler {
          private final RedissonClient redissonClient;
          private final RepetitiveGroupService repetitiveGroupService;
      
          public void handle(@NotNull MemberLeaveEvent event) {
              long memberId = event.getMember().getId();
              Group group = event.getGroup();
              if (event.getBot().getId() == memberId) return;
      
              List<Long> idList = Bot.getInstances()
                      .stream()
                      .map(Bot::getId)
                      .collect(Collectors.toList());
      
              // Only react when the departing member is one of our own bots
              if (!idList.contains(memberId)) return;
      
              // Lock per group (the key prefix is illustrative) and always release in finally
              RLock lock = redissonClient.getLock("repetitive-group-lock:" + group.getId());
              lock.lock(30, TimeUnit.SECONDS);
              try {
                  RepetitiveGroup repetitiveGroup = repetitiveGroupService.get(group.getId())
                          .share()
                          .block();
      
                  if (repetitiveGroup.getId() == null) return;
      
                  List<Long> botIds = idList.stream()
                          .filter(group::contains)
                          .collect(Collectors.toList());
      
                  if (botIds.size() > 1) {
                      repetitiveGroup.setId(group.getId());
                      repetitiveGroup.setBotIds(botIds);
                      // block() subscribes, so the write actually happens
                      repetitiveGroupService.upset(repetitiveGroup).block();
                  }
      
                  if (botIds.size() == 1)
                      repetitiveGroupService.delete(repetitiveGroup).block();
              } finally {
                  if (lock.isHeldByCurrentThread()) lock.unlock();
              }
          }
      }
      

            That concludes the Bot-related setup. Nacos comes next.


      Nacos

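      Integrating Nacos

            Only the Nacos client is configured here; for the Nacos server, see the official Nacos documentation.
      Dependencies (note: to work with OpenFeign, Ribbon is replaced by Spring Cloud LoadBalancer):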

      <dependency>
          <groupId>com.alibaba.cloud</groupId>
          <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
      </dependency>
      <dependency>
          <groupId>com.alibaba.cloud</groupId>
          <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
          <exclusions>
              <exclusion>
                  <groupId>org.springframework.cloud</groupId>
                  <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
              </exclusion>
          </exclusions>
      </dependency>
      

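      Connecting to the configuration center

            The main reason this project uses Nacos is the convenience of pushing configuration dynamically.
      Start with bootstrap.yml. Newer versions (Spring Cloud 2020.0 and later) no longer load bootstrap.yml by default; adding the following dependency turns it back on: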

      <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-starter-bootstrap</artifactId>
      </dependency>
      

            bootstrap.yml is configured as follows:

      spring:
        application:
          name: qqrobot
      
        cloud:
          nacos:
            server-addr: localhost:8848
            config:
      #       Remove this if no namespace is configured in the Nacos web console
              namespace: 3e4a06ab-3139-46e8-bc82-888154383de0
      #       Optional: name the configuration file explicitly, or omit it to use the default naming rule
              name: qqrobot-dev.yml
          
        main:
      #    Allow bean definition overriding
          allow-bean-definition-overriding: true
      

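            Adding @EnableDiscoveryClient registers the module with Nacos.

      Setting up the configuration file

            I will skip how to create the configuration file in the Nacos web console and only cover the application side.
      Values can be read with @Value or @ConfigurationProperties(prefix = "xxx"), and adding @RefreshScope enables automatic refresh.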

      @Data
      @Component
      @RefreshScope
      public class DynamicProperty {
      
          @Value("${accounts}")
          private String accounts;
      
      }
      

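            A few pitfalls. I tried several ways of writing a List, and none of them was bound automatically on the application side (with the configuration fetched from Nacos). Note that the snippets below have no space after the colon (a:1); YAML only reads a: 1 as a key-value pair, so this may have contributed to the failures.
            List<Bean>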

      test:
        list:
          - a:1
            b:1
      
      test:
        list:
          - {a:1,b:1}
      

            List<Map<String,String>>

      test:
        list:
          - a:1
            b:1
      

            So I recommend storing such values as a JSON string instead.

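      Listening for Nacos configuration refresh events

            After Nacos refreshes the local configuration, the event is picked up by RefreshEventListener (org.springframework.cloud.endpoint.event.RefreshEventListener). Using its onApplicationEvent() method as a reference, we can implement a listener of our own.
            Here robotService is the class that controls the Bots. In this example the bot account information is stored in Nacos, and when it is refreshed, refreshBot() is called to update the state of the affected bots (online or offline).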

      @Configuration
      @RequiredArgsConstructor
      public class RefreshEventListenerConfig {
          private final RobotService robotService;
          private AtomicBoolean ready = new AtomicBoolean(false);
      
          @EventListener
          public void listen(ApplicationEvent event) {
              if (event instanceof ApplicationReadyEvent)
                  this.ready.compareAndSet(false, true);
      
              if (event instanceof RefreshEvent && this.ready.get())
                  robotService.refreshBot();
          }
      }
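
            The text mentions bringing bots both online and offline on refresh, while the refreshBot() shown earlier only logs in accounts that are not online yet. As a rough sketch of how both sides of the difference could be computed, assuming the configured and online account IDs are available as sets (BotRefreshPlan is a hypothetical helper, not part of the project):

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper that compares configured account IDs with the bots currently online. */
final class BotRefreshPlan {
    final Set<Long> toLogin;
    final Set<Long> toLogout;

    private BotRefreshPlan(Set<Long> toLogin, Set<Long> toLogout) {
        this.toLogin = toLogin;
        this.toLogout = toLogout;
    }

    static BotRefreshPlan of(Set<Long> configured, Set<Long> online) {
        // Configured but not online: these accounts need a login
        Set<Long> toLogin = new LinkedHashSet<>(configured);
        toLogin.removeAll(online);

        // Online but no longer configured: these bots should be closed
        Set<Long> toLogout = new LinkedHashSet<>(online);
        toLogout.removeAll(configured);

        return new BotRefreshPlan(toLogin, toLogout);
    }

    public static void main(String[] args) {
        BotRefreshPlan plan = BotRefreshPlan.of(Set.of(1L, 2L), Set.of(2L, 3L));
        System.out.println("login: " + plan.toLogin + ", logout: " + plan.toLogout);
        // prints "login: [1], logout: [3]"
    }
}
```

            In the real service the online set would come from Bot.getInstances(), and the two result sets would drive the login and close calls.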
      

            That concludes the Nacos integration.


      Sentinel

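            Sentinel is integrated for dynamic flow control. When all bots are offline, any incoming request immediately runs the fallback method, and its parameters are stored for delayed processing. Once at least one bot is online, the endpoints are opened again and the stored requests are consumed. This feature will keep being improved.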
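
            The intended behaviour can be modelled without Sentinel or Redis. The following is only an in-memory sketch under that assumption; PendingRequestGate and its methods are hypothetical names, and in the project the blocking is done by Sentinel flow rules while the parked requests live in Redis.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical in-memory model: park requests while no bot is online, replay them once one is. */
final class PendingRequestGate {
    private final Queue<String> pending = new ArrayDeque<>();
    private final Consumer<String> handler;
    private int onlineBots;

    PendingRequestGate(Consumer<String> handler) {
        this.handler = handler;
    }

    /** Returns true if the request was handled now, false if it was parked for later. */
    synchronized boolean submit(String request) {
        if (onlineBots == 0) {
            pending.add(request); // fallback path: keep the parameters for delayed processing
            return false;
        }
        handler.accept(request);
        return true;
    }

    synchronized void botOnline() {
        onlineBots++;
        // At least one bot is available again: drain everything that was parked
        String request;
        while ((request = pending.poll()) != null) {
            handler.accept(request);
        }
    }

    synchronized void botOffline() {
        if (onlineBots > 0) onlineBots--;
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        PendingRequestGate gate = new PendingRequestGate(handled::add);
        gate.submit("first");   // parked, no bot online yet
        gate.botOnline();       // replays "first"
        gate.submit("second");  // handled immediately
        System.out.println(handled);
    }
}
```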

      Integrating Sentinel

            We need the Sentinel starter, Sentinel's adapter for the web framework (I chose WebFlux here), and a persistence mechanism for the rules:

      <dependency>
          <groupId>com.alibaba.cloud</groupId>
          <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
      </dependency>
      <dependency>
          <groupId>com.alibaba.csp</groupId>
          <artifactId>sentinel-spring-webflux-adapter</artifactId>
      </dependency>
      <dependency>
          <groupId>com.alibaba.csp</groupId>
          <artifactId>sentinel-datasource-nacos</artifactId>
      </dependency>
      

      Configuring the Sentinel client

            Set the Sentinel Dashboard address in bootstrap.yml:

      spring:
        cloud:
          sentinel:
            transport:
              dashboard: localhost:38081
              port: 38719
      

            Only the Sentinel client is adapted here; for server-side setup, see the official Sentinel documentation.

      @Configuration
      @RequiredArgsConstructor
      public class SentinelConfig {
          private final List<ViewResolver> viewResolvers;
          private final ServerCodecConfigurer serverCodecConfigurer;
      
          @Bean
          @Order(-1)
          public SentinelBlockExceptionHandler sentinelBlockExceptionHandler() {
              return new SentinelBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
          }
      
          @Bean
          @Order(-1)
          public SentinelWebFluxFilter sentinelWebFluxFilter() {
              return new SentinelWebFluxFilter();
          }
      }
      

      Custom flow-control response

            First, write a class that stores the request parameters, UnhandledHttpRequest:

      @Data
      @AllArgsConstructor
      public class UnhandledHttpRequest implements Serializable {
          private static final long serialVersionUID = 1L;
      
          @Id
          private String id;
          private String body;
          private String method;
      
          public UnhandledHttpRequest() {
              id = IdUtil.fastSimpleUUID();
          }
      
          public UnhandledHttpRequest(String body) {
              id = IdUtil.fastSimpleUUID();
              this.body = body;
          }
      }
      

            The corresponding service class, UnhandledHttpRequestService:

      @Service
      public class UnhandledHttpRequestService {
          @Resource
          private ReactiveRedisOperations<String, UnhandledHttpRequest> reactiveRedisOperations;
          // Use this class's own name so the hash does not collide with RepetitiveGroupService
          public static final String HASH_KEY = UnhandledHttpRequest.class.getSimpleName();
      
          public Mono<UnhandledHttpRequest> get(String id) {
              return reactiveRedisOperations.opsForHash()
                      .get(HASH_KEY, id)
                      .map(v -> (UnhandledHttpRequest) v)
                      .switchIfEmpty(Mono.defer(() -> Mono.just(new UnhandledHttpRequest())));
          }
      
          public Flux<UnhandledHttpRequest> getAll() {
              return reactiveRedisOperations.opsForHash()
                      .values(HASH_KEY)
                      .map(v -> (UnhandledHttpRequest) v);
          }
      
          public Mono<Boolean> upset(UnhandledHttpRequest unhandledHttpRequest) {
              return reactiveRedisOperations.opsForHash()
                      .put(HASH_KEY, unhandledHttpRequest.getId(), unhandledHttpRequest);
          }
      
          public Mono<Boolean> delete(UnhandledHttpRequest unhandledHttpRequest) {
              return reactiveRedisOperations.opsForHash()
                      .remove(HASH_KEY, unhandledHttpRequest.getId())
                      .map(v -> v.equals(1L));
          }
      }
      

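            Finally, define a handler that produces the response. In WebFlux, reading the request body is not as easy as with Servlet; I tried several approaches and all of them failed, so the sample code only shows how to read the parameters carried in the URL.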

      @Component
      @RequiredArgsConstructor
      public class RobotBlockExceptionHandler implements BlockRequestHandler {
          private final UnhandledHttpRequestService unhandledHttpRequestService;
      
          @SneakyThrows
          @Override
          public Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable t) {
              URI uri = exchange.getRequest().getURI();
              JSONObject jsonObject = new JSONObject();
              jsonObject.putAll(decodeBody(uri.getRawQuery()));
      
              UnhandledHttpRequest unhandledHttpRequest = new UnhandledHttpRequest(jsonObject.toJSONString());
              if (uri.getPath().contains("/send/msg")) {
                  unhandledHttpRequest.setMethod("/send/msg");
              } else {
                  unhandledHttpRequest.setMethod("/collect/msg");
              }
      
              jsonObject.clear();
              jsonObject.put("msg", "The bot service is busy, please try again later");
              jsonObject.put("code", HttpStatus.TOO_MANY_REQUESTS.value());
      
              // Chain the save into the response so that it is subscribed and actually executed
              return unhandledHttpRequestService.upset(unhandledHttpRequest)
                      .then(ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
                              .contentType(MediaType.APPLICATION_JSON)
                              .bodyValue(jsonObject.toJSONString()));
          }
      
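          private Map<String, Object> decodeBody(String body) {
              if (StrUtil.isBlank(body)) return new HashMap<>();
      
              // A query string: split into key=value pairs (a single pair has no '&') and URL-decode both sides
              if (body.contains("="))
                  return Arrays.stream(body.split("&"))
                          .map(s -> s.split("=", 2))
                          .collect(Collectors.toMap(
                                  arr -> URLDecoder.decode(arr[0], StandardCharsets.UTF_8),
                                  arr -> arr.length > 1 ? URLDecoder.decode(arr[1], StandardCharsets.UTF_8) : "",
                                  (first, second) -> second));
      
              return JSON.parseObject(body)
                      .getInnerMap();
          }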
      }
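
The handler above reads its parameters from the raw query string. As a minimal sketch of that parsing step using only the JDK, the helper below also URL-decodes keys and values and tolerates a single parameter or an empty value. The class name `QueryStringDecoder` is hypothetical and not part of the project.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the project: turns "a=1&b=hello%20world" into a map.
final class QueryStringDecoder {

    static Map<String, String> decode(String rawQuery) {
        Map<String, String> params = new LinkedHashMap<>();
        if (rawQuery == null || rawQuery.isBlank()) return params;

        for (String pair : rawQuery.split("&")) {
            if (pair.isEmpty()) continue;
            // Limit 2 keeps any further "=" inside the value
            String[] kv = pair.split("=", 2);
            String key = URLDecoder.decode(kv[0], StandardCharsets.UTF_8);
            String value = kv.length > 1 ? URLDecoder.decode(kv[1], StandardCharsets.UTF_8) : "";
            params.put(key, value); // a repeated key keeps its last value
        }
        return params;
    }

    public static void main(String[] args) {
        System.out.println(decode("groupId=123&msg=hello%20world"));
    }
}
```

Decoding matters because `getRawQuery()` returns the query still percent-encoded, so Chinese text or spaces in a message would otherwise be stored in their escaped form.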
      

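      Adapting other classes to Sentinel

      Logic

            Here I implemented only the simplest approach: when every Bot is offline, the related endpoints are all blocked (QPS limit 0), and they recover once a Bot is online. This is not suitable for real use; a better approach is rate limiting based on the request itself.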
            BotOfflineHandler

      private final SentinelService sentinelService;
      
      if (Bot.getInstances().isEmpty()) {
          FlowRule rule = new FlowRule("/send/msg");
          rule.setCount(0);
          rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
          rule.setLimitApp("default");
      
          sentinelService.saveOrUpdateFlowRule(rule);
          rule.setResource("/collect/msg");
          sentinelService.saveOrUpdateFlowRule(rule);
      }
      

            BotOnlineHandler

      private final SentinelService sentinelService;
      
      if (!Bot.getInstances().isEmpty()) {
          FlowRule rule = new FlowRule("/send/msg");
          rule.setCount(10);
          rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
          rule.setLimitApp("default");
      
          sentinelService.saveOrUpdateFlowRule(rule);
          rule.setResource("/collect/msg");
          sentinelService.saveOrUpdateFlowRule(rule);
      }
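
To make the effect of the two thresholds concrete: a QPS rule with count 0 rejects every request, while count 10 admits up to ten per second. The sketch below models that with a simplified fixed-window counter. It is not Sentinel's implementation (Sentinel keeps its own sliding-window statistics), and the class name `QpsGate` is made up for illustration.

```java
// Simplified fixed-window QPS gate; a hypothetical stand-in, not Sentinel code.
final class QpsGate {
    private volatile int limit;      // max requests admitted per one-second window
    private long windowStartMillis;  // start of the current window
    private int usedInWindow;        // requests admitted in the current window

    QpsGate(int limit) {
        this.limit = limit;
    }

    // Plays the role of rule.setCount(...): 0 blocks everything, 10 allows ten per second.
    void setLimit(int limit) {
        this.limit = limit;
    }

    synchronized boolean tryPass(long nowMillis) {
        if (nowMillis - windowStartMillis >= 1000) {
            windowStartMillis = nowMillis;
            usedInWindow = 0;
        }
        if (usedInWindow >= limit) return false;
        usedInWindow++;
        return true;
    }

    public static void main(String[] args) {
        QpsGate gate = new QpsGate(0);        // every Bot offline
        System.out.println(gate.tryPass(0));  // false
        gate.setLimit(10);                    // a Bot came online
        int passed = 0;
        for (int i = 0; i < 15; i++) {
            if (gate.tryPass(100)) passed++;
        }
        System.out.println(passed);           // 10
    }
}
```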
      

      Persisting the configuration

            Modify bootstrap.yml. This is only a reference; see the official site for the individual parameters.

      spring:
        cloud:
          sentinel:
            transport:
              dashboard: localhost:38081
              port: 38719
            datasource:
              flow-rule:
                nacos:
                  server-addr: localhost:8848
                  username: nacos
                  password: nacos
                  namespace: 3e4a06ab-3139-46e8-bc82-888154383de0
                  data-id: qqrobot-sentinel-flow-dev.json
                  rule-type: flow
      

            This concludes the Sentinel adaptation.


      Reference configuration

      Redis reference configuration

      @Configuration
      public class RedisConfig extends CachingConfigurerSupport {
      
          @Bean
          public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
              return new ReactiveRedisTemplate<>(connectionFactory, RedisSerializationContext.string());
          }
      
          @Bean
          public ReactiveRedisConnection connection(ReactiveRedisConnectionFactory connectionFactory) {
              return connectionFactory.getReactiveConnection();
          }
      
          @Bean
          ReactiveRedisOperations<String, Object> reactiveRedisOperations(ReactiveRedisConnectionFactory factory) {
              return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext());
          }
      
          @Bean
          ReactiveSetOperations<String, Object> reactiveSetOperations(ReactiveRedisConnectionFactory factory) {
              return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext()).opsForSet();
          }
      
          @Bean
          ReactiveZSetOperations<String, Object> reactiveZSetOperations(ReactiveRedisConnectionFactory factory) {
              return new ReactiveRedisTemplate<>(factory, getObjectRedisSerializationContext()).opsForZSet();
          }
      
          private static RedisSerializationContext<String, Object> getObjectRedisSerializationContext() {
              return RedisSerializationContext
                      .<String, Object>newSerializationContext(new GenericFastJsonRedisSerializer())
                      .key(new FastJsonRedisSerializer<>(String.class))
                      .value(new FastJsonRedisSerializer<>(Object.class))
                      .hashKey(new FastJsonRedisSerializer<>(String.class))
                      .hashValue(new FastJsonRedisSerializer<>(Object.class))
                      .build();
          }
      }
      

      Redisson reference configuration

      @Configuration
      public class RedissonConfig {
      
          @Bean
          public RedissonClient redissonClient() {
              Config config = new Config();
              config.setLockWatchdogTimeout(10000L);
              SingleServerConfig singleServerConfig = config.useSingleServer();
              singleServerConfig.setPassword("password");
              singleServerConfig.setAddress("redis://localhost:6379");
              singleServerConfig.setDatabase(1);
              return Redisson.create(config);
          }
      }
      
      • 840670339

        Hey, is there a project link? I'd like to git clone it.

        • wssy001 ⭐2021⭐ @840670339

          @840670339 The project is still being updated; I'll share it once it's finished.

          • 840670339 @wssy001

            @wssy001 OK

            • Orisland (摸鱼)

              ohhhh, I happen to need this for my graduation project; I have to find a way to fit mirai into it.

              • wssy001 ⭐2021⭐, last edited by wssy001

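                Second edition

                Foreword

                      The second edition mainly adds RocketMQ and a DB to the project. RocketMQ relieves the pressure that high concurrency puts on the DB (Mysql in particular) and provides a basic answer to a Bot crashing in the middle of consumption, although avoiding duplicate consumption still relies on caching message IDs in Redis. For the DB I use MongoDB (Spring Data MongoDB Reactive) and Mysql (Mybatis Plus) as examples.
                      I asked in the group earlier and found that most people know little about DB transactions; some even asked what a transaction is and whether it tastes good... So this update does not integrate Seata yet. If you need it, leave a comment and I will update as soon as I can.
                      P.S. The Mysql version has been updated and the MongoDB version will follow shortly. Some of the code has pitfalls in real use; I am reading the source and will fix them later. The code update is now finished; the documentation is lagging slightly behind.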

                Table of contents

                1. Main component versions
                2. File changes since the first edition
                3. Mysql
                  3.1 Adding Mybatis Plus
                  3.2 Scanning the Dao classes
                4. MongoDB
                  4.1 Adding Spring Data MongoDB Reactive
                  4.2 Writing the entity
                5. RocketMQ
                  5.1 Configuring RocketMQ
                  5.2 GroupMessageHandler
                  5.3 ImageMessageSorter
                  5.4 PlainTextMessageSorter
                  5.5 UnhandledSendMessageSorter
                  5.6 ImageMessageDBConsumer
                  5.7 PlainTextMessageDBConsumer
                  5.8 UnhandledPlainTextMessageConsumer
                6. DTO
                7. Miscellaneous
                8. Reference configuration
                  8.1 MybatisGeneratorTest

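                Main component versions for the second edition:

                | Mybatis Plus | Mybatis Plus Generator | Velocity Engine Core | RocketMQ Spring Boot Starter |
                |--------------|------------------------|----------------------|------------------------------|
                | 3.4.3        | 3.5.1                  | 2.2                  | 2.2.1                        |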

                File changes since the first edition

                      Removed: RepetitiveGroup (entity)
                      Removed: MemberJoinHandler, MemberLeaveHandler (handler)
                      Removed: RepetitiveGroupService (service)

                      Modified: RobotService#handleStoredHttpRequest (service)


                Mysql

                      Mybatis Plus: everyone in the internet industry loves it.

                Adding Mybatis Plus

                      In the main POM.xml, add Mybatis Plus, Mybatis Plus Generator and a template engine; Velocity is used here.

                <properties>
                    <mybatis.plus.version>3.4.3</mybatis.plus.version>
                    <mybatis.plus.generator.version>3.5.1</mybatis.plus.generator.version>
                    <velocity.version>2.2</velocity.version>
                    <rocketMQ.version>2.2.1</rocketMQ.version>
                </properties>
                
                <!--            ORM-->
                <dependency>
                    <groupId>com.baomidou</groupId>
                    <artifactId>mybatis-plus-boot-starter</artifactId>
                    <version>${mybatis.plus.version}</version>
                </dependency>
                <dependency>
                    <groupId>com.baomidou</groupId>
                    <artifactId>mybatis-plus-generator</artifactId>
                    <version>${mybatis.plus.generator.version}</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.velocity</groupId>
                    <artifactId>velocity-engine-core</artifactId>
                    <version>${velocity.version}</version>
                </dependency>
                

                      Add to the module

                <dependency>
                    <groupId>com.baomidou</groupId>
                    <artifactId>mybatis-plus-boot-starter</artifactId>
                </dependency>
                <dependency>
                    <groupId>com.baomidou</groupId>
                    <artifactId>mybatis-plus-generator</artifactId>
                </dependency>
                <dependency>
                    <groupId>mysql</groupId>
                    <artifactId>mysql-connector-java</artifactId>
                    <scope>runtime</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.velocity</groupId>
                    <artifactId>velocity-engine-core</artifactId>
                </dependency>
                

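                      For the connection pool I like HikariCP (billed as the fastest in the world), and since it is Spring Boot's default pool I keep the default settings. Then use Mybatis Plus Generator to generate the entity, dao, service and related classes; for a reference configuration see Table of contents -> Reference configuration.

                Scanning the Dao classes

                      There are two ways to do this. One: add the @Mapper annotation to every Dao, and Mybatis Plus picks them up automatically. Two: put @MapperScan("cyou.wssy001.cloud.bot.dao") on a configuration class, and Mybatis Plus scans every Dao under cyou.wssy001.cloud.bot.dao.

                      This concludes the Mybatis Plus configuration.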


                MongoDB

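                      I hear that once you use MongoDB, mom never has to worry about sharding databases and tables again.

                Adding Spring Data MongoDB Reactive

                      Without further ado, here are the Maven coordinates (GAV).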

                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
                </dependency>
                

                application.yml

                spring:
                  data:
                    mongodb:
                      uri: mongodb://root:wssy001@10.60.64.66:27017
                      database: back_up
                      # The project has no Dao (repository), so choose none
                      repositories:
                        type: none
                

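                Writing the entity

                      Every collection in MongoDB comes with an _id field that serves as the id. By default it is an ObjectId, which Spring Data can map to a String field; you can of course implement your own auto-incrementing numeric id.

                Major pitfall

                      MongoDB has no time zone concept like Mysql's (everything is stored as UTC), so storing times needs special handling. Otherwise the following happens (using java.util.Date and Beijing time as the example): store an object whose time is 2021-12-26 20:00:00 and whose id is "111", and MongoDB ends up with a record with id "111" and time 2021-12-26 12:00:00; fetch the record with id "111" back into the application and print it, and the object's time shows as 2021-12-26 12:00:00. I searched for solutions online at the time, but none of them worked, and then I thought of LocalDateTime.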

                @Data
                @Builder
                @Document
                @AllArgsConstructor
                public class TImage {
                
                    /**
                     * ID
                     */
                    @Id
                    private String id;
                
                    /**
                     * mirai message ID
                     */
                    private Integer miraiId;
                
                    /**
                     * URL
                     */
                    private String url;
                
                    /**
                     * Local path
                     */
                    private String path;
                
                    /**
                     * Bot account (QQ numbers can exceed the int range, hence Long)
                     */
                    private Long botNumber;
                
                    /**
                     * Group number
                     */
                    private Long groupNumber;
                
                    /**
                     * Friend account
                     */
                    private Long friendNumber;
                
                    /**
                     * Creation time
                     */
                    private LocalDateTime createTime;
                
                    /**
                     * Update time
                     */
                    private LocalDateTime updateTime;
                
                    /**
                     * Whether the record is enabled
                     */
                    private Boolean enable;
                
                    @Override
                    public boolean equals(Object o) {
                        if (this == o) return true;
                        if (o == null || getClass() != o.getClass()) return false;
                        TImage image = (TImage) o;
                        return Objects.equals(id, image.id) &&
                                Objects.equals(miraiId, image.miraiId);
                    }
                
                    @Override
                    public int hashCode() {
                        return Objects.hash(id, miraiId);
                    }
                }
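
The time zone pitfall described before the entity comes down to one instant being shown in two zones: MongoDB stores dates as UTC instants, so 20:00 Beijing time is 12:00 UTC. A minimal JDK-only sketch of that conversion follows; the class name `ZoneShiftDemo` and its methods are hypothetical and only illustrate the arithmetic, not what the driver does internally.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

final class ZoneShiftDemo {
    static final ZoneId BEIJING = ZoneId.of("Asia/Shanghai");

    // Wall-clock time in Beijing -> the UTC instant that ends up in storage
    static Instant toStoredInstant(LocalDateTime beijingTime) {
        return beijingTime.atZone(BEIJING).toInstant();
    }

    // Stored UTC instant -> wall-clock time in Beijing
    static LocalDateTime toBeijingTime(Instant stored) {
        return LocalDateTime.ofInstant(stored, BEIJING);
    }

    public static void main(String[] args) {
        LocalDateTime local = LocalDateTime.of(2021, 12, 26, 20, 0, 0);
        Instant stored = toStoredInstant(local);
        System.out.println(stored);                                          // 2021-12-26T12:00:00Z
        System.out.println(LocalDateTime.ofInstant(stored, ZoneOffset.UTC)); // 2021-12-26T12:00
        System.out.println(toBeijingTime(stored));                           // 2021-12-26T20:00
    }
}
```

The eight-hour gap only becomes visible when the stored instant is read without converting it back to the local zone.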
                

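                      This concludes the Spring Data MongoDB Reactive configuration.


                RocketMQ

                      This tutorial does not cover installing RocketMQ, only the adaptation work related to Mirai.
                      The main reason for choosing RocketMQ over Kafka is that the former offers transactional messages, although it does not work with Seata at the moment.

                Configuring RocketMQ

                      Adding RocketMQ
                Main POM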

                <properties>
                    <rocketMQ.version>2.2.1</rocketMQ.version>
                </properties>
                
                <!--            RocketMQ-->
                <dependency>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-spring-boot-starter</artifactId>
                    <version>${rocketMQ.version}</version>
                </dependency>
                

                Module

                <dependency>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-spring-boot-starter</artifactId>
                </dependency>
                

                Add the following to application.yml

                rocketmq:
                  name-server: localhost:39876
                  producer:
                    group: group1
                

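                      Since I use RocketMQ Spring Boot Starter, reading the source of DefaultRocketMQListenerContainer.DefaultMessageListenerConcurrently#consumeMessage shows that batch pulling and batch consuming are hard to achieve with @RocketMQMessageListener. To cut down on network traffic, though, we can register a DefaultMQPushConsumer bean ourselves.

                      Write RocketMQConfig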

                @Slf4j
                @Setter
                @Configuration
                @RequiredArgsConstructor
                public class RocketMQConfig {
                    @Value("${rocketmq.name-server}")
                    private String nameServer;
                
                    private final ImageMessageDBConsumer imageMessageDBConsumer;
                    private final PlainTextMessageDBConsumer plainTextMessageDBConsumer;
                    private final UnhandledSendMessageSorter unhandledSendMessageSorter;
                
                    @Bean
                    public DefaultMQPushConsumer imageMessageDBBatchConsumer() {
                        DefaultMQPushConsumer consumer = getDefaultBatchConsumer();
                        consumer.setNamesrvAddr(nameServer);
                        consumer.registerMessageListener(imageMessageDBConsumer);
                        consumer.setConsumerGroup("image-db");
                        try {
                            consumer.subscribe("image-message", "");
                            consumer.start();
                        } catch (Exception e) {
                            log.error("******Exception:{}", e.getMessage(), e);
                        }
                        return consumer;
                    }
                
                    @Bean
                    public DefaultMQPushConsumer plainTextMessageDBBatchConsumer() {
                        DefaultMQPushConsumer consumer = getDefaultBatchConsumer();
                        consumer.registerMessageListener(plainTextMessageDBConsumer);
                        consumer.setConsumerGroup("plain-text-db");
                        try {
                            consumer.subscribe("plain-text-message", "");
                            consumer.start();
                        } catch (Exception e) {
                            log.error("******Exception:{}", e.getMessage(), e);
                        }
                        return consumer;
                    }
                
                    @Bean
                    public DefaultMQPushConsumer unhandledGroupMessageConsumer() {
                        DefaultMQPushConsumer consumer = getDefaultBatchConsumer();
                        consumer.registerMessageListener(unhandledSendMessageSorter);
                        consumer.setConsumerGroup("group-message");
                        try {
                            consumer.subscribe("unhandled-group-message", "");
                            consumer.start();
                        } catch (Exception e) {
                            log.error("******Exception:{}", e.getMessage(), e);
                        }
                        return consumer;
                    }
                
                    private DefaultMQPushConsumer getDefaultBatchConsumer() {
                        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
                        consumer.setNamesrvAddr(nameServer);
                        consumer.setPullInterval(2000);
                        // Maximum and minimum number of consumer threads
                        consumer.setConsumeThreadMax(2);
                        consumer.setConsumeThreadMin(1);
                        consumer.setPullBatchSize(16);
                        consumer.setConsumeMessageBatchMaxSize(16);
                        return consumer;
                    }
                }
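
In the config above, `setConsumeMessageBatchMaxSize(16)` caps how many messages a single `consumeMessage` call receives. As an illustration only (this is not RocketMQ's internal code, and `BatchSplitter` is a hypothetical name), the effect on a backlog of pulled messages can be sketched with plain lists:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a backlog into batches of at most maxBatchSize messages,
// the way a listener would see them with a batch cap in place.
final class BatchSplitter {

    static <T> List<List<T>> split(List<T> messages, int maxBatchSize) {
        if (maxBatchSize <= 0) throw new IllegalArgumentException("maxBatchSize must be positive");
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < messages.size(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, messages.size());
            batches.add(new ArrayList<>(messages.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> backlog = new ArrayList<>();
        for (int i = 0; i < 40; i++) backlog.add(i);
        // 40 messages with a cap of 16 arrive as batches of 16, 16 and 8
        for (List<Integer> batch : split(backlog, 16)) {
            System.out.println(batch.size());
        }
    }
}
```

Each batch then maps to one bulk insert in the DB consumers, which is where the saving over one-message-at-a-time listeners comes from.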
                

                LogSendCallbackService

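                      For MQ I prefer asynchronous sending, which means writing a callback myself. Here is a LogSendCallbackService; note that with asynchronous sending the callback only receives the message details (SendResult) when the send succeeds, and on failure it gets nothing but the exception.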

                @Slf4j
                @Service
                public class LogSendCallbackService implements SendCallback {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        log.info("******Send succeeded, message ID: {}", sendResult.getMsgId());
                    }
                
                    @Override
                    public void onException(Throwable e) {
                        log.error("******Send failed: {}", e.getMessage(), e);
                    }
                }
                

                GroupMessageHandler

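                      GroupMessageHandler is purely a producer. It takes every group message, wraps it and sends it to MQ, leaving the downstream Sorters to do the sorting.
                      rocketMQTemplate.asyncSend() accepts a bean directly, but then the message Id is generated automatically, so I build the message by hand; a custom message Id fits the business scenario better.

                @Component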
                @RequiredArgsConstructor
                public class GroupMessageHandler {
                    @Resource
                    private ReactiveRedisOperations<String, MessageChainDto> reactiveRedisOperations;
                
                    private final RocketMQTemplate rocketMQTemplate;
                    private final LogSendCallbackService logSendCallbackService;
                
                    public void handle(@NotNull GroupMessageEvent event) {
                        String miraiCode = MiraiCode.serializeToMiraiCode(event.getMessage().iterator());
                
                        // Join the ids with a delimiter so that e.g. [1, 23] and [12, 3] produce different keys
                        int[] ids = event.getSource().getIds();
                        String key = Arrays.stream(ids)
                                .mapToObj(String::valueOf)
                                .collect(Collectors.joining("_"));
                
                        MessageChainDto messageChainDto = new MessageChainDto();
                        messageChainDto.setIds(ids);
                        messageChainDto.setBotAccount(event.getBot().getId());
                        messageChainDto.setGroupNumber(event.getGroup().getId());
                        messageChainDto.setMiraiCode(miraiCode);
                
                        // SET NX with a TTL in one call; a separate expire() Mono never runs unless it is subscribed
                        Boolean nonExist = reactiveRedisOperations.opsForValue()
                                .setIfAbsent(key, messageChainDto, Duration.ofSeconds(5))
                                .block();
                
                        if (!Boolean.TRUE.equals(nonExist)) return;
                
                        Message<MessageChainDto> message = MessageBuilder.withPayload(messageChainDto)
                                .setHeader("KEYS", "GroupMessage_" + key)
                                .build();
                        rocketMQTemplate.asyncSend("group-message", message, logSendCallbackService);
                    }
                
                }
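
The deduplication step in the handler is a "set if absent" on a key derived from the message ids. As a rough JDK-only sketch of that idea, the class below uses a `ConcurrentHashMap` as a stand-in for Redis; the names `MessageDeduplicator`, `keyOf` and `firstSeen` are hypothetical. Joining the ids with a delimiter avoids collisions such as [1, 23] versus [12, 3], which plain concatenation would map to the same key.

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

// Hypothetical in-memory stand-in for "SET key value NX" with a TTL.
final class MessageDeduplicator {
    // key -> expiry timestamp in milliseconds
    private final ConcurrentMap<String, Long> seen = new ConcurrentHashMap<>();
    private final long ttlMillis;

    MessageDeduplicator(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    static String keyOf(int[] ids) {
        return Arrays.stream(ids)
                .mapToObj(String::valueOf)
                .collect(Collectors.joining("_"));
    }

    // Returns true only for the first caller of a key within the TTL window.
    boolean firstSeen(int[] ids, long nowMillis) {
        boolean[] first = {false};
        // compute() is atomic per key, so two threads cannot both claim the same message
        seen.compute(keyOf(ids), (k, expiry) -> {
            if (expiry == null || expiry <= nowMillis) {
                first[0] = true;
                return nowMillis + ttlMillis;
            }
            return expiry;
        });
        return first[0];
    }

    public static void main(String[] args) {
        MessageDeduplicator dedup = new MessageDeduplicator(5000);
        System.out.println(dedup.firstSeen(new int[]{1, 23}, 0));    // true
        System.out.println(dedup.firstSeen(new int[]{1, 23}, 1000)); // false
        System.out.println(dedup.firstSeen(new int[]{12, 3}, 1000)); // true
    }
}
```

In a cluster the map has to live in a shared store such as Redis, since each instance would otherwise only see its own duplicates.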
                

                ImageMessageSorter

                      ImageMessageSorter listens for the QQ group messages sent from GroupMessageEvent, picks out the Image elements, wraps them and sends them on to the DBConsumer.

                @Component
                @RequiredArgsConstructor
                @RocketMQMessageListener(topic = "group-message", consumerGroup = "image-message")
                public class ImageMessageSorter implements RocketMQListener<String> {
                    private final RocketMQTemplate rocketMQTemplate;
                    private final LogSendCallbackService logSendCallbackService;
                
                    @Override
                    public void onMessage(String message) {
                        MessageChainDto messageChainDto = JSON.parseObject(message, MessageChainDto.class);
                        int[] ids = messageChainDto.getIds();
                
                        MessageChain messageChain = MiraiCode.deserializeMiraiCode(messageChainDto.getMiraiCode());
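                        List<ImageDto> imageDtoList = new ArrayList<>();
                        for (int i = 0; i < messageChain.size(); i++) {
                            SingleMessage singleMessage = messageChain.get(i);
                            if (!(singleMessage instanceof Image)) continue;
                
                            // One DTO per image: reusing a single instance would make every list entry identical
                            ImageDto imageDto = new ImageDto();
                            imageDto.setBotAccount(messageChainDto.getBotAccount());
                            imageDto.setGroupNumber(messageChainDto.getGroupNumber());
                            // ids may hold fewer entries than the chain has elements, so guard the index
                            Integer miraiId = i < ids.length ? Integer.valueOf(ids[i]) : null;
                            imageDtoList.add(saveImage(imageDto, (Image) singleMessage, miraiId));
                        }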
                
                        if (imageDtoList.isEmpty()) return;
                        send(imageDtoList);
                    }
                
                
                    // Store the image
                    private ImageDto saveImage(ImageDto imageDto, Image image, Integer miraiId) {
                        imageDto.setMiraiId(miraiId);
                        String url = Image.queryUrl(image);
                
                        return imageDto;
                    }
                
                    @SneakyThrows
                    private void send(List<ImageDto> imageDtoList) {
                        List<Message<ImageDto>> messageList = imageDtoList.parallelStream()
                                .map(v -> MessageBuilder.withPayload(v)
                                        .setHeader("KEYS", "ImageDto_" + v.getId())
                                        .build())
                                .collect(Collectors.toList());
                
                        rocketMQTemplate.asyncSend("image-message", messageList, logSendCallbackService);
                    }
                }
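
A sorter like this has to emit one DTO per matching element of the chain. The JDK-only sketch below shows why each element needs its own instance: adding one shared mutable object to a list repeatedly leaves every entry showing the last value written. All names are hypothetical; `Dto` is a stand-in and not the project's `ImageDto`.

```java
import java.util.ArrayList;
import java.util.List;

final class DtoAliasingDemo {
    // Minimal mutable stand-in for a message DTO
    static final class Dto {
        int miraiId;
    }

    // Reuses one instance: every list entry is the same object.
    static List<Dto> shared(int[] ids) {
        List<Dto> out = new ArrayList<>();
        Dto dto = new Dto();
        for (int id : ids) {
            dto.miraiId = id;
            out.add(dto);
        }
        return out;
    }

    // Creates a fresh instance per element, so each entry keeps its own id.
    static List<Dto> fresh(int[] ids) {
        List<Dto> out = new ArrayList<>();
        for (int id : ids) {
            Dto dto = new Dto();
            dto.miraiId = id;
            out.add(dto);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] ids = {1, 2, 3};
        System.out.println(shared(ids).get(0).miraiId); // 3
        System.out.println(fresh(ids).get(0).miraiId);  // 1
    }
}
```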
                

                PlainTextMessageSorter

                      PlainTextMessageSorter listens for the QQ group messages sent from GroupMessageEvent, picks out the Plain Text elements, wraps them and sends them on to the DBConsumer.

                @Component
                @RequiredArgsConstructor
                @RocketMQMessageListener(topic = "group-message", consumerGroup = "plain-text-message")
                public class PlainTextMessageSorter implements RocketMQListener<String> {
                    private final RocketMQTemplate rocketMQTemplate;
                    private final LogSendCallbackService logSendCallbackService;
                
                    @Override
                    public void onMessage(String message) {
                        MessageChainDto messageChainDto = JSON.parseObject(message, MessageChainDto.class);
                        int[] ids = messageChainDto.getIds();
                
                        MessageChain messageChain = MiraiCode.deserializeMiraiCode(messageChainDto.getMiraiCode());
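                        List<PlainTextDto> plainTextDtoList = new ArrayList<>();
                        for (int i = 0; i < messageChain.size(); i++) {
                            SingleMessage singleMessage = messageChain.get(i);
                            if (!(singleMessage instanceof PlainText)) continue;
                
                            // One DTO per text element: reusing a single instance would make every list entry identical
                            PlainTextDto plainTextDto = new PlainTextDto();
                            plainTextDto.setBotAccount(messageChainDto.getBotAccount());
                            plainTextDto.setGroupNumber(messageChainDto.getGroupNumber());
                            // ids may hold fewer entries than the chain has elements, so guard the index
                            Integer miraiId = i < ids.length ? Integer.valueOf(ids[i]) : null;
                            plainTextDtoList.add(box(plainTextDto, (PlainText) singleMessage, miraiId));
                        }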
                
                        if (plainTextDtoList.isEmpty()) return;
                        send(plainTextDtoList);
                    }
                
                    private PlainTextDto box(PlainTextDto plainTextDto, PlainText plainText, Integer miraiId) {
                        plainTextDto.setMiraiId(miraiId);
                        plainTextDto.setText(plainText.getContent());
                        return plainTextDto;
                    }
                
                    @SneakyThrows
                    private void send(List<PlainTextDto> plainTextDto) {
                
                        List<Message<PlainTextDto>> messageList = plainTextDto.parallelStream()
                                .map(v -> MessageBuilder.withPayload(v)
                                        .setHeader("KEYS", "PlainTextDto_" + v.getId())
                                        .build())
                                .collect(Collectors.toList());
                
                        rocketMQTemplate.asyncSend("plain-text-message", messageList, logSendCallbackService);
                    }
                }
                

                UnhandledSendMessageSorter

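                      UnhandledSendMessageSorter handles the Http requests that could not be consumed in time because of Sentinel flow control. It takes each UnhandledHttpRequest, converts it to an UnhandledHttpRequestDto and sends it on to UnhandledPlainTextMessageConsumer.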

                @Component
                @RequiredArgsConstructor
                public class UnhandledSendMessageSorter implements MessageListenerConcurrently {
                    private final RocketMQTemplate rocketMQTemplate;
                    private final LogSendCallbackService logSendCallbackService;
                
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                        List<Message<UnhandledHttpRequestDto>> messageList = messageExtList.parallelStream()
                                .map(v -> JSON.parseObject(new String(v.getBody()), UnhandledHttpRequest.class))
                                .map(this::toUnhandledHttpRequestDto)
                                .filter(Objects::nonNull)
                                .map(v -> MessageBuilder.withPayload(v)
                                        .setHeader("KEYS", "UnhandledHttpRequestDto_" + v.getId())
                                        .build())
                                .collect(Collectors.toList());
                
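                        // Every request in the batch may have been filtered out; skip the send when nothing is left
                        if (!messageList.isEmpty())
                            rocketMQTemplate.asyncSend("unhandled-group-plain-text-message", messageList, logSendCallbackService);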
                
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                
                    private UnhandledHttpRequestDto toUnhandledHttpRequestDto(UnhandledHttpRequest unhandledHttpRequest) {
                        if (!"/send/msg".equals(unhandledHttpRequest.getMethod()) || unhandledHttpRequest.getGroupId() == null)
                            return null;
                        MessageChainBuilder append = new MessageChainBuilder()
                                .append(new PlainText(unhandledHttpRequest.getMsg()))
                                .append(new At(unhandledHttpRequest.getQQ()));
                
                        return new UnhandledHttpRequestDto(unhandledHttpRequest.getId(), MiraiCode.serializeToMiraiCode(append), unhandledHttpRequest.getGroupId());
                
                    }
                
                }
                

                ImageMessageDBConsumer

                      ImageMessageDBConsumer receives the messages from ImageMessageSorter in batches and writes them to the database in batches.

                @Component
                @RequiredArgsConstructor
                public class ImageMessageDBConsumer implements MessageListenerConcurrently {
                    private final TImageService tImageService;
                
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                        List<TImage> imageList = messageExtList.parallelStream()
                                .map(v -> JSON.parseObject(new String(v.getBody()), ImageDto.class))
                                .map(this::toTImage)
                                .collect(Collectors.toList());
                
                        tImageService.saveBatch(imageList)
                                .share()
                                .collectList()
                                .block();
                
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                
                    private TImage toTImage(ImageDto imageDto) {
                        TImage image = TImage.builder().build();
                        BeanUtil.copyProperties(imageDto, image);
                        return image;
                    }
                }
                

                PlainTextMessageDBConsumer

                      PlainTextMessageDBConsumer receives the messages from PlainTextMessageSorter in batches and writes them to the database in batches.

                @Component
                @RequiredArgsConstructor
                public class PlainTextMessageDBConsumer implements MessageListenerConcurrently {
                    private final TPlainTextService tPlainTextService;
                
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                
                        List<TPlainText> plainTextList = messageExtList.parallelStream()
                                .map(v -> JSON.parseObject(new String(v.getBody()), PlainTextDto.class))
                                .map(this::toTPlainText)
                                .collect(Collectors.toList());
                
                        tPlainTextService.saveBatch(plainTextList)
                                .share()
                                .collectList()
                                .block();
                
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                
                    private TPlainText toTPlainText(PlainTextDto plainTextDto) {
                        TPlainText plainText = TPlainText.builder().build();
                        BeanUtil.copyProperties(plainTextDto, plainText);
                        return plainText;
                    }
                }
                

                UnhandledPlainTextMessageConsumer

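                      UnhandledPlainTextMessageConsumer: reading the upstream source shows that consumption only counts as failed when the listener throws an exception, which then triggers redelivery.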

                @Component
                @RequiredArgsConstructor
                @RocketMQMessageListener(topic = "unhandled-group-plain-text-message", consumerGroup = "plain-text-message")
                public class UnhandledPlainTextMessageConsumer implements RocketMQListener<String> {
                    @Resource
                    private ReactiveRedisOperations<String, Long> reactiveRedisOperations;
                
                    @Override
                    @SneakyThrows
                    public void onMessage(String message) {
                        if (Bot.getInstances().isEmpty()) return;
                
                        UnhandledHttpRequestDto unhandledHttpRequestDto = JSON.parseObject(message, UnhandledHttpRequestDto.class);
                
                        Bot bot = Bot.getInstances()
                                .stream()
                                .filter(v -> v.getGroup(unhandledHttpRequestDto.getGroupId()) != null)
                                .findFirst()
                                .orElse(null);
                
                        if (bot == null) throw new RuntimeException("No bot is in the target group");
                
                        String msgId = unhandledHttpRequestDto.getId();
                        // Claim the message ID and set its TTL atomically (SET NX with expiry)
                        Duration ttl = Duration.ofSeconds(5);
                        Boolean nonExist = reactiveRedisOperations.opsForValue()
                                .setIfAbsent(msgId, bot.getId(), ttl)
                                .share()
                                .block();
                
                        // block() may return null, so avoid unboxing it directly
                        if (!Boolean.TRUE.equals(nonExist)) throw new RuntimeException("Duplicate consumption");
                
                        MessageChain messageChain = MiraiCode.deserializeMiraiCode(unhandledHttpRequestDto.getMiraiCode());
                        bot.getGroup(unhandledHttpRequestDto.getGroupId())
                                .sendMessage(messageChain);
                
                        Thread.sleep(500);
                    }
                }
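
                      The redelivery rule noted above (an exception is the only failure signal) can be modelled in a few lines. This is a rough sketch, not RocketMQ's actual implementation: RedeliverySketch, deliver and maxAttempts are hypothetical names, and the real broker also applies delay levels between attempts, which are left out here.

```java
import java.util.function.Consumer;

class RedeliverySketch {

    // Hypothetical model of a delivery loop: the listener receives the same
    // message again until it returns normally or maxAttempts is reached.
    // Returns the number of attempts made, or -1 if every attempt threw.
    static int deliver(String message, Consumer<String> listener, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(message);
                return attempt; // normal return: treated as consumed
            } catch (RuntimeException e) {
                // exception: treated as a failed consumption, so deliver again
            }
        }
        return -1;
    }
}
```

                      Under this model, a listener that returns early without throwing (as onMessage does when no Bot instance is online) counts as a successful consumption, so that message would not be delivered again.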
                

                      That concludes the RocketMQ-related configuration.


                DTO

                      The DTOs used above are listed here.
                      BaseMessageDto

                @Data
                public abstract class BaseMessageDto {
                    private Long id;
                    private Integer miraiId;
                    private Long botAccount;
                    private Long groupNumber;
                    private Long friendNumber;
                }
                

                      ImageDto

                @Data
                @AllArgsConstructor
                @NoArgsConstructor
                @EqualsAndHashCode(callSuper = true)
                public class ImageDto extends BaseMessageDto implements Serializable {
                    private static final long serialVersionUID = 1L;
                
                    private String url;
                    private String path;
                }
                

                      MessageChainDto

                @Data
                @AllArgsConstructor
                @NoArgsConstructor
                @EqualsAndHashCode(callSuper = true)
                public class MessageChainDto extends BaseMessageDto implements Serializable {
                    private static final long serialVersionUID = 1L;
                
                    private int[] ids;
                    private String miraiCode;
                
                }
                

                      PlainTextDto

                @Data
                @AllArgsConstructor
                @NoArgsConstructor
                @EqualsAndHashCode(callSuper = true)
                public class PlainTextDto extends BaseMessageDto implements Serializable {
                    private static final long serialVersionUID = 1L;
                
                    private String text;
                }
                

                      UnhandledHttpRequestDto

                @Data
                @AllArgsConstructor
                @NoArgsConstructor
                public class UnhandledHttpRequestDto implements Serializable {
                    private static final long serialVersionUID = 1L;
                
                    private String id;
                    private String miraiCode;
                    private Long groupId;
                }
                

                Miscellaneous

                      RobotService#handleStoredHttpRequest

                private void handleStoredHttpRequest() {
                    List<Message<UnhandledHttpRequest>> messageList = unhandledHttpRequestService.getAll()
                            .filter(Objects::nonNull)
                            .map(v -> MessageBuilder.withPayload(v).build())
                            .share()
                            .collectList()
                            .block();
                
                    // Nothing stored, nothing to send
                    if (messageList == null || messageList.isEmpty()) return;
                
                    rocketMQTemplate.asyncSend("unhandled-group-message", messageList, logSendCallbackService);
                }
                

                Reference configuration

                      MybatisGeneratorTest

                @Slf4j
                class MybatisGeneratorTest {
                
                    @Test
                    void mybatisPlusGenerator() {
                        FastAutoGenerator.create(getDataSourceConfig())
                                // Global config
                                .globalConfig(getGlobalConfig())
                                // Package config
                                .packageConfig(getPackageConfig())
                                // Strategy config
                                .strategyConfig(getStrategyConfig())
                                .execute();
                    }
                
                    private DataSourceConfig.Builder getDataSourceConfig() {
                        return new DataSourceConfig.Builder("jdbc:mysql://localhost:3306/test?useUnicode=true" +
                                "&useSSL=false&autoReconnect=true&characterEncoding=utf-8&serverTimezone=GMT%2B8" +
                                "&rewriteBatchedStatements=true", "root", "root")
                                .typeConvert(new MySqlTypeConvert())
                                .keyWordsHandler(new MySqlKeyWordsHandler());
                    }
                
                    private Consumer<StrategyConfig.Builder> getStrategyConfig() {
                        return builder -> builder
                                // Add table names
                                .addInclude(
                                        "t_image",
                                        "t_plain_text")
                
                                .entityBuilder()
                                .disableSerialVersionUID()
                                .enableChainModel()
                                .enableLombok()
                                .enableRemoveIsPrefix()
                                .enableTableFieldAnnotation()
                                .enableActiveRecord()
                                // Optimistic lock
                                .versionColumnName("version")
                                .versionPropertyName("version")
                                // Logical delete
                                .logicDeleteColumnName("enable")
                                .logicDeletePropertyName("enable")
                                .naming(NamingStrategy.underline_to_camel)
                                .columnNaming(NamingStrategy.underline_to_camel)
                                .addTableFills(new Column("create_time", FieldFill.INSERT))
                                .addTableFills(new Column("update_time", FieldFill.INSERT_UPDATE))
                                .idType(IdType.AUTO)
                
                                .serviceBuilder()
                                .formatServiceFileName("%sService")
                                .formatServiceImplFileName("%sServiceImp")
                
                                .mapperBuilder()
                                .formatMapperFileName("%sDAO");
                    }
                
                    private BiConsumer<Function<String, String>, PackageConfig.Builder> getPackageConfig() {
                
                        return (scanner, builder) -> builder.parent("org.test")
                                .moduleName("test")
                                .entity("entity")
                                .service("service")
                                .serviceImpl("service.impl")
                                .mapper("dao")
                                .controller("controller")
                                .other("other");
                    }
                
                    private BiConsumer<Function<String, String>, GlobalConfig.Builder> getGlobalConfig() {
                
                        return (scanner, builder) -> builder.author("yourName")
                                .disableOpenDir()
                                .dateType(DateType.ONLY_DATE)
                                .outputDir(System.getProperty("user.dir") + "/src/main/java")
                                // Swagger 2 API doc annotations; also works with Knife4j
                //                .enableSwagger()
                                .fileOverride();
                    }
                }
                
                • wssy001
                  wssy001 ⭐2021⭐ @840670339

                  @840670339 The project is now public; the link is at the top of the post. The code is for reference only.
