十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
最简单的可以利用java.util.concurrent.Executors
创新互联于2013年开始,是专业互联网技术服务公司,拥有项目网站制作、网站建设网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元环翠做网站,已为上家服务,为环翠各地企业和个人服务,联系电话:18982081108
调用Executors.newCachedThreadPool()获取缓冲式线程池
Executors.newFixedThreadPool(int nThreads)获取固定大小的线程池
一:newCachedThreadPool
(1)缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse,如果没有,就建立一个新的线程加入池中;
(2)缓存型池子,通常用于执行一些生存周期很短的异步型任务;因此一些面向连接的daemon型server中用得不多;
(3)能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。
(4)注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止
二:newFixedThreadPool
(1)newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程
(2)其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子
(3)和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器
(4)从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同:
fixed池线程数固定,并且是0秒IDLE(无IDLE)
cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE
三:ScheduledThreadPool
(1)调度型线程池
(2)这个池子里的线程可以按schedule依次delay执行,或周期执行
四:SingleThreadExecutor
(1)单例线程,任意时间池中只能有一个线程
(2)用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)
线程池通俗的描述就是预先创建若干空闲线程 等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务 这样就省去了频繁创建线程的时间 因为频 繁创建线程是要耗费大量的CPU资源的 如果一个应用程序需要频繁地处理大量并发事务 不断的创建销毁线程往往会大大地降低系统的效率 这时候线程池就派 上用场了
本文旨在使用Java语言编写一个通用的线程池 当需要使用线程池处理事务时 只需按照指定规范封装好事务处理对象 然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可 并实现线程池的动态修改(修改当前线程数 最大线程数等) 下面是实现代码
//ThreadTask java
package polarman threadpool;
/** *//**
*线程任务
* @author ryang
*
*/
public interface ThreadTask {
public void run();
}
//PooledThread java
package polarman threadpool;
import java util Collection; import java util Vector;
/** *//**
*接受线程池管理的线程
* @author ryang
*
*/
public class PooledThread extends Thread {
protected Vector tasks = new Vector();
protected boolean running = false;
protected boolean stopped = false;
protected boolean paused = false;
protected boolean killed = false;
private ThreadPool pool;
public PooledThread(ThreadPool pool) { this pool = pool;
}
public void putTask(ThreadTask task) { tasks add(task);
}
public void putTasks(ThreadTask[] tasks) { for(int i= ; itasks length; i++) this tasks add(tasks[i]);
}
public void putTasks(Collection tasks) { this tasks addAll(tasks);
}
protected ThreadTask popTask() { if(tasks size() ) return (ThreadTask)tasks remove( );
else
return null;
}
public boolean isRunning() {
return running;
}
public void stopTasks() {
stopped = true;
}
public void stopTasksSync() {
stopTasks();
while(isRunning()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public void pauseTasks() {
paused = true;
}
public void pauseTasksSync() {
pauseTasks();
while(isRunning()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public void kill() { if(!running)
interrupt();
else
killed = true;
}
public void killSync() {
kill();
while(isAlive()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public synchronized void startTasks() {
running = true;
this notify();
}
public synchronized void run() { try { while(true) { if(!running || tasks size() == ) { pool notifyForIdleThread(); //System out println(Thread currentThread() getId() + : 空闲 ); this wait(); }else {
ThreadTask task;
while((task = popTask()) != null) { task run(); if(stopped) {
stopped = false;
if(tasks size() ) { tasks clear(); System out println(Thread currentThread() getId() + : Tasks are stopped );
break;
}
}
if(paused) {
paused = false;
if(tasks size() ) { System out println(Thread currentThread() getId() + : Tasks are paused );
break;
}
}
}
running = false;
}
if(killed) {
killed = false;
break;
}
}
}catch(InterruptedException e) {
return;
}
//System out println(Thread currentThread() getId() + : Killed );
}
}
//ThreadPool java
package polarman threadpool;
import java util Collection; import java util Iterator; import java util Vector;
/** *//**
*线程池
* @author ryang
*
*/
public class ThreadPool {
protected int maxPoolSize;
protected int initPoolSize;
protected Vector threads = new Vector();
protected boolean initialized = false;
protected boolean hasIdleThread = false;
public ThreadPool(int maxPoolSize int initPoolSize) { this maxPoolSize = maxPoolSize; this initPoolSize = initPoolSize;
}
public void init() {
initialized = true;
for(int i= ; iinitPoolSize; i++) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
}
//System out println( 线程池初始化结束 线程数= + threads size() + 最大线程数= + maxPoolSize);
}
public void setMaxPoolSize(int maxPoolSize) { //System out println( 重设最大线程数 最大线程数= + maxPoolSize); this maxPoolSize = maxPoolSize;
if(maxPoolSize getPoolSize())
setPoolSize(maxPoolSize);
}
/** *//**
*重设当前线程数
* 若需杀掉某线程 线程不会立刻杀掉 而会等到线程中的事务处理完成* 但此方法会立刻从线程池中移除该线程 不会等待事务处理结束
* @param size
*/
public void setPoolSize(int size) { if(!initialized) {
initPoolSize = size;
return;
}else if(size getPoolSize()) { for(int i=getPoolSize(); isize imaxPoolSize; i++) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
}
}else if(size getPoolSize()) { while(getPoolSize() size) { PooledThread th = (PooledThread)threads remove( ); th kill();
}
}
//System out println( 重设线程数 线程数= + threads size());
}
public int getPoolSize() { return threads size();
}
protected void notifyForIdleThread() {
hasIdleThread = true;
}
protected boolean waitForIdleThread() {
hasIdleThread = false;
while(!hasIdleThread getPoolSize() = maxPoolSize) { try { Thread sleep( ); } catch (InterruptedException e) {
return false;
}
}
return true;
}
public synchronized PooledThread getIdleThread() { while(true) { for(Iterator itr=erator(); itr hasNext();) { PooledThread th = (PooledThread)itr next(); if(!th isRunning())
return th;
}
if(getPoolSize() maxPoolSize) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
return thread;
}
//System out println( 线程池已满 等待 );
if(waitForIdleThread() == false)
return null;
}
}
public void processTask(ThreadTask task) {
PooledThread th = getIdleThread();
if(th != null) { th putTask(task); th startTasks();
}
}
public void processTasksInSingleThread(ThreadTask[] tasks) {
PooledThread th = getIdleThread();
if(th != null) { th putTasks(tasks); th startTasks();
}
}
public void processTasksInSingleThread(Collection tasks) {
PooledThread th = getIdleThread();
if(th != null) { th putTasks(tasks); th startTasks();
}
}
}
下面是线程池的测试程序
//ThreadPoolTest java
import java io BufferedReader; import java io IOException; import java io InputStreamReader;
import polarman threadpool ThreadPool; import polarman threadpool ThreadTask;
public class ThreadPoolTest {
public static void main(String[] args) { System out println( quit 退出 ); System out println( task A 启动任务A 时长为 秒 ); System out println( size 设置当前线程池大小为 ); System out println( max 设置线程池最大线程数为 ); System out println();
final ThreadPool pool = new ThreadPool( ); pool init();
Thread cmdThread = new Thread() { public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System in));
while(true) { try { String line = reader readLine(); String words[] = line split( ); if(words[ ] equalsIgnoreCase( quit )) { System exit( ); }else if(words[ ] equalsIgnoreCase( size ) words length = ) { try { int size = Integer parseInt(words[ ]); pool setPoolSize(size); }catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( max ) words length = ) { try { int max = Integer parseInt(words[ ]); pool setMaxPoolSize(max); }catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( task ) words length = ) { try { int timelen = Integer parseInt(words[ ]); SimpleTask task = new SimpleTask(words[ ] timelen * ); pool processTask(task); }catch(Exception e) {
}
}
} catch (IOException e) { e printStackTrace();
}
}
}
};
cmdThread start();
/**//*
for(int i= ; i ; i++){
SimpleTask task = new SimpleTask( Task + i (i+ )* ); pool processTask(task);
}*/
}
}
class SimpleTask implements ThreadTask {
private String taskName;
private int timeLen;
public SimpleTask(String taskName int timeLen) { this taskName = taskName; this timeLen = timeLen;
}
public void run() { System out println(Thread currentThread() getId() +
: START TASK + taskName + );
try { Thread sleep(timeLen); } catch (InterruptedException e) {
}
System out println(Thread currentThread() getId() +
: END TASK + taskName + );
}
}
使用此线程池相当简单 下面两行代码初始化线程池
ThreadPool pool = new ThreadPool( ); pool init();
要处理的任务实现ThreadTask 接口即可(如测试代码里的SimpleTask) 这个接口只有一个方法run()
两行代码即可调用
lishixinzhi/Article/program/Java/hx/201311/27203
下面给你介绍4种线程池:
1、newCachedThreadPool:
底层:返回ThreadPoolExecutor实例,corePoolSize为0;maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为60L;unit为TimeUnit.SECONDS;workQueue为SynchronousQueue(同步队列)
通俗:当有新任务到来,则插入到SynchronousQueue中,由于SynchronousQueue是同步队列,因此会在池中寻找可用线程来执行,若有可以线程则执行,若没有可用线程则创建一个线程来执行该任务;若池中线程空闲时间超过指定大小,则该线程会被销毁。
适用:执行很多短期异步的小程序或者负载较轻的服务器
2、newFixedThreadPool:
底层:返回ThreadPoolExecutor实例,接收参数为所设定线程数量nThread,corePoolSize为nThread,maximumPoolSize为nThread;keepAliveTime为0L(不限时);unit为:TimeUnit.MILLISECONDS;WorkQueue为:new LinkedBlockingQueueRunnable() 无解阻塞队列
通俗:创建可容纳固定数量线程的池子,每隔线程的存活时间是无限的,当池子满了就不在添加线程了;如果池中的所有线程均在繁忙状态,对于新任务会进入阻塞队列中(无界的阻塞队列)
适用:执行长期的任务,性能好很多
3、newSingleThreadExecutor
底层:FinalizableDelegatedExecutorService包装的ThreadPoolExecutor实例,corePoolSize为1;maximumPoolSize为1;keepAliveTime为0L;unit为:TimeUnit.MILLISECONDS;workQueue为:new LinkedBlockingQueueRunnable() 无解阻塞队列
通俗:创建只有一个线程的线程池,且线程的存活时间是无限的;当该线程正繁忙时,对于新任务会进入阻塞队列中(无界的阻塞队列)
适用:一个任务一个任务执行的场景
4、NewScheduledThreadPool:
底层:创建ScheduledThreadPoolExecutor实例,corePoolSize为传递来的参数,maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为0;unit为:TimeUnit.NANOSECONDS;workQueue为:new DelayedWorkQueue() 一个按超时时间升序排序的队列
通俗:创建一个固定大小的线程池,线程池内线程存活时间无限制,线程池可以支持定时及周期性任务执行,如果所有线程均处于繁忙状态,对于新任务会进入DelayedWorkQueue队列中,这是一种按照超时时间排序的队列结构
适用:周期性执行任务的场景
最后给你说一下线程池任务执行流程:
当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
当workQueue已满,且maximumPoolSizecorePoolSize时,新提交任务会创建新线程执行任务
当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
package tender nmem eris drawexpert service impl;
import java util List;
import ncurrent ExecutorService;
import ncurrent Executors;
import llections map ListOrderedMap;
/**
* 类 线程池类 控制程序线程实例个数 并实例线程
* @author yangtb
* 时间 / /
*
*/
public class ThreadPool {
private ExecutorService exe=null;//线程池
public ThreadPool(int pool_size)
{
exe=Executors newFixedThreadPool(pool_size);//创建线程池
System out println( the server is ready );
}
/**
* 运行循环实例线程 根据要实例的线程个数 传入条件ID
* @param worknum
*/
public void server(int worknum String id)
{
int i= ;
while(iworknum)
{
//实例指定个线程
WorkerThreadImpl t = new WorkerThreadImpl(id);
exe execute(t);//放入线程池
i++;
}
}
lishixinzhi/Article/program/Java/hx/201311/26215
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。
定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()