Mushroom Notes Mushroom Notes
🍄首页
  • JavaSE

    • 基础篇
    • 数据结构
    • IO流
    • Stream流
    • 函数式接口
    • JUC
    • 反射
    • 网络编程
    • 设计模式
  • JavaEE

    • Servlet
    • JDBC
    • 会话技术
    • 过滤器监听器
    • 三层架构
  • JDK

    • 总览
  • JVM

    • 总览
  • 常用mate
  • CSS
  • JavaScript
  • rds 数据库

    • MySQL
    • MySQL 进阶
    • MySQL 库表规范
  • nosql 数据库

    • Redis
    • Redis 进阶
    • Redis 底层
    • MongoDB
  • Spring生态

    • Spring
    • Spring MVC
    • Spring boot
    • Spring Validation
  • Spring Cloud生态

    • Spring Cloud
    • 服务治理
    • 远程调用
    • 网关路由
    • 服务保护
    • 分布式事务
    • 消息中间件
  • 数据库

    • Mybatis
    • Mybatis Plus
    • Elasticsearch
    • Redisson
  • 通信

    • Netty
📚技术
  • 方案专题
  • 算法专题
  • BUG专题
  • 安装专题
  • 网安专题
  • 面试专题
  • 常用网站
  • 后端常用
  • 前端常用
  • 分类
  • 标签
  • 归档

kinoko

一位兴趣使然的热心码农
🍄首页
  • JavaSE

    • 基础篇
    • 数据结构
    • IO流
    • Stream流
    • 函数式接口
    • JUC
    • 反射
    • 网络编程
    • 设计模式
  • JavaEE

    • Servlet
    • JDBC
    • 会话技术
    • 过滤器监听器
    • 三层架构
  • JDK

    • 总览
  • JVM

    • 总览
  • 常用mate
  • CSS
  • JavaScript
  • rds 数据库

    • MySQL
    • MySQL 进阶
    • MySQL 库表规范
  • nosql 数据库

    • Redis
    • Redis 进阶
    • Redis 底层
    • MongoDB
  • Spring生态

    • Spring
    • Spring MVC
    • Spring boot
    • Spring Validation
  • Spring Cloud生态

    • Spring Cloud
    • 服务治理
    • 远程调用
    • 网关路由
    • 服务保护
    • 分布式事务
    • 消息中间件
  • 数据库

    • Mybatis
    • Mybatis Plus
    • Elasticsearch
    • Redisson
  • 通信

    • Netty
