public abstract class AbstractBatchJobProcessor implements JobProcessor
{
private final static ExecutorService executors = Executors.newCachedThreadPool();
private ScheduledExecutorService schedulerExecutors = Executors.newScheduledThreadPool(1);
private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
/**
* 处理包大小
*
* <p>
* TODO 方法功能描述
*
* @return
* @return int
*/
abstract public int getPackageSize();
/**
* 处理周期(单位:分)
*
* <p>
* TODO 方法功能描述
*
* @return
* @return int
*/
abstract public int getCycle();
/**
* 获取任务列表
*
* <p>TODO 方法功能描述
*
* @return
* @return List<Runnable>
*/
abstract public List<Runnable> getTaskList();
// 每天凌晨0点触发
@Override
public void process(ProcessContext context)
{
List<Runnable> taskList = getTaskList();
if(taskList == null || taskList.size() == 0)
{
return;
}
for(Runnable task : taskList)
{
addTask(task);
}
start();
}
public void start()
{
schedulerExecutors.scheduleAtFixedRate(new TaskWorker(), 1, getCycle(), TimeUnit.MINUTES);
}
private class TaskWorker implements Runnable
{
@Override
public void run()
{
int i = 0;
while (!queue.isEmpty() && i++ < getPackageSize())
{
Runnable nextTask = nextTask();
if (nextTask != null)
{
executors.execute(nextTask);
}
}
}
}
public Runnable nextTask()
{
try
{
return queue.take();
}
catch (InterruptedException e)
{
return null;
}
}
public void addTask(Runnable task)
{
try
{
queue.put(task);
}
catch (InterruptedException e)
{
}
}
}
该类的意思是使用Quartz每天凌晨0点触发,从数据库取出2000个数据,这样子类实现的方法getTaskList();就有2000个任务,然后调用start()方法。 每隔getCycle()分钟执行getPackageSize();个线程。 请问 schedulerExecutors.scheduleAtFixedRate(new TaskWorker(), 1, getCycle(), TimeUnit.MINUTES);这个方法是不是只调用一次就行了。 |
|
![]() 20分 |
private final static ExecutorService executors = Executors.newCachedThreadPool();
这里的池子没有控制大小,每天0点任务调度的时候你把executors池子的线程数量打印出来看看。 还有你这里的定时任务只有一个,没有必要使用schedulerExecutors来做定时。可以采用Timer来做。 |
![]() |
那我怎么保证下一次0点调度的时候,之前的Timer是否还在运行呢?如果是的话?怎么重复利用呢? |
![]() |
那么你判断下。/
|
![]() |
你为什么要知道上一次的Timer还在运行?难道任务执行12个小时还不够处理的吗?这种情况都会做超时处理! |
![]() |
你要知道一点,一个Timer下面可以调度N个TimerTask;你当然可以获取到Timer下面任务的数量。
需要知道的是Timer是单线程对定时处理! |
![]() |
1.process里不能再调用start了,start应该在系统启动的时候配置Listener调用一次就OK了。多次调用会导致存在多个定时任务。
2.没有看到你设置ExecutorService的核心进程与最大、最小进程数的限制,当大量任务存储进程池时可能会创建超出系统承受范围的进程数量。 3.你的定时器的用法有问题,根据你写的业务来看根本就没有用。你的taskQueue是每天0点时候才有的数据,而taskWorker每分钟执行的时候直接清空任务队列,这样推测一天你的定时器仅有一次触发是有效触发。 正确设计应该在addTask方法时直接executors.execute(nextTask);。取消定时器功能。 总结: 1.设置线程池的初始线程数、最小线程数、最大线程数。 2.取消掉start方法,取消该类中对定时器的使用。 3.检查确定每个task的执行时间与系统计算资源情况并调整线程池的相关参数。 4.你自己的queue变量在这么改后也没有用了。 |
![]() |
非常感谢你的详细分析。请再帮我解答下心中的疑惑,非常感谢 |
![]() 80分 |
把定时器里添加任务的代码放到addTask中的话就是由线程池来调度运行。至于阻塞问题,如果你的task是线程安全的,且不需要和其他Task竞争的话是不会有问题的,线程池会按顺序来排列Task的运行。 |

