艾迪的技术之路

记录博客与成长

1、背景

我们是做物联网设备的公司,设备初始化会向设备发送API接口下发agora license,当时当时一个环境的全部agora设备都下发出现了问题,具体表现为:

  1. 当前的线程池告警持续触发拒绝策略(调度线程处理任务),导致运维要求我们进行排查
  2. 线程池资源告警则增加资源,环境中agora微服务已经增加到了6台,但是仍然线程的队列容量稳步上升,增加资源并不能解决问题
  3. 因为任务队列容量较多,导致大量agora设备下发agora都是在初始化的5-6h以后了,涉及设备面广,影响较大

2、排查方式

  1. 使用 jstack 1查看当前的线程池开头的线程状态与堆栈
  2. 发现全部的线程池的线程状态都是WATING状态,查看堆栈,都是在等待Redisson的锁释放
  3. 查看了下对应线程池设计的流程,发现在下发license时确实会tongguolock方法加锁
  4. 查看 腾讯云 日志,发现同一台设备在日志中频繁向设备下发agora license,10s一次,正常流程应当至少2min才初始化下发一次

经过逻辑分析后发现原因:

多个设备出故障频繁注册+redisson.lock() 强制等待锁释放造成线程池资源耗尽

  1. 以设备20252025为例,当下发设备license时,第一次会创建一个20252025的锁,但是这个设备也有其他在线程池的进程进行下发,因为使用的是lock,是无限期等待锁,所以线程池资源来利用不起来,导致触发拒绝策略

    redisson创建分布式锁的lua脚本是

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    -- Lua 脚本(原子执行)
    if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
    end;
    if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
    end;
    return redis.call('pttl', KEYS[1]);

    可以看到value值(Value)的结构:
    锁的值存储的是一个字符串(String),内容为锁
    持有者的唯一标识符和可重入计数,格式为:
    ::
    ·UUID:Redisson客户端实例的唯一标识符(每个客户端启动时生成)
    ·ThreadID:持有锁的线程ID(支持可重入锁 同一线程多次加锁)
    ·Count:锁的可重入次数(初始为1,每次重入加1,释放时减1)

    uuid为啥不变 估摸着是服务pod没被杀 自己拉起来时候 这个客户端id不变导致的

  2. 在某一次的k8s滚动更新时,可能因为未在对应时间内执行完线程池任务(如最后30s等待关闭时间内,虽然没有新任务进,但是进程1在最后1s释放了锁,进程2在此时拿到了锁),导致finally代码块的解锁逻辑并未执行,导致该锁一直被占用,新pod启动后又给redLock续期导致该锁一直没有被释放

因为这两个原因导致线上出现问题

3、解决方式

  1. 不能使用lock方法而是tryLock,等待一段时间后释放掉锁;并依据业务重要程度业务补救(如本次获取不到锁,很可能是因为当前有正在下发的任务,顾不需要延时任务或者持久化这种补救代码逻辑)
  2. 推广一下,其他组件如ReentrantLock也有lock和trylock方法,在开发时也尽量使用tryLock

扩展阅读

  1. Java程序员必备:jstack命令解析

前置:

  1. 使用spring项目进行开发
  2. 使用线程池进行项目开发
  3. 有dynamic-tp或者Hippo4J或者其他有以下告警检测的工具对线程池进行监控
  4. (最好)有运维团队或者运维方面配置

任务:

  1. 触发告警后怎么修改线程池参数和线程池告警参数(重要)
  2. 有哪些需要的线程池告警
  3. 线程池告警阈值和线程池参数怎么设置
  4. 触发告警后怎么排查
  5. 怎么根据业务增长重新修改资源与线程池参数

常见线程池告警类型如下,本文也从以下几个方面进行展示:

image-20250712122615965

1、触发告警后怎么修改线程池参数和线程池告警参数

先说结论:

根据线程池告警修改参数导图:

image-20250712122624182

1.1、线程活跃度告警

释义:任务过多,使得线程池中活跃线程占比超过阈值(如99%)

如果核心线程数>最大线程数*80%:该告警基本上无用,可以不使用

如果核心线程数占最大线程数较小:该告警有一定用处,但是不如其他线程池告警

1.2、队列容量告警

释义:任务过多,达到任务队列容量的阈值(如70%)

  1. 如果是任务逐渐处理不过来:

    增加pod,增加核心线程数最大线程数,增加任务队列容量

    如果是突发任务:

    增加任务队列大小容量

1.3、拒绝策略告警

释义:任务过多,线程数量达到最大,且任务队列已满,触发线程池拒绝策略

  1. (线程池告警)降低队列容量告警阈值

    增加 任务排队超时告警、任务执行超时告警

  2. 如果是任务逐渐处理不过来:

    增加pod,增加核心线程数最大线程数,增加任务队列容量

    如果是突发任务:

    增加任务队列大小容量

  3. 优化拒绝策略,尝试自定义拒绝策略(从Netty、Dubbo等框架中学习非JDK8的拒绝策略)

1.4、任务排队超时告警

释义:任务从任务队列到线程池中时,时间过长

优化点:

  1. 增加核心线程数与最大线程数(如原有数量*2)

  2. (其他)单个任务逻辑,优化可以从异步,并发等方面入手

    增加pod数量

1.5、任务执行超时告警

释义:执行期间单个任务直接放入线程中执行,但是整个执行期间过长。

优化点有以下:

  1. (线程池告警)该线程池的任务队列告警阈值需要降低阈值优化,如果是核心业务,则需要减少降低限度(如5%),直至可以在运维发现问题后,有足够时间让开发排查;如果不是核心业务,则可以随便修改,甚至不修改

  2. (线程池参数)增加任务队列容量,如果遇到任务激增,则可以尽量避免触发拒绝策略

    线程存活时间可以增加(1000~10000之间是业务上比较合理的数值),避免比核心线程数多的对应数量线程

    增加核心线程数与最大线程数(如原有数量*2)

  3. (其他)单个任务逻辑,优化可以从异步,并发等方面入手

    启动时预热全部核心线程数量,避免创建线程导致任务消耗时间增加

    增加pod数量

2、有哪些需要的线程池告警

调参通知、队列容量告警、拒绝策略告警必然是需要的
核心微服务:
必须加上 调参通知、队列容量告警、拒绝策略告警
如果是对于设备时效性要求较高=>加上 任务排队超时告警、任务执行超时告警
如果是长任务=>加上 任务执行超时告警、线程池活跃度告警

非核心为服务:
一般只需要 调参通知、队列容量告警、拒绝策略告警,后续根据业务需要增加

告警阈值怎么设定?

  1. 可以优先按照Dynamic-TP的默认阈值
  2. 触发频率可以先按照Dynamic-TP的默认阈值
  3. 可以依据部门要求加上其他要求,后续根据线上实践及时修改
  4. 如果需要告警触发频率限流则需要开发同事新增代码实现

2、线程池告警阈值和线程池参数怎么设置

线程池有以下几种形式:

  1. 线程池为单个spring项目公用、或者是全部服务只有2个线程池,CPU密集型线程池与IO密集型线程池
  2. 线程池根据不同业务分为多个线程池,且父线程与子线程在不同线程池中执行

推荐使用方式2

优先基于一些理论得到合适一些的线程池参数

image-20250712122633081

核心微服务:

基于计算得到的理论线程池参数根据Jmeter进行压侧微调,直至获得最合适的线程池参数

以上是理论上的线程池参数,为了可以更好地使用线程池,还需要根据压力测试对线程池参数进行微调,直至可以获得能最大限度利用资源的。
压测-压测报告对比-调参-再压测。。。重复循环,直到获取到最优的线程池参数配置。

非核心微服务:

不如直接根据理论值配好上线,如果有压力可以立刻增加资源、修改线程池参数。
可有可无的微服务,要避免浪费资源,也需要线程池监控查看配置,此时线程池情况可以做一个较好的缩容的指标。

4、触发告警后怎么进行排查

首先需要明确一下几点:

  1. 收到告警时,需要先看告警是否配置正常
  2. 偶发告警基本上不需要关注,只有遇到持续性告警才需要介入处理

排查思路大概如下:

image-20250712122642995

5、怎么根据业务增长重新修改资源与线程池参数

需要关注的点:

  1. 业务增长怎么转换成可以量化的“需要新增的资源量”

  2. 业务增长后,可以依据原有微调后的参数等比例修改,后再进行压侧/根据实际业务微调参数
    核心微服务可以根据压测修改,非核心业务可以根据CPU核数重新计算。

  3. 业务增长后,监控项是否需要增减,是否需要改阈值
    监控项需要根据业务特性是否发生变更做更改;

阈值由于是一个百分比,可以不变,后续再根据业务预警做修改。