📚技术
  • 方案专题
  • 算法专题
  • BUG专题
  • 安装专题
  • 网安专题
  • 面试专题
  • 常用网站
  • 后端常用
  • 前端常用
  • 分类
  • 标签
  • 归档
  • 方案专题

    • 认证鉴权
    • 接口规范
    • 自动分发架构
      • 高性能计数服务
      • 消息未读数服务
      • 缓存数据库双写一致性问题
      • 优雅的后台操作日志
      • 签到功能
      • 秒杀库存扣减
    • 算法专题

    • BUG专题

    • 安装专题

    • 网安专题

    • 面试专题

    • 专题
    • 方案专题
    kinoko
    2023-12-19
    目录

    自动分发架构方案

    前言: 架构基于策略模式,适用于模型/事件/消息的分发,提供一种实现方案或思路
    场景: 系统在做数据埋点,记录各种用户行为,存在大量的insert请求
    问题: 高并发情况下会占用数据库连接数
    解决: 通过缓冲池收集请求,再批次入库

    # 架构设计

    image.png

    # 组件

    • 事件处理类加载器(处理类加载器)
    • 事件分发处理器(分发处理器)
    • 事件接收者

    # 处理类加载器


    **作用:**扫描上下文中被@EventTopic标注的,并实现了EventHandler接口的处理类,并装载进map中,便于后续分发处理器调用

    /**
     * 事件主题注解
     * 用法:加在对应的事件处理类上并指定事件类型
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface EventTopic {
    
        /**
         * 事件主题类型
         */
        EventTopicEnum value();
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public enum EventTopicEnum implements BaseEnum {
    
        LOGIN_LOGOUT(1, "登录登出事件"),
        QC_READ(2, "读图事件"),
        ;
    
        private final int type;
    
        private final String typeName;
    
        EventTopicEnum(int type, String title) {
            this.type = type;
            this.typeName = title;
        }
    
        @Override
        public int getType() {
            return type;
        }
    
        @Override
        public String getTypeName() {
            return typeName;
        }
    
        public static EventTopicEnum getType(int type) {
            for (EventTopicEnum v : values()) {
                if (v.getType() == type) {
                    return v;
                }
            }
            throw new IllegalArgumentException();
        }
    
        public static boolean isNotExist(int type) {
            for (EventTopicEnum v : values()) {
                if (v.getType() == type) {
                    return false;
                }
            }
            return true;
        }
    
        public static List<EventTopicEnum> getList() {
            return Arrays.asList(values());
    }
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    /**
     * 事件存储(消费者)
     */
    public interface EventHandler {
    
        /**
         * 事件存储(消费)
         * @param events 事件
         */
        void eventHandle(List<EventTracking> events);
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import com.google.common.collect.Maps;
    import com.ydhl.hxpx.api.event.inf.EventHandler;
    import com.ydhl.hxpx.api.event.model.EventTopicEnum;
    import com.ydhl.hxpx.api.event.model.EventTopic;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.annotation.AnnotationUtils;
    
    import java.util.Map;
    
    /**
     * 事件处理类加载器
     */
    @Slf4j
    @Configuration
    public class EventHandlerClassLoader implements ApplicationContextAware {
    
        private final Map<Integer, EventHandler> eventHandlerMap = Maps.newHashMap();
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            log.info("[事件处理类加载器]正在加载事件处理类...");
            // 从上下文中获取带有@Topic注解的事件处理类
            Map<String, Object> topicAnnotatedBeans = applicationContext.getBeansWithAnnotation(EventTopic.class);
            // 放入Map中,以Topic的事件类型为key,处理类为value
            for (Object handler : topicAnnotatedBeans.values()) {
                // 判断是否实现了EventHandler接口
                if (!EventHandler.class.isAssignableFrom(handler.getClass())) {
                    continue;
                }
                // 获取类上的主题事件类型
                EventTopic topicAnnotation = AnnotationUtils.findAnnotation(handler.getClass(), EventTopic.class);
                if (topicAnnotation != null) {
                    EventTopicEnum eventType = topicAnnotation.value();
                    // 放入加载器map
                    eventHandlerMap.put(eventType.getType(), (EventHandler) handler);
                }
            }
        }
    
        @Bean
        public Map<Integer, EventHandler> getEventHandlerMap() {
            log.info("[事件处理类加载器]已扫描的事件处理器:{}", eventHandlerMap);
            return eventHandlerMap;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50

    处理类标注示例:

    @EventTopic(value = EventTopicEnum.QC_READ)
    @Service
    @Slf4j
    public class ReadRecordLogServiceImpl implements ReadRecordLogService, EventHandler {
    }
    
    1
    2
    3
    4
    5

    # 分发处理器


    **作用:**从缓冲池(redis)中获取事件,根据事件类型分发给对应的处理器

    /**
     * 事件跟踪堆栈实体
     */
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    @Data
    public class EventTracking {
    
        // 用户id
        private Long userId;
        // 时间戳
        private Long timestamp;
        // 事件主题
        private Integer eventTopic;
        // 事件参数
        private String eventParams;
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    /**
     * 事件分发处理器
     */
    @Slf4j
    @Component
    public class EventDistributeHandler {
    
        @Resource
        private StringRedisTemplate stringRedisTemplate;
        @Resource
        private Map<Integer, EventHandler> eventHandlerMap;
    
        /**
         * 监听事件并分发
         */
        public void listenAndDistributeEvent() {
            // 加锁(5分钟)
            Boolean isUnLock = stringRedisTemplate.opsForValue().setIfAbsent(RedisKeyCacheConstant.EVENT_DISTRIBUTE_LOCK, "1", 5, TimeUnit.MINUTES);
            if (Boolean.TRUE.equals(isUnLock)) {
                try {
                    // 从缓存的事件缓冲队列中拉取事件
                    List<String> eventJsonList = stringRedisTemplate.opsForList().range(RedisKeyCacheConstant.EVENT_TRACK_QUEUE, 0, -1);
                    if (CollectionUtils.isNotEmpty(eventJsonList)) {
                        log.info("[事件分发处理器] 本次分发事件总数:{}", eventJsonList.size());
                        // JSON转事件实体,并根据事件类型分组,用于后续事件派发
                        Map<Integer, List<EventTracking>> eventTrackingMap = eventJsonList.stream()
                                .map(str -> {
                                    try {
                                        return JSON.parseObject(str, EventTracking.class);
                                    } catch (Exception e) {
                                        log.error("[事件分发处理器] 事件JSON格式化捕获异常:", e);
                                        return null;
                                    }
                                }).filter(Objects::nonNull)
                                .filter(eventTrackingReq -> {
                                    if (eventTrackingReq.getEventTopic() == null || StringUtils.isBlank(eventTrackingReq.getEventParams())) {
                                        log.error("[事件分发处理器] 事件格式错误:{}", eventTrackingReq);
                                        return false;
                                    }
                                    return true;
                                }).collect(Collectors.groupingBy(EventTracking::getEventTopic));
                        // 事件分发
                        try {
                            // 分发处理事件(异步提高消费效率)
                            CompletableFutureUtil.runAsyncForEach(eventTrackingMap.entrySet(), entry -> {
                                try {
                                    Integer eventType = entry.getKey();
                                    List<EventTracking> eventQueue = entry.getValue();
                                    // 从事件处理类加载器中获取加载的处理类
                                    EventHandler handler = eventHandlerMap.get(eventType);
                                    if (handler != null) {
                                        // 消费事件
                                        handler.eventHandle(eventQueue);
                                    }
                                } catch (Exception e) {
                                    log.error("[事件分发处理器] 异步消费事件捕获异常:", e);
                                }
                            });
                        } catch (Exception e) {
                            log.error("[事件分发处理器] 异步消费事件捕获异常:", e);
                        }
                        // 删除事件(事件是往尾部push的,所以删除开头到本次获取长度-1的索引范围)
                        stringRedisTemplate.opsForList().trim(RedisKeyCacheConstant.EVENT_TRACK_QUEUE, eventJsonList.size(), -1);
                        log.info("[事件分发处理器] 分发完毕");
                    }
                } finally {
                    // 移除锁
                    stringRedisTemplate.delete(RedisKeyCacheConstant.EVENT_DISTRIBUTE_LOCK);
                }
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72

    # 事件接受者


    **作用:**接收埋点事件请求,转换为统一事件格式放入缓冲池

    /**
     * 事件接收者(生产者)
     */
    @Component
    public class EventReceiver {
    
        @Resource
        private StringRedisTemplate stringRedisTemplate;
    
        /**
         * 事件接收
         * @param eventType     事件类型
         * @param userId        用户id
         * @param eventParams   事件参数
         */
        public void receive(EventTopicEnum eventType, Long userId, Object eventParams) {
            EventTracking event = EventTracking.builder()
                    .timestamp(System.currentTimeMillis())
                    .eventTopic(eventType.getType())
                    .userId(userId)
                    .eventParams(JSON.toJSONString(eventParams))
                    .build();
            receive(event);
        }
    
        /**
         * 事件接收
         * @param event 事件
         */
        public void receive(EventTracking event) {
            // 往队列中push元素
            stringRedisTemplate.opsForList().rightPush(RedisKeyCacheConstant.EVENT_TRACK_QUEUE, JSON.toJSONString(event));
        }
    
    }	
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    #消息分发#自动订阅
    上次更新: 2023/12/29 11:32:56
    接口规范
    高性能计数服务

    ← 接口规范 高性能计数服务→

    最近更新
    01
    JVM 底层
    09-13
    02
    JVM 理论
    09-13
    03
    JVM 应用
    09-13
    更多文章>
    Theme by Vdoing | Copyright © 2022-2024 kinoko | MIT License | 粤ICP备2024165634号
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式