自动分发架构方案
前言: 架构基于策略模式,适用于模型/事件/消息的分发,提供一种实现方案或思路
场景: 系统在做数据埋点,记录各种用户行为,存在大量的insert请求
问题: 高并发情况下会占用数据库连接数
解决: 通过缓冲池收集请求,再批次入库
# 架构设计

# 组件
- 事件处理类加载器(处理类加载器)
- 事件分发处理器(分发处理器)
- 事件接收者
# 处理类加载器
**作用:**扫描上下文中被@EventTopic标注的,并实现了EventHandler接口的处理类,并装载进map中,便于后续分发处理器调用
/**
* 事件主题注解
* 用法:加在对应的事件处理类上并指定事件类型
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface EventTopic {
/**
* 事件主题类型
*/
EventTopicEnum value();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
public enum EventTopicEnum implements BaseEnum {
LOGIN_LOGOUT(1, "登录登出事件"),
QC_READ(2, "读图事件"),
;
private final int type;
private final String typeName;
EventTopicEnum(int type, String title) {
this.type = type;
this.typeName = title;
}
@Override
public int getType() {
return type;
}
@Override
public String getTypeName() {
return typeName;
}
public static EventTopicEnum getType(int type) {
for (EventTopicEnum v : values()) {
if (v.getType() == type) {
return v;
}
}
throw new IllegalArgumentException();
}
public static boolean isNotExist(int type) {
for (EventTopicEnum v : values()) {
if (v.getType() == type) {
return false;
}
}
return true;
}
public static List<EventTopicEnum> getList() {
return Arrays.asList(values());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 事件存储(消费者)
*/
public interface EventHandler {
/**
* 事件存储(消费)
* @param events 事件
*/
void eventHandle(List<EventTracking> events);
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
import com.google.common.collect.Maps;
import com.ydhl.hxpx.api.event.inf.EventHandler;
import com.ydhl.hxpx.api.event.model.EventTopicEnum;
import com.ydhl.hxpx.api.event.model.EventTopic;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.AnnotationUtils;
import java.util.Map;
/**
* 事件处理类加载器
*/
@Slf4j
@Configuration
public class EventHandlerClassLoader implements ApplicationContextAware {
private final Map<Integer, EventHandler> eventHandlerMap = Maps.newHashMap();
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
log.info("[事件处理类加载器]正在加载事件处理类...");
// 从上下文中获取带有@Topic注解的事件处理类
Map<String, Object> topicAnnotatedBeans = applicationContext.getBeansWithAnnotation(EventTopic.class);
// 放入Map中,以Topic的事件类型为key,处理类为value
for (Object handler : topicAnnotatedBeans.values()) {
// 判断是否实现了EventHandler接口
if (!EventHandler.class.isAssignableFrom(handler.getClass())) {
continue;
}
// 获取类上的主题事件类型
EventTopic topicAnnotation = AnnotationUtils.findAnnotation(handler.getClass(), EventTopic.class);
if (topicAnnotation != null) {
EventTopicEnum eventType = topicAnnotation.value();
// 放入加载器map
eventHandlerMap.put(eventType.getType(), (EventHandler) handler);
}
}
}
@Bean
public Map<Integer, EventHandler> getEventHandlerMap() {
log.info("[事件处理类加载器]已扫描的事件处理器:{}", eventHandlerMap);
return eventHandlerMap;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
处理类标注示例:
@EventTopic(value = EventTopicEnum.QC_READ)
@Service
@Slf4j
public class ReadRecordLogServiceImpl implements ReadRecordLogService, EventHandler {
}
1
2
3
4
5
2
3
4
5
# 分发处理器
**作用:**从缓冲池(redis)中获取事件,根据事件类型分发给对应的处理器
/**
* 事件跟踪堆栈实体
*/
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class EventTracking {
// 用户id
private Long userId;
// 时间戳
private Long timestamp;
// 事件主题
private Integer eventTopic;
// 事件参数
private String eventParams;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 事件分发处理器
*/
@Slf4j
@Component
public class EventDistributeHandler {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private Map<Integer, EventHandler> eventHandlerMap;
/**
* 监听事件并分发
*/
public void listenAndDistributeEvent() {
// 加锁(5分钟)
Boolean isUnLock = stringRedisTemplate.opsForValue().setIfAbsent(RedisKeyCacheConstant.EVENT_DISTRIBUTE_LOCK, "1", 5, TimeUnit.MINUTES);
if (Boolean.TRUE.equals(isUnLock)) {
try {
// 从缓存的事件缓冲队列中拉取事件
List<String> eventJsonList = stringRedisTemplate.opsForList().range(RedisKeyCacheConstant.EVENT_TRACK_QUEUE, 0, -1);
if (CollectionUtils.isNotEmpty(eventJsonList)) {
log.info("[事件分发处理器] 本次分发事件总数:{}", eventJsonList.size());
// JSON转事件实体,并根据事件类型分组,用于后续事件派发
Map<Integer, List<EventTracking>> eventTrackingMap = eventJsonList.stream()
.map(str -> {
try {
return JSON.parseObject(str, EventTracking.class);
} catch (Exception e) {
log.error("[事件分发处理器] 事件JSON格式化捕获异常:", e);
return null;
}
}).filter(Objects::nonNull)
.filter(eventTrackingReq -> {
if (eventTrackingReq.getEventTopic() == null || StringUtils.isBlank(eventTrackingReq.getEventParams())) {
log.error("[事件分发处理器] 事件格式错误:{}", eventTrackingReq);
return false;
}
return true;
}).collect(Collectors.groupingBy(EventTracking::getEventTopic));
// 事件分发
try {
// 分发处理事件(异步提高消费效率)
CompletableFutureUtil.runAsyncForEach(eventTrackingMap.entrySet(), entry -> {
try {
Integer eventType = entry.getKey();
List<EventTracking> eventQueue = entry.getValue();
// 从事件处理类加载器中获取加载的处理类
EventHandler handler = eventHandlerMap.get(eventType);
if (handler != null) {
// 消费事件
handler.eventHandle(eventQueue);
}
} catch (Exception e) {
log.error("[事件分发处理器] 异步消费事件捕获异常:", e);
}
});
} catch (Exception e) {
log.error("[事件分发处理器] 异步消费事件捕获异常:", e);
}
// 删除事件(事件是往尾部push的,所以删除开头到本次获取长度-1的索引范围)
stringRedisTemplate.opsForList().trim(RedisKeyCacheConstant.EVENT_TRACK_QUEUE, eventJsonList.size(), -1);
log.info("[事件分发处理器] 分发完毕");
}
} finally {
// 移除锁
stringRedisTemplate.delete(RedisKeyCacheConstant.EVENT_DISTRIBUTE_LOCK);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# 事件接受者
**作用:**接收埋点事件请求,转换为统一事件格式放入缓冲池
/**
* 事件接收者(生产者)
*/
@Component
public class EventReceiver {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 事件接收
* @param eventType 事件类型
* @param userId 用户id
* @param eventParams 事件参数
*/
public void receive(EventTopicEnum eventType, Long userId, Object eventParams) {
EventTracking event = EventTracking.builder()
.timestamp(System.currentTimeMillis())
.eventTopic(eventType.getType())
.userId(userId)
.eventParams(JSON.toJSONString(eventParams))
.build();
receive(event);
}
/**
* 事件接收
* @param event 事件
*/
public void receive(EventTracking event) {
// 往队列中push元素
stringRedisTemplate.opsForList().rightPush(RedisKeyCacheConstant.EVENT_TRACK_QUEUE, JSON.toJSONString(event));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
上次更新: 2023/12/29 11:32:56