6、推荐阅读

  1. Java线程池实现原理及其在美团业务中的实践
  2. 线程数突增!领导说再这么写就gc掉我
  3. 我决定蹭一下“线程数突增…”这篇文章的热度。

前言

之前我有写过一篇文章,讲述的是怎么使用Dynamic-TP 做一些节省开发的一些问题,但是我感觉不能就直接这么算了,我还想看一下这个框架的一些内部逻辑,避免鸽太久自己以及忘了这回事情或者是还要花比较大的学习成本重新看一下源码,所以我打算花点时间狠狠的看下源码,整理一下自己认为有用的信息,毕竟要知其然更要知其所以然嘛~

我阅读的Dtp源码是1.1.9

之前使用Dynamic-TP 主要是为了使用它的以下功能:

  1. 动态调参
  2. 实时监控
  3. 通知告警
  4. 告警拓展
  5. 框架自带线程池
  6. 线程池注册
  7. 其他

所以我也打算从以下几个方面看下相关的一些源码

img

Dtp总览

1、动态调参

虽然动态调参JDK1.8就自带的,但是实际上的Dynamic-TP动态调参实际上有一定的修改的

据我观察Dynamic-TP使用的是这组代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static void doRefresh(ExecutorWrapper executorWrapper, DtpExecutorProps props) {
ExecutorAdapter<?> executor = executorWrapper.getExecutor();
doRefreshPoolSize(executor, props);
if (!Objects.equals(executor.getKeepAliveTime(props.getUnit()), props.getKeepAliveTime())) {
executor.setKeepAliveTime(props.getKeepAliveTime(), props.getUnit());
}
if (!Objects.equals(executor.allowsCoreThreadTimeOut(), props.isAllowCoreThreadTimeOut())) {
executor.allowCoreThreadTimeOut(props.isAllowCoreThreadTimeOut());
}
// update queue
updateQueueProps(executor, props);

if (executorWrapper.isDtpExecutor()) {
doRefreshDtp(executorWrapper, props);
return;
}
doRefreshCommon(executorWrapper, props);
}
  1. doRefreshPoolSize()方法是修改核心线程数和最大线程数,期间涉及到核心线程是是要不如最大线程数大的,不然会报问题,最终调用JDK的修改参数的方法(不过实际上ThreadPoolExecutor本身就是支持校验核心线程数和最大线程数之间比较的)
  2. Objects.equals(executor.getKeepAliveTime(props.getUnit()), props.getKeepAliveTime()))是由于配置文件里面默认是TimeUnit是秒,所以此处的比较还是根据秒做比较,当然Dynamic-TP自己有自己的线程池

由于是修改的非核心线程的最大保活信息,底层修改的时候,会使用LockSupport、CAS方式关掉线程池(JBoss),或者是通过ReentrantLock遍历关闭对应超过最大时间的线程(Tomcat)

  1. 修改queue的时候,如果是内存安全线程则会修改最大使用内存,如果不相等才会修改任务队列;此外会修改 [OrderedDtpExecutor] 的信息

  2. 方法doRefreshCommon修改了拒绝策略等信息,并且如果有配置事件

  3. AbstractDtpAdapter类监听了配置更新的事件,只有配置更新的时候才会调用以上的方法。出发方式的话,如果是nacos的话,则会通过某种方式实现的发布事件让Java程序监听。

    该方式在CloudNacosRefresher中

    1
    2
    3
    4
    5
    6
    @Override
    public void onApplicationEvent(@NonNull ApplicationEvent event) {
    if (needRefresh(((EnvironmentChangeEvent) event).getKeys())) {
    refresh(environment);
    }
    }

2、实时监控

实时监控是通过DtpMonitor类实现的,此处实际上就是通过Spring的事件机制进行事件监控与触发。DtpMonitor自带一个定时任务线程池,最大线程数只有一个。DtpMonitor继承了OnceApplicationContextEventListener抽象类,这个类实现了以下方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (isOriginalEventSource(event) && event instanceof ApplicationContextEvent) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextStartedEvent) {
onContextStartedEvent((ContextStartedEvent) event);
} else if (event instanceof ContextStoppedEvent) {
onContextStoppedEvent((ContextStoppedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
}

onContextRefreshedEvent、onContextStartedEvent、onContextStoppedEvent、onContextClosedEvent是Dynamic-TP需要的事件,不过DtpMonitor只实现了onContextRefreshedEvent方法

  1. onContextRefreshedEvent会起一个0秒后启动的定时任务,这个任务会给全部的注册在DtpRegistry的线程池发告警消息,AlarmManager会使用自己对象中的线程池ALARM_EXECUTOR进行发布告警消息,告警消息我看只有查看线程活度和任务队列容量,遍历进行告警。
  2. 此处使用了spring的事件机制,这个使用来说就是现在存在事件和事件监听器,只要逐注册一个事件监听器,实现 ApplicationListener接口,接口泛型是 你想监听的类。

只要ApplicationContext发布一个你想监听的类的对象,对应想监听这个类的监听器就能监听到,做对应的事情,不过范围只是Java程序内的,做不到分布式的。Dynamic-tp中OnceApplicationContextEventListener就是我们需要监听的事件

  1. 言归正传,实际上就是起一个任务分别发布告警和监控的事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void run() {
Set<String> executorNames = DtpRegistry.getAllExecutorNames();
checkAlarm(executorNames);
collectMetrics(executorNames);
}
private void checkAlarm(Set<String> executorNames) {
executorNames.forEach(name -> {
ExecutorWrapper wrapper = DtpRegistry.getExecutorWrapper(name);
AlarmManager.tryAlarmAsync(wrapper, SCHEDULE_NOTIFY_ITEMS);
});
publishAlarmCheckEvent();
}
private void collectMetrics(Set<String> executorNames) {
if (!dtpProperties.isEnabledCollect()) {
return;
}
executorNames.forEach(x -> {
ExecutorWrapper wrapper = DtpRegistry.getExecutorWrapper(x);
doCollect(ExecutorConverter.toMetrics(wrapper));
});
publishCollectEvent();
}
  1. 我们分别查看,发出的监控和告警都会到达DtpAdapterListener类,该类会在以下方法中监听对应事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void onApplicationEvent(@NonNull ApplicationEvent event) {
try {
if (event instanceof RefreshEvent) {
doRefresh(((RefreshEvent) event).getDtpProperties());
} else if (event instanceof CollectEvent) {
doCollect(((CollectEvent) event).getDtpProperties());
} else if (event instanceof AlarmCheckEvent) {
doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());
}
} catch (Exception e) {
log.error("DynamicTp adapter, event handle failed.", e);
}
}

处理时会先看是否有DtpAdapter的实现类对于对应的线程池,发送对应的告警,注意DtpAdapter只会对告警做发送,筛选什么的还是在alarm模块里面做。

数据采集会有一个专门的类CollectorHandler用于处理数据采集,这个数据采集也是调用一个类,遍历collectorTypeList的时候调用该方法,会有具体实现类进行实现的。

怎么进行告警的?

通过eventBus(虽然据说是做出来给Android用的,发布者需要使用post(),然后需要一个接收的,通过使用@subscirbe进行event类型的获取,怎么看是否是拿到了合适的,需要通过实现applicationevent后,接受提需要入参是对应类型的event类才能拿到,这种方法一般是void方法)

dynamic-tp中自定义event是这么写的

1
2
3
4
5
6
public class CustomContextRefreshedEvent extends EventObject {

public CustomContextRefreshedEvent(Object source) {
super(source);
}
}

在dtpMonitor中,会获取刷新配置的时间,通过这种方式(我才创建的时候会在lifecycle里面先创建时间与事件监听,再发布一个刷新配置的事件)会DtpMonitor中创建一个定时任务,这个定时任务会发布一个AlarmCheckEvent的事件

image-20241117155135973

此处会发现它指向了这个context ->相关的代码段,在这个代码段中,会获取全部的dtpadpter(因为第三方组件也会可能使用dtp组件),获取全部dtpadpter的全部线程池wrapper(通常存放着线程池的信息以及一些通知平台和告警项),接着会对全部线程池wrapper查看liveness和capacity,如果到了时间且出发了liveness和capacity,则会进行通知告警,走通知告警的逻辑。

liveness的判断逻辑就是单纯的除法

3、通知告警

告警主要是看AbstractNotifier、AlarmManager这两个Java类。

  1. AbstractNotifier用于处理发送功能,该方式解决了通过send()方法实现发送调参通知、发送告警通知、创建告警内容、创建通知内容、通过MDC获取trace信息、配置receiver的信息(发送调参通知、发送告警通知会使用)、高亮、获取Const类里面的文本信息。

AbstractNotifier类本身是继承了DtpNotifier,DtpNotifier只存储平台信息、发送告警信息、发送变更信息。其他通知类的具体实现类则是实现了send方法,当然也可以实现其他方法。

  1. AlarmManager类里面给IOC容器注册了一个线程池,最大线程数量就1个,任务队列数量是2000个、NoticeManager则是创建了一个任务队列是100个的线程池,一开始创建该类时候会分别执行一次static方法块中的方法,为的是创建过滤器查看哪些平台需要通知。我们以AlarmManager为主要学习类。

值得学习的事情是,遍历该告警那些平台选择过滤条件的时候是从末往前遍历,在更细化一层的时候拿到的信息如果没有的话,会优先使用上一层的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
@SafeVarargs
public static<T> InvokerChain<T> buildInvokerChain(Invoker<T> target, Filter<T>... filters) {

InvokerChain<T> invokerChain = new InvokerChain<>();
Invoker<T> last = target;
for (int i = filters.length - 1; i >= 0; i--) {
Invoker<T> next = last;
Filter<T> filter = filters[i];
last = context -> filter.doFilter(context, next);
}
invokerChain.setHead(last);
return invokerChain;
}

是在InvokerChainFactory类中做的过滤,常见的过滤方式存储在AlarmBaseFilter/NoticeBaseFilter/NotifyRedisRateLimiterFilter中,我们当前只看AlarmBaseFilter。

AlarmBaseFilter做过滤操作的时候,会需要ALarm的上下文信息(AlarmCtx,就是存储通知信息以及哪个线程池需要做通知)、Alarm的触发器(AlarmInvoker会配置对应的上下文信息,发送对应alarm信息,重置告警的触发条件,告警信息存储在ConcurrentHashMap,是线程安全的。AlarmCounter),

一开始就会调用invoke方法?

本身是实现了一个链表的invoke的实现,最终创建了一个invokeChain

这句话可能会去掉。配置文件DtpBaseBeanConfiguration会创建一个DtpLifecycle,DtpLifecycle会从注册器中取的全部的注册在DTPRegistry中executor的对象进行初始化(此处会使用CAS保证线程安全),每个DTP线程池遍历使用DtpLifecycleSupport初始化该线程池ExecutorWrapper,ExecutorWrapper初始化的时候不仅会初始化AwareManger,也会初始化线程池对象,线程池对象初始化会优先使用NotifyHelper初始化自己的通知系统,获取DtpProperties和通知平台信息。因为通过哪个平台通知和触发什么情况的时候通知是解耦的,所以只有 触发解耦的条件 会被用来遍历告警频率限制初始化、告警计数器初始化,这两个对象都会存储一个ConcurrentHashMap。

  1. NotifierHandler和AlarmInvoker、NoticeInvoker相关,NotifierHandler在初始化的时候会默认使用ExtensionServiceLoader获取全部的DtpNotifier的对象,存储在HashMap中

NotifierHandler也会写sendNotice、sendAlarm,这就将其和AlarmInvoker、NoticeInvoker联系起来了。

具体的告警如下所示:

  1. 有些告警是通过定时任务去触发的,Dtp使用的是Timer时间轮做的效果,通过使用HashedWheelTimer.run()方法进行查看,如活跃度、队列容量告警都是适用的这个方式
  2. 拒绝策略告警出发是AwareManager通过beforeReject方法让AlarmManager进行通知。DtpExecutor线程池用到的RejectedExecutionHandler是经过动态代理包装过的
  3. 任务队列超时告警、任务执行超时告警官网上说是通过重写重写ThreadPoolExecutor的execute()方法和beforeExecute()方法实现的。当然现在新版本代码是通过Timer时间轮实现的,在AlarmManager中统一进行查看修改。并且Dtp线程池在创建线程等操作的时候会通过AwareManager主动开启一个任务
高亮是怎么实现的?

如告警过程中,通过buildAlarmContent方法创建内容时候,会执行高亮API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private String highlightAlarmContent(String content, NotifyItemEnum notifyItemEnum) {
if (StringUtils.isBlank(content) || Objects.isNull(getColors())) {
return content;
}

Set<String> colorKeys = getAlarmKeys(notifyItemEnum);
Pair<String, String> pair = getColors();
for (String field : colorKeys) {
content = content.replace(field, pair.getLeft());
}
for (String field : getAllAlarmKeys()) {
content = content.replace(field, pair.getRight());
}
return content;
}

会将获取需要高亮的信息(存储在NotifyItemEnum中),进行颜色替换

4、告警拓展

  1. 告警拓展具体来讲就是自己写了个新告警,但是新告警需要使用JDK SPI机制,JDK SPI是因为我们传输的Alarm信息不能被IOC容器管理,因为我们是一个告警需要产生一个类的,Spring SPI靠的是将JDK SPI的类加载IOC容器中实现的可加载模式的。
  2. 为什么是使用JDK SPI?因为使用Spring SPI是需要托管到IOC容器的,但是NotifierHandler本身会处理告警通知,但是如果如果要做到动态修改配置与个数,则需要做到实时的创建与删除,变成Bean是不够合理的,所以才会使用JDK SPI方式,通过以下方式实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void invoke(BaseNotifyCtx context) {
try {
DtpNotifyCtxHolder.set(context);
val noticeCtx = (NoticeCtx) context;
NotifierHandler.getInstance().sendNotice(noticeCtx.getOldFields(), noticeCtx.getDiffs());
} finally {
DtpNotifyCtxHolder.remove();
}
}

public static NotifierHandler getInstance() {
return NotifierHandlerHolder.INSTANCE;
}

private static class NotifierHandlerHolder {
private static final NotifierHandler INSTANCE = new NotifierHandler();
}

private NotifierHandler() {
List<DtpNotifier> loadedNotifiers = ExtensionServiceLoader.get(DtpNotifier.class);
loadedNotifiers.forEach(notifier -> NOTIFIERS.put(notifier.platform(), notifier));

DtpNotifier dingNotifier = new DtpDingNotifier(new DingNotifier());
DtpNotifier wechatNotifier = new DtpWechatNotifier(new WechatNotifier());
DtpNotifier larkNotifier = new DtpLarkNotifier(new LarkNotifier());
NOTIFIERS.put(dingNotifier.platform(), dingNotifier);
NOTIFIERS.put(wechatNotifier.platform(), wechatNotifier);
NOTIFIERS.put(larkNotifier.platform(), larkNotifier);
}

可以看到实际上是通过创建一个NotifierHandler进行通知,每次创建都会通过获取通过任何方式提供的DtpNotifier服务,后将其注册到这个创建的NotifierHandler的Map中。

5、框架自带线程池

框架自带线程池均继承自DtpExecutor,DtpExecutor继承自ThreadPoolExecutor,顺便实现了SpringExecutor, TaskEnhanceAware, ExecutorAdapter,具体原理是在executebeforeExecuteafterExecute上面做了增强,包装成了DtpExecutor自己的

源码中,除了Tomcat第三方线程池有不同外,其他的都是如下

1
2
3
4
5
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
AwareManager.beforeExecute(this, t, r);
}

可以看到是总是会先调用ThreadPoolExecutor的beforeExecute方法,然后会使用AwareManager.beforeExecute()方法,这个方法会先删除掉原先的Runnable,然后在开启一个新的Runnable,因为要设置线程的期限运行时间,所以会使用Timer时间轮做定时关掉的任务

执行execute方法时候,则必定会调用AwareManager.execute方法,只有被DynamicTp增强后的任务才会被DTP自己加载监控等信息,然后再开启任务。

1
2
3
4
@Override
public void afterExecute(Executor executor, Runnable r, Throwable t) {
Optional.ofNullable(statProviders.get(executor)).ifPresent(p -> p.completeTask(r));
}

可见也是先调用ThreadPoolExecutor的afterExecute方法,再看对应Executor是否存储在DTP里面,有的话再调用该Executor删除掉对应任务。

6、线程池注册

虽然看起来线程池注册是在整个框架中比较不常见的,但是这个也非常的重要,值得我们学习与阅读。DtpRegistry也继承了OnceApplicationContextEventListener,这代表

  1. 线程池注册在DtpRegistry里面,里面的话是一个ConcurrentHashMap

    public class DtpPostProcessor implements BeanPostProcessor, BeanFactoryAware, PriorityOrdered

    b. 另外就是会将获取DtpRegistry中线程池的一些操作API给放这个类里面了;监听事件刷线配置的操作也存放在这个类里面了。这个刷新方法会同时刷新Dtp线程池和不是Dtp但是托管给了Dtp的线程池。

7、其他

7.1、对接第三方线程池

adapter模块目前已经接入了SpringBoot内置的三大WebServer(Tomcat、Jetty、Undertow)的线程池管理,实现层面也是和核心模块做了解耦,利用spring的事件机制进行通知监听处理。通过initialize()方法拿到webServer的线程池

1
2
3
4
5
6
7
8
9
10
11
@Override
protected void initialize() {
super.initialize();
if (executors.get(getTpName()) == null) {
ApplicationContext applicationContext = ApplicationContextHolder.getInstance();
WebServer webServer = ((WebServerApplicationContext) applicationContext).getWebServer();
doEnhance(webServer);
log.info("DynamicTp adapter, web server {} executor init end, executor: {}",
getTpName(), ExecutorConverter.toMainFields(executors.get(getTpName())));
}
}

在这个方法中,doEnhance()方法会修改WebServer的线程池为Dtp自己的线程池,且该线程池会受到对应webserver线程池参数的影响

其他的则和我们在项目中使用的Dtp线程池具体代码实现相差无几

7.2、Dynamic-TP对于Spring的使用

DtpPostProcessor

具体来讲这个类就是在单例化Bean的时候,如果发现不是ThreadPoolTaskExecutor、ThreadPoolExecutor就不做操作,如果是DtpExecutor或者是ThreadPoolTaskExecutor、ThreadPoolExecutor就做一组操作进行包装。

@1:如果bean不是ThreadPoolExecutor或者ThreadPoolTaskExecutor,那么就不对bean做任何处理,直接返回

@2:如果bean是DtpExecutor,调用registerDtp方法填充DTP_REGISTRY这个map

@3:如果说bean上面的DynamicTp注解,使用注解的值作为线程池的名称,没有的话就使用bean的名称

@4:如果bean是spring中的ThreadPoolTaskExecutor的话,那么就通过getThreadPoolExecutor()方法拿到ThreadPoolExecutor注册到COMMON_REGISTRY中

到这里执行完,就针对所有线程池对象完成了增强

看起来复杂实际上简单

img

AwareManager
  1. 这个其实是相当重要的一块,涉及到了以关闭线程池、终止线程池、执行一个任务、注册、添加、更新线程池、拒绝策略为切面实现了一组操作
  2. 涉及到监控、触发拒绝等问题,因为线程池是实时在变化的,参数调整、任务的创建和完成等操作,都会涉及到监控指标,拒绝策略的修改,因此为了更好地使用资源,所以才会有AwareManager。
DtpInterceptorRegistry

根据注解@DtpInterceptor获取对应拦截器信息,基于此提供了对应的API

DtpInterceptorProxyFactory

存储着一些动态代理的方法,如getSignatureMap获取注解的信息修改

总结

笔者看了下美团的动态线程池实践,发现了JDK8的线程池的内部维护的逻辑,看起来还是很不错的

内部有个worker,这个worker里面有thread和runnable,所以既有任务又有线程去维护,本身线程池是 内部有很多信息 的

线程池本身有五种状态,其他四种是为了结束线程池而特意准备的

线程有线程状态和线程数量 这俩状态,在高并发的情况下,使用两个变量去存储则会比较麻烦,不如使用一个AotmicInteger,高三位存储线程池中线程状态,低26位存储线程数量。

也有worker的回收线程。

线程池怎么使用,最好还是按照不同业务来分配线程池,避免不同任务之间有父子级关系,导致OOM,也有可能会因为一部分业务占用资源过多导致出现其他业务饥饿的现象

dynamic-tp本身使用了eventBus,我个人觉得很不错。通过使用@subscribe注解实现的,这个注解有些属性,ThreadMode 线程模式,主要是看发布者和订阅者执行的一个方式,posting就是一个发送事件谁执行事件的一种方式。sticky属性类似于广播中的粘性广播。可以先将事件发出,留存在内存中,后面等到有订阅方法时,再接受订阅。priority 优先级可以使得优先级更高的获取到对应信息。

线程池有submit和execute这两者的区别在于execute只能提交Runnable类型的任务;submit既能提交Runnable类型任务也能提交Callable类型任务。

  • execute会直接报错,需要trycatch,和普通执行任务一样;但是submit会吞掉异常只能通过future.get()拿到对应信息。
  • execute没有返回值,submit有返回值。
  • 如果提交任务不需要结果的话则不需要使用submit,execute更快

引用文章

  1. dynamictp官网
  2. 动态线程池框架(DynamicTp),监控及源码解析篇
  3. SpringBoot:Bean生命周期介绍
  4. Spring Bean的生命周期
  5. Spring揭秘:ApplicationContextAware应用场景及实现原理!
  6. 观察者模式Spring之publishEvent事件处理
  7. Java 创建事件(Event)、事件监听器(EventListener)、事件发布(publishEvent)详解和相关demo
  8. java中的“&”、“|”、“^”、“~”运算符怎么用?
  9. 基于开源的配置中心的轻量动态线程池dynamic-tp实践与源码原理分析
  10. Java 中的定时器类Timer详解
  11. SoftReference 到底在什么时候被回收 ? 如何量化内存不足 ?
  12. com.google.common.base.Joiner,谷歌提供的字符串处理工具
  13. applicationContext.getBeansOfType(class)获取某一接口的所有实现类,应用于策略模式简单demo | 掘金
  14. 玩转Spring生命周期之Lifecycle和SmartLifecycle
  15. SpringBoot的SmartLifecycle生命周期接口
  16. java 非常好用的一个缓存(Google Guava的Cache)
  17. https://blog.csdn.net/qq_24654501/category_11384283.html
  18. ApplicationEvent 的简介及简单使用
  19. 动态线程池DynamicTp系列四之监控告警
  20. SpringBoot实现静态、动态定时任务,本地动态定时任务调度
  21. Google Java编程风格规范(2020年4月原版翻译)
  22. 提升编程效率的利器: 解析Google Guava库之集合篇Immutable(一)
  23. 事件驱动模型的最佳实践-SpringEvent【(二)原理篇】
  24. 【Spring Boot 源码学习】ConditionEvaluationReport 日志记录上下文初始化器
  25. SpringBoot-5.ApplicationListener子接口GenericApplicationListener, SmartApplicationListener
  26. 谷歌工具导包-com.google.common.collect
  27. Google Collections:Java开发者必备的集合处理库Guava教程
  28. 一文读懂Spring的SPI机制
  29. SpringBoot加载自定义yml文件
  30. org.apache.commons.collections4下的CollectionUtils工具类
  31. Java工具库——commons-lang3的50个常用方法)集合开发工具大利器
  32. Spring Aware机制
  33. 那些年背过的题目:Spring ApplicationContext- IOC容器初始化过程
  34. 一张长图透彻理解SpringBoot 启动原理,架构师必备知识,不为应付面试!
  35. beandefinition与beanfactory
  36. 面试篇之什么是静态代理?什么是动态代理?
  37. Spring IOC容器分析(3) – DefaultListableBeanFactory
  38. Timing Wheel 定时轮算法
  39. Java开发利器Commons Lang之元组

1、背景

告警页面由APP+ H5开发转纯H5页面优化

好处

  1. 开发与维护需要服务端+APP+H5同时开发,对于项目管理复杂
  2. 转成H5可以减少APP的开发压力

1.1、问题现象

  1. 慢查询16K/天

    2e714778e23510985c77506bd312df31

  2. 线上MySQL CPU有激增现象,不过缓存命中率,内存利用率无明显变化

    dad9dd59dcd8a89c4f7740769393f056

    bdb64a65f6f6b66224f0433610086d82

2、排查方式

结论先行,以下是该列表的优化流程

flowchart 
    A[有先查看MySQL优化建议,依据智能优化作为基础优化方式] -->B(分析慢查询与CPU关系,发现解决慢查询即可优化CPU问题)
    B --> C[分析慢查询统计,排查该优化哪个慢查询sql,对应到Java代码]
    C --> D[explain分析具体优化点]
  1. 直接使用购买MySQL实例的腾讯云上面的智能优化

    可以优先查看AI的优化建议进行快速修复的凭证

    87a79fd8faf9089e282428144daccbfe

  2. 查看慢sql的统计情况,发现慢sql和CPU趋势较为相同

    这说明如果优化了慢查询,则MysqlCPU较高问题也可以解决

    994a8ff4419f284e41339ad008f0d530

  3. 导出慢查询日志,进行sql统计分析,发现其中99%以上是开发的告警信息列表查询语句

    这表明优化单一告警即可优化Mysql的慢查询

  4. 在线上环境使用explain对查询语句进行查看,发现以下现象:

    partition占用较多——打算减少partition

    type为range,不是ref、eq_ref、const、system等——怀疑索引效果差,需要优化索引

    key有值——有索引

    rows大,且filtered较小,只有2%——扫描200条数据需要进而扫描大量条数数据,不合理,需修改查询条件

    extra——有Using filesort,排序未触发索引,需优化索引

    4873f5fb0e88d3f8c86bf327d2fd566e

  5. 随机查看其他用户,发现告警量大的用户稳定出现慢查询,告警总量大约是20W-50W,告警量少的用户,即使是跨分区也不会出现慢查询

3、解决方式

3.1、自己可以修改的

  1. 改变查询条件顺序,减少查询条件

    expire_time >now()——删除,因为每天会将超过过期时间的数据删掉

    alarm_time between and——需要放在最后,因为联合索引在>,<,between and后会失效

    设备id ——放到稍微靠前的位置,因为该字段在线上验证时,发现该字段独立度更高,如id 是114514时占总数据的1-5%,而告警类型2000可以占10%左右,所以设备id需要放在比告警类型更靠前的位置

  2. 优化联合索引顺序,个数

    app的最常见查询条件作为联合索引的个数与顺序

    通过腾讯云迁移数据到测试环境数据库验证解决

    迁移单个表,单表200w数据,去其中的告警量50w单个用户进行压力测试

  3. 尝试增加资源——MySQL实例已经是购买的最贵的实例,无法优化

  4. paritition删除——删除会导致线上长时间停顿20min以上,且涉及到删除过期数据逻辑,无法优化

3.2、需要配合的

产品经理

  1. 重构时默认查询7天,涉及多个partition,可不可以改成默认查询1天——为了在功能上对标海康,大华,不能进行修改

app

  1. 缺少偏移量,期望入参增加偏移量,避免后续出现的深分页问题——app暂时没人力投入,不进行修改

4、问题回溯

  1. 新问题单开发同事开发时直接在原有查询中间添加查询条件

  2. 以下为测试环境压力测试时CPU,内存图片,无问题

    推测可能是因为测试环境压测数据量不够导致问题

    为什么没有准备合适数据,原因为慢查询多为数据量较少用户

    22b3ee0971451c532addaba0d71b9b62

    80f718900c5797ba180e97d6b72d6f36

5、优化效果

  1. 慢查询explain优化

    filtered查询行数降低,筛选率变高

    extra没有using filesort

    type虽然还是range,但是实际上速度已经可以来到200ms左右,顾先不优化,因为本次目的是减少慢查询降低MySQL压力

    image-20250706185239859

  2. 每日慢查询数量降低,从16k/天 -> 500/天

    b658a861b656ca8040d7f77e3f480cc7

6、后续优化

  1. APP侧辅助修改提问题单
  2. 后续排查发现告警有子模块呼叫查询,也会使用tbl_alarm表,为了减少单表压力,提单将对应查询优化到专门存储呼叫的表中进行查询

7、推荐阅读

  1. Mysql Documentation
  2. MySQL documentation explain

Java循环操作哪个快?

2024年10月2日

开发的时候我发现个问题,就是在学习玩streamAPI和lambda表达式后,我就变得越来越喜欢直接使用streamAPI,而不是使用for循环这种方式了,但是这种方式也有一定的缺点,但是直到某一次代码review,我的同事点醒了我,“小火汁,你的stream流写的是挺好,但是问题是为什么从同一个源取相似的对象,要分别写两次stream,你不觉得有点多余了吗?程序员不只是写代码,反而是最初的设计阶段就要把全局流程想好,要避免再犯这种错误哦~”,这句话点醒了我,所以我打算先看一下stream遍历、for循环、增强for循环、迭代器遍历、并行流parallel stream遍历的时间消耗,查看一下这几种方式的异同。

使用stream主要是在做什么?

此时我们先准备一个类

1
2
3
4
5
6
7
8
9
@Data
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
class Item {
private Integer name;

private Integer value;
}
  1. list转成map

    1
    list.stream().collect(Collectors.toMap(Item::getName, Item::getValue, (newValue, oldValue) -> newValue))
  2. List过滤,返回新List

    1
    List<Item> collect = list.stream().filter(x -> x.getValue() > 50).collect(Collectors.toList());
  3. 模拟多次stream,因为我在开发中经常出现这种问题

    1
    2
    Map<Integer, Integer> collect = list.stream().collect(Collectors.toMap(Item::getName, Item::getValue, (newValue, oldValue) -> newValue));
    Map<Integer, Integer> collect3 = list.stream().collect(Collectors.toMap(Item::getName, Item::getValue, (newValue, oldValue) -> newValue));
  4. 取出list<类>中某一个属性的值,转成新的list

    1
    List<Integer> collect = list.stream().map(Item::getValue).collect(Collectors.toList());
  5. list<类>中进行一组操作,并且转成新的list

    1
    2
    3
    4
    5
    6
    List<Item> collect1 = list.stream().parallel().map(x -> {
    Integer temp = x.getName();
    x.setName(x.getValue());
    x.setValue(temp);
    return x;
    }).collect(Collectors.toList());

实际消耗

49c98bb4e7df8b966d8feb3818701e1b

选择1、10、100、100_00、100_000的原因

1、10、100主要是业务决定的,实际代码编写中这块的数据量是占大头的,10_000,100_000是因为为了查看实际的大数据量情况下的效果。

结果结论如下:

  1. 如果只是用filter的API,则建议只使用普通for循环,其他情况下数据量较少时,虽然stream和for循环都是10ms以内,但是性能上会差着3-4倍
  2. 普通for循环可以使用for (Item item : list),因为这个是for (int i = 0; i < ; i++)的语法糖,内部是iterator实现
  3. 增强for循环底层是Iterator接口,但是实际的验证时发现特别慢,暂时没发现原因,但是不推荐使用
  4. stream串行流转成并行流操作后普遍还是不如串行流快,速度如下:执行时间:串行流转并行流>串行流>并行流,所以串行流转并行流不推荐使用
  5. 串行流转并行流和并行流都会使用ForkJoinsPool.commonPool(),这是个进程共用的CPU型线程池,且数据不方便修改,我记得是需要在启动的时候进行修改
  6. 串行流转并行流和并行流均会产生线程争抢资源与线程安全问题
  7. 在单次stream多次中继操作的情况下,执行速度和单次中继操作差不多
  8. iterator显式使用,在大数据量(1_000_000_000等次循环)的情况下,效率会急速下降,目前原因暂时不清楚。

总结

  1. 写一次stream操作耗时较少,但是会导致开发人员无意之间多次使用stream流做类似操作(如从订单类中多次取不一致但是相似的一组对象),从而导致可读性变差,不利于后续拓展
  2. 尽量使用普通for循环做遍历,迭代器循环做删除或者使用collection的remove、removeIf等API实现(如果只需要删除的话)
  3. 使用普通for循环比stream流节省时间,因此在提高性能的角度看开发中尽量使用普通for循环。

2025年7月12日

实际上stream不只是替代循环操作这么简单,更多的是对集合操作的逻辑进行了抽象。因此stream代码量相比于写for循环更少,并且不只是逻辑处理被抽象了,并发处理也同样可以被抽象;

开发时需要抽象业务实现与并发实现,再进行开发。

引用文章:

  1. 【有趣的实验】JAVA 遍历数组的几种方式的耗时对比「建议收藏」
  2. java foreach循环的底层实现原理

前言

随着业务的不断拓展,一个项目再也不是只需要运行一段时间就可以了,也不是一个很小的项目,随着业务的增长,最终开发的代码会变成一个很大的项目,此时如果不能做好很好的重构,会造成越来越难的拓展,最后导致无法拓展。所以DDD(Domain Driven Design,领域驱动设计)就是从领域业务这个方面对代码做重构与设计的方法论之一。

另外不可能将DDD用于任何一处代码,原因如下:

  1. 成本不允许,一般只有项目的核心领域才会考虑使用DDD重构,培养DDD团队会消耗比较长的时间,且整个时间成本是需要持续投入的。

  2. 因为DDD无法使用于任何一个环境,只使用DDD无法做到满足全部的业务场景,比如G端企业做数据化,很多时候是只需要从数仓查询数据;但是DDD只是一个业务思想,单纯的数据也无法做到成为业务领域

  3. DDD要求可以将业务分的很清晰,需要一个“聚合根”将一组业务模型组起来,这要求需要一组业务专家作为开发主力,这通常意味着需要将团队中比较聪明的几位来将项目解构成DDD的模型,很多时候这群人的工作重点不在这里。

  4. 就算使用DDD也不可能将其全部的教条应用在代码中,因为工作主要是为了盈利,如果使用DDD会帮助提升代码价值,全使用DDD会降低代码价值。在这种情况下,必然是只会使用一部分的DDD思想。

Model-Driven Design

好的模型可以反应真实的业务情况

  1. 好的模型需要统一的语言,在一个开发团队中,需要让每个开发者对同一个术语的概念是了解一致的,且需要业务专家将对应信息传达下去。

    如“数据资产”,数据专家和一个应届生想到的肯定是不一样的。

  2. 模型类与其他非DDD层的POJO需要分离开来,尤其需要和数据库的POJO分离开来,DDD是软件设计思想,与数据库、技术组件、远程调用是完全不相关的。

  3. 除了核心组件外,其他组件,如MVC的DAO层,因为其确实是需要做到和固定数据库做好对接,所以面向结果编程是完全值得提倡的。

  4. DDD中一组很重要的思想是Entity和Value Object,Entity可以被认为是数据库中一张表的自增主键,具有连续性和唯一性,Value Object相当于是一个更关注其数据属性的一个组件,因此只能依附于一个Entity

  5. Entity和Value Object只是提供了一组事物信息,如广告转化率、广告使用地区等。但是另外还需要非“事物”的部分引入到模型中,如行为,如一组模型相关的业务,如广告投放业务中,如果需要对广告相关的信息做校验,其中就包括业务校验和技术校验,如果将这两个模块混淆了,则会导致业务层被“腐化”,无法实现很好的解耦与以后的拓展,因此需要将业务操作和技术操作分离出来,我们可以将基于Entity和Value Object的操作称之为“Service”。

    模型中的Service有应用层面的Service、领域层面的Service(因为需要将真实业务层和技术实现层解耦,在Java的开发中可以使用依赖反转实现)、技术层的Service,应用层面的Service、领域层面的Service无法很好的区分,有时候可能不需要做区分,但是一旦涉及到业务领域核心,且不已不同应用场景为变更,则可能需要区分。

    以下有一个例子:

    在C端进行广告投放业务,会需要做到以下代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    public class AdvertisementService {

    @Resource
    OnlineRetailersFeign onlineRetailersFeign;

    @Resource
    AdvertisementDAO advertisementDAO;

    @Resource
    AdvertisementDomainService advertisementDomainService;

    public Result<AdvertisementDTO> showAdvertisementInfo(AdvertisementRequestAO advertisementRequestAO) {

    String reginId = advertisementRequestAO.getReginId();
    //查看广告校验是否有效
    if (checkAdvertisementDisable(reginId)) {
    return new Result<AdvertisementDTO>();
    }
    //进行本业务相关操作
    /**
    * 1.查看投放地域、用户信息,查看用户所处环境(APP、Web等)
    * 2.从数据库中取出用户个人画像、当地广告服务商提供的服务
    * 3.通过算法取得合适的投放信息,并且按照需要从电商微服务等服务获取信息包装
    */
    advertisementDomainService.domainOperate();
    //最后进行写入数据库操作
    advertisementDAO.insert();

    //给其他微服务发送对应消息
    onlineRetailersFeign.notify();
    //最后返回信息
    return new Result<AdvertisementDTO>();
    }

    /**
    * 校验广告地址信息
    *
    * @param reginId
    * @return
    */
    private boolean checkAdvertisementDisable(String reginId) {
    return true;
    }

    }

    其中最重要的是advertisementDomainService.domainOperate(),这个可以认为是领域相关Service,不能算作是应用Service,应用Serivce可以是advertisementDomainService.domainOperate()中的实现,通过状态模式/策略模式,实现具体逻辑。因为取得服务可能因为地区、用户不同、具体的实现逻辑是不一样的,调用的组件是不一样的,所以细节上的实现需要应用层去协调和跟技术层Service实现,但是总的来说,投放一个广告就是需要这几步,因此这一组需要作为领域层的Service。

    Service是当需要Entity和Value Object做一组操作时候出现的,且Service最好是无状态的。

  6. 最后的Entity和Value Object应当以一组模块的形式出现。

业务域

  1. 设计好模型后就需要将模型组合成为领域,那么就需要一个model作为聚合根实现聚合。其中占主导作用的,和外部做主要交互的model作为聚合根,是这个领域存在的前提条件。
  2. 开发一个领域就是为了让隐性概念显性化,让隐形上下文显性化,配置多对象行为。
  3. 需要根据业务需要将领域划分为核心子域、支撑子域、通用子域。这些领域之间需要做到可以使用限界上下文、通用语言、上下文映射图做好划分。
  4. 限界上下文是当一个领域形成时候,需要将业务与其他业务进行物理隔离,只能通过几个固定方式进行交互,这个边界就是限界上下文

战略战术

总结

  1. 只学习DDD并不能帮你更好的架构与设计,更好的方式应当是学习软件常见的架构设计模式、自己使用的编程语言的基本特性、然后再积极的使用
  2. (另外就是Eric Evans的软件驱动设计:软件核心复杂性应对之道不推荐看,因为翻译比较捉襟见肘,很多时候不知道在讲什么)
  3. DDD思想知易行难,期间需要花大量精力打通多个团队,是个人硬实力与软实力的体现,另外就是需要正确的构建出模型,需要可以满足实际的需要。实践DDD思想是一项风险收益都较高的事情。
  • 避免胶水依赖
  • 修改目标:接口语义明确,可拓展性强,最好带有自检性;参数校验逻辑复用,内聚;参数校验一场和业务逻辑异常解耦。
  • 单元测试可行性(至少要满足从跑CI)
  • 让隐性概念显性化,让隐形上下文显性化,配置多对象行为
  • DP(Domain Primitive)
  • 将其中的技术框架、中间件等五控制权,可以替换的、非核心业务域的组件给去除掉。
  • 比如,业务类和数据库DO类就不是同一个类。
  • 一个系统需要和它使用的数据库进行解耦
  • Repository(数据访问抽象层)只定义动作。
  • 下水道逻辑(Dao层,可以想做什么就做什么)
  • 如果不使用合理的耦合方式,那么就会出现M个业务如果进行修改且相互耦合,那就代表一次拓展需要做M²次修改,如果遵循各种架构的优化方式,则可以做到一次拓展只需要修改2M次服务。
  • 业务总览->业务划分->业务编排Entity、Domain Service、最后做一些解耦中的具体实现
  • 聚合根、限界上下文
  • value Object与Domain Service

引用文章

  1. DDD在软件开发实践中的应用(一)- Ubiquitous Language

增强 guava 的 Maps. uniqueIndex方法
guava 的 Maps. uniqueIndex方法可以实现: 将 list 转成 Map<K , V> K 需要自己指定, V不能指定
本方法实现了:
将 list 转成 Map<K , M> K 需要自己指定, M需要自己指定
其中K不能重复,若重复,则会报错
Params:
values – 需要转换的集合 可以是任何实现了 Iterable 接口的集合(如List, Set, Collection) keyFunction – 转换后Map的键的转换方式 valueFunction – 转换后Map的值的转换方式
Returns:
唯一的map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static <K, V, M> ImmutableMap<K, M> uniqueIndex(Iterable<V> values, Function<? super V, K> keyFunction, Function<? super V, M> valueFunction) {
Iterator<V> iterator = values.iterator();
checkNotNull(keyFunction);
checkNotNull(valueFunction);
ImmutableMap.Builder<K, M> builder = ImmutableMap.builder();
while (iterator.hasNext()) {
V value = iterator.next();
builder.put(keyFunction.apply(value), valueFunction.apply(value));
}
try {
return builder.build();
} catch (IllegalArgumentException duplicateKeys) {
throw new IllegalArgumentException(
duplicateKeys.getMessage()
+ ".若要在键下索引多个值,请使用: Multimaps.index.", duplicateKeys);
}
}

一个key 对应多个值的map 结构: key -> [value1, value2, …]
Params:
values – 需要转换的集合 可以是任何实现了 Iterable 接口的集合(如List, Set, Collection) keyFunction – 转换后Map的键的转换方式 valueFunction – 转换后Map的值的转换方式
Returns:
唯一的map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static <K, V, M> Multimap<K, M> iterableToMultiMap(Iterable<V> values, Function<? super V, K> keyFunction, Function<? super V, M> valueFunction) {
Iterator<V> iterator = values.iterator();
checkNotNull(keyFunction);
checkNotNull(valueFunction);

Multimap<K, M> builder = ArrayListMultimap.create();
while (iterator.hasNext()) {
V value = iterator.next();
builder.put(keyFunction.apply(value), valueFunction.apply(value));
}
try {
return builder;
} catch (IllegalArgumentException duplicateKeys) {
throw new IllegalArgumentException(
duplicateKeys.getMessage()
+ ".若要在键下索引多个值,请使用: Multimaps.index.", duplicateKeys);
}
}

转换 Map 的 K 和 V
Params:
map – map
Returns:
反转的map

1
2
3
4
5
6
7
8
public static <K, V> Map<V, K> inverse(Map<K, V> map) {
if (MapUtil.isEmpty(map)) {
return Collections.emptyMap();
}
HashBiMap<K, V> biMap = HashBiMap.create();
map.forEach(biMap::forcePut);
return biMap.inverse();
}

按照分隔符切割list
Params:
list – 集合 function – 转换器 separator – 分隔符
Returns:
分割后的集合

1
2
3
4
5
6
7
public static <T> List<String> split(Collection<T> list, Function<? super T, ?> function, CharSequence separator) {
if (CollUtil.isEmpty(list)) {
return new ArrayList<>();
}
return list.parallelStream().map(function).map(item -> StrUtil.splitToArray(String.valueOf(item), separator))
.flatMap(Arrays::stream).filter(ObjectUtil::isNotEmpty).distinct().collect(Collectors.toList());
}

按照分隔符切割list
Params:
list – 集合 separator – 分隔符
Returns:
分割后的集合

1
2
3
4
5
6
7
public static <T> List<String> split(Collection<String> list, CharSequence separator) {
if (CollUtil.isEmpty(list)) {
return new ArrayList<>();
}
return list.parallelStream().map(item -> StrUtil.splitToArray(item, separator))
.flatMap(Arrays::stream).filter(ObjectUtil::isNotEmpty).distinct().collect(Collectors.toList());
}

  • 除了Java自带默认线程池,基于Java的spring框架也有很多自带的默认线程池,有时候我们的一些需要线程池的操作可能通过提供的api和注解隐藏掉了,不熟悉的话会出现线程相关的问题

1 Future和CompletableFuture

Future和CompletableFuture都是Java提供的用来执行异步方法的情况。其中,Future只能包装一个异步任务,最后还是要把这个任务提交给特定线程池。

CompletableFuture则会使用默认线程池,但是比较复杂:

  • 无JVM参数前提下:

    若服务器的核心数小于等于2,commonParallelism 则为1,即useCommonPool 为false,new 一个线程池ThreadPerTaskExecutor 。

    若服务器的核心数大于2,commonParallelism 则为 核心数 - 1,即useCommonPool 为true,使用ForkJoinPool线程池。

  • 有JVM参数情况下:

    以设置参数为准。大于1小于等于32767。和上面判断一致

以上CompletableFuture的总结来自 这篇文章 从用法到源码再到应用场景:全方位了解CompletableFuture及其线程池 有兴趣深挖一下的可以看下

综上我们推荐使用CompletableFuture时候也需要主动指定线程池,没有指定线程池的情况下,CompletableFuture要么使用ForkJoinPool,要么不使用线程池,ForkJoinPool做CPU密集型任务好用,我们有时候做IO密集型任务则不需要他,需要指定IO型线程池。

2 hutool的异步任务

hutool在提交任务的时候,会自动提交到一个默认的ExecutorService对象中,这个对象会使用如下的类的builder(0方法,这个对象的参数如下

一些小项目直接使用OK的,但是如果大项目使用的话,风险会较大,他的最大线程数是最大值,任务队列也是1024,不一定满足业务需要,核心线程数是0

image-20240818161541264

3 @Async

springboot 2.1.0 之前:simpleasynctaskexecutor,这个线程池会是不安全的线程池,很容易出现OOM,因为他创建新线程、无限、不重用

springboot 2.1.0 之后:taskexecutor这个是springboot的一个自带bean,类型是TaskExecutor,一般是有的,如果没有才会找simpleasynctaskexecutor

4 @Schedule

在Spring Boot中,@Scheduled注解是基f于Java的ThreadPoolExecutor和ScheduledThreadPoolExecutor实现的。当我们配置了一个定时任务后,Spring Boot会首先创建一个ScheduledThreadPoolExecutor线程池,并将定时任务添加到该线程池中等待执行。然后,在指定的时间到来之后,线程池会为该定时任务分配一个线程来执行。如果该定时任务还未执行完毕,在下一个周期到达时,线程池会为该任务再次分配一个线程来执行。通过这种方式,@Scheduled可以非常方便地实现周期性的定时任务f于Java的ThreadPoolExecutor和ScheduledThreadPoolExecutor实现的。

当然schedule注解可以指定线程池,需要实现SchedulingConfigurer 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
@DependOn("executor")
public class ScheduleConfig implements SchedulingConfigurer {
@Resource
private ThreadPoolExecutor executor;

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
//具体逻辑需要自己写
ThreadPoolTaskScheduler threadPoolTaskScheduler = executor;
}
}

或者可以再schedule注解上再加一个 @Async注解,做到异步线程中去做。

5 stream Parallel

parallelStream不能使用自定义线程池,默认情况下,Stream使用的是ForkJoinPool.commonPool(),这是一个公用的线程池,被整个程序所使用。

但是stream parallel本身就符合ForkJoinPool的Fork/Join思想,一般不会去进行修改。但是如果有需要,我们可以单独给并行流创建一个线程池供他们使用。

如创建一个并行度为4的ForkJoinPool,通过给该ForkJoinPool提交lambda表达式任务来解决这个问题

6 ForkJoinPool

详情可以参照该文章 线程池ForkJoinPool简介

7 @Transactional

使用该注解时不能做和多线程/异步线程相关的事情,否则事务容易失效,详情请见 注解@Transactional 原理和常见的坑

引用文章:

1、Superset安装步骤

superset官网 http://superset.apache.org/

superset对于python有版本要求,最好是最新版的python,因此需要安装python虚拟环境,方面部署。

superset有window安装方法,Linux的虚拟python环境安装方法,以及Linux的docker安装方法,一下为Linux虚拟python环境安装docker的方法

1.1 superset安装步骤

安装superset的完全步骤如下:

安装miniconda->在miniconda中为superset创建python虚拟环境->安装superset

安装过程需要全程联网

2、安装Miniconda

为什么不用anaconda或者是conda?

因为anaconda环境内容较多,安装superset不需要较多工具

下载Miniconda(Python3版本)的地址为:https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh

按照以下命令进行安装,根据提示进行操作完成安装

拉取安装脚本

1
bash Miniconda3-latest-Linux-x86_64.sh 

在一开始阅读协议的时候可以按空格键快速阅读

笔者在使用命令安装时的安装miniconda地址为

1
/opt/module/miniconda3

加载配置

加载环境变量配置文件,使之生效

1
source ~/.bashrc

Miniconda安装完成后,每次打开终端都会激活其默认的base环境,我们可通过以下命令,禁止激活默认base环境。

1
conda config --set auto_activate_base false

3、创建python 3.10.4环境

配置conda镜像

1
2
3
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
conda config --set show_channel_urls yes

创建python环境

1
conda create --name superset python=3.10.4

conda环境管理常用命令

1
2
3
创建环境:conda create -n env_name
查看所有环境:conda info --envs
删除一个环境:conda remove -n env_name --all

激活环境

1
conda activate superset

如果想退出当前环境,使用如下命令

1
conda deactivate

4、安装superset

安装依赖

1
sudo yum install -y gcc gcc-c++ libffi-devel python-devel python-pip python-wheel python-setuptools openssl-devel cyrus-sasl-devel openldap-devel

安装(更新)setuptools和pip

1
pip install --upgrade setuptools pip -i https://pypi.douban.com/simple/

安装superset

1
pip install apache-superset -i https://pypi.douban.com/simple/

说明:-i的作用是指定镜像,这里选择国内镜像
注:如果遇到网络错误导致不能下载,可尝试更换镜像

1
pip install apache-superset --trusted-host https://repo.huaweicloud.com -i https://repo.huaweicloud.com/repository/pypi/simple

初始化Supetset数据库

1
superset db upgrade

创建管理员账户

1
2
3
export FLASK_APP=superset
superset fab create-admin
superset init

修改密码

1
superset fab reset-password --username admin --password 123456

username后面需要改成你想修改的用户的用户名,不需要修改

password后面为后续想改成的密码

修改完成后需要执行

1
superset init

重启

5、启动superset

安装gunicorn

1
pip install gunicorn -i https://pypi.douban.com/simple/

说明:gunicorn是一个Python Web Server,可以和java中的TomCat类比

启动Superset
首先确保当前conda环境为superset,及下图所示

https://pic.imgdb.cn/item/649d24da1ddac507cc198c90.jpg

启动

1
gunicorn --workers 5 --timeout 120 --bind 172.16.150.14:5000  "superset.app:create_app()" --daemon 

说明:
–workers:指定进程个数,python脚本语言没有线程概念
–timeout:worker进程超时时间,超时会自动重启
–bind:绑定本机地址,即为Superset访问地址
–daemon:后台运行

需要保证后续可以登录,需要保证防火墙关闭(不推荐)或者是开启端口并且重启防火墙

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 重启防火墙
firewall-cmd --reload
# 开启防火墙
systemctl start firewalld
# 关闭防火墙
systemctl stop firewalld
# 防火墙版本
firewall-cmd --version
# 防火墙状态
systemctl status firewalld
firewall-cmd --query-port=80/tcp
firewall-cmd --permanent --list-ports
firewall-cmd --permanent --add-port=8080/tcp
firewall-cmd --permanent --add-port=8083-8085/tcp
firewall-cmd --permanent --remove-port=8083-8085/tcp

6、遇到的问题以及解决

报错1
1
2
3
4
(superset) [hyj@hadoop102 ~]$ superset db upgrade
#然后包这些错误
from markupsafe import soft_unicode
ImportError: cannot import name 'soft_unicode' from 'markupsafe' (/opt/module/miniconda3/envs/superset/lib/python3.7/site-packages/markupsafe/__init__.py)

解决方法

1
2
3
4
5
6
#查看markupsafe这个包是否存在
(superset) [hyj@hadoop102 ~]$ pip show markupsafe
Name: MarkupSafe
Version: 2.1.2
#下载2.0.1版本的markupsafe (pip会帮我们卸载之前版本的并下载2.0.1版本的markupsafe)
(superset) [hyj@hadoop102 ~]$ pip install markupsafe==2.0.1
报错2
1
2
3
4
(superset) [hyj@hadoop102 ~]$ superset db upgrade
#但是会报这个错误
re.compile(r"'(''|\\\\|\\|[^'])*'", sqlparse.keywords.FLAGS).match,
AttributeError: module 'sqlparse.keywords' has no attribute 'FLAGS'

解决方法

1
pip install sqlparse=='0.4.3'
报错3
1
2
3
4
superset db upgrade
#报错
Error: Could not locate a Flask application.
You did not provide the "FLASK_APP" environment variable, and a "wsgi.py" or "app.py" module was not found in the current directory.

解决方法

1
export FLASK_APP=superset
报错4
1
2
3
4
5
6
7
8
9
10
11
superset db upgrade
#但是遇到如下问题
--------------------------------------------------------------------------------
WARNING
--------------------------------------------------------------------------------
A Default SECRET_KEY was detected, please use superset_config.py to override it.
Use a strong complex alphanumeric string and use a tool to help you generate
a sufficiently random sequence, ex: openssl rand -base64 42
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Refusing to start due to insecure SECRET_KEY

解决方法

进入到miniconda3的安装路径下的envs/superset/lib/python+版本号/目录

1
(superset) [hyj@hadoop102 ~]$ cd /opt/module/miniconda3/envs/superset/lib/python3.8/

创建superset_config.py

1
vim superset_config.py

添加如下行为

1
2
3
4
5
SECRET_KEY = "ZT2uRVAMPKpVkHM/QA1QiQlMuUgAi7LLo160AHA99aihEjp03m1HR6Kg" 
用命令 openssl rand -base64 42 来生成一个强密钥
openssl rand -base64 42
4xPjBius42o5Y/pbgRmkjKZ3im5CeHcRXM93TWm+FboEJOll0XMMgDRW
所以SECRET_KEY =“4xPjBius42o5Y/pbgRmkjKZ3im5CeHcRXM93TWm+FboEJOll0XMMgDRW”
报错5

yum命令无法查找到镜像mirror,下载不了文件的问题

解决

要么配置本地源(不推荐),要么网络拉取镜像源(推荐),下面有网络拉取镜像源的步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#安装wget工具
yum intall -y wget
#转到yum源文件存放路径
cd /etc/yum.repos.d/
#创建一个网络yum源文件备份
tar zcf backup_repo.tar.gz *
#删除所有yum源文件
rm -rf CentOS*
#下载yum源文件
wget http://mirrors.aliyun.com/repo/Centos-7.repo
#清除缓存
yum clean all
#重新搭载yum源
yum makecache

登录

访问http://hadoop102:5000,并使用之前安装superset过程中设置的用户名密码登录

停止superset

如果想要停止superset则使用如下命令

1
ps -ef | awk '/superset/ && !/awk/{print $2}' | xargs kill -9

使用superset启停脚本进行项目的启停

其中IP地址和端口需要根据实际情况进行更改

1
vim superset.sh

内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#!/bin/bash

superset_status(){
result=`ps -ef | awk '/gunicorn/ && !/awk/{print $2}' | wc -l`
if [[ $result -eq 0 ]]; then
return 0
else
return 1
fi
}
superset_start(){
source ~/.bashrc
superset_status >/dev/null 2>&1
if [[ $? -eq 0 ]]; then
conda activate superset ; gunicorn --workers 5 --timeout 120 --bind IP地址:端口 --daemon 'superset.app:create_app()'
else
echo "superset正在运行"
fi

}

superset_stop(){
superset_status >/dev/null 2>&1
if [[ $? -eq 0 ]]; then
echo "superset未在运行"
else
ps -ef | awk '/gunicorn/ && !/awk/{print $2}' | xargs kill -9
fi
}


case $1 in
start )
echo "启动Superset"
superset_start
;;
stop )
echo "停止Superset"
superset_stop
;;
restart )
echo "重启Superset"
superset_stop
superset_start
;;
status )
superset_status >/dev/null 2>&1
if [[ $? -eq 0 ]]; then
echo "superset未在运行"
else
echo "superset正在运行"
fi
esac

加执行权限

1
chmod +x superset.sh

怎么使用这些指令

1
2
3
4
5
6
# 启动superset
superset.sh start
# 停止superset
superset.sh stop
# 重启superset
superset.sh restart

自助分析平台使用教程

自助分析平台自身使用软件自带的sqlite软件对于信息进行存储
应用效果如图所示

其中主要部分用于展示自制的页面列表,自助分析平台有如下几个重要的组成元素:

  • 仪表盘(dashboard)
  • 各种数据图形(chart)
  • 数据集(dataset)
  • 数据源(database)

而创建一个完整的图表过程下所示

1、配置展示的数据源

点击右上角setting,在下拉框列表中勾选database connection

再点击

可以添加数据源,填写好配置后既可使用该数据源拿取数据

2、创建页面

在主页中点击 添加database,即可添加新页面

创建页面的页面效果如图所示

其中charts为可以用于填充页面的图表,需要自己制作,layout elements是布局元素,可以添加一定的预设填充页面,比如一部分markdown编辑组件,规定自制图表大小和位置的页面结构

如果没有图表,需要自己创作

3、创建图表

点击任何和图标相关的信息即可跳转到创建图表页面

分为两步,第一步选择数据集,第二步为选择图表样式

3.1创建数据集

点击add a dataset后可以跳转到安装数据源的页面,这个功能的意思就是从设置的数据源中拿到一张表,作为一个数据源

点击dataset,会让你从已经设置的数据源中选择一个作为数据源,

schema则为让你在选择的数据源中选择一个schema,

选择完成后会产生新的下拉框选项

即为选择相应的表

选择后点击右下角的 create dataset and create chart,即为创建完成

3.2选择图表

4、编辑图表

编辑图表界面如图所示

主要的部分为1、2、3、4共4各部分,其中

1为图表名称

2为数据集的信息展示,对于此部分了解的人员可以隐藏不看该部分

3为确认横坐标和纵坐标的信息

time与横坐标信息强绑定了,如果使用时间顺序折线图的话,必须从表中选择时间类型的属性作为横坐标,其中

time column为选择哪一个数据集中的属性作为x轴

time grain为单位,是哪一个时间维度作为单位

time range为查询的时间跨度是多少,可以不过滤

Query为y轴部分

METRIC可以用于选择哪一个数据集中的属性作为y轴信息,以及这个字段中数据以什么样的聚合函数形式展示,比如平均,还是求和,还是极值

FILTERS可以用于作为过滤条件,即对于数据进行条件判断,适合条件的才会做处理

第4部分为数据的可视化展示

处理完成后即可进行保存

值得注意的是,目前自主分析平台可以制作多种图表,已经可以满足各种类型图表的制作

5、填充页面并保存

再次回到页面编辑界面,此时我们已经有了数个图表

右侧为已经创建的图表,使用鼠标拖拽即可将图标移动到网页上

5.1改变图表大小

鼠标移动到图表的右侧或者下侧时,鼠标会变成特殊符号,此时按住鼠标进行拖拽即可改变图表大小

5.2各种布局元素的使用

除了自制图表,自助分析平台会提供6种预设布局元素,以供使用

从上到下的功能依次为

tabs: 将其放置于页面上时会创建一行信息,这行中可以创建多个sheet页,每一页可以填充不同的图表,每一个sheet页均可命名

row:可以创建一行空行,通常与column元素一起使用,达到一行中有多层的效果

Header:在页面上为单独一行,可以作为标题行使用

text:一块页面组件,大小初始默认和自制的图表大致相同,但是里面可以填充markdown语法,并且可以分别展示代码形式和预览形式

divider:在页面中以一条直线展示,可以作为分隔符使用

编辑完成后点击右上角的save按键,即可保存页面效果

有一点需要注意的,成型的页面只会默认创建页面的时候拉取一次数据并展示,后期不会主动拉取数据进行分析展示,如果想要展示,则需要右上角的…,除了可以点击refresh dashboard进行一次手动刷新数据外,可以点击set auto-fresh interval按键选择自动刷新的时间间隔,选择哪一个可以根据业务需求

0%