一.线程池管理类
/** * 线程池管理类 */ public class ThreadPoolFactory { private static final Log logger = LogFactory.getLog(ThreadPoolFactory.class); /** * 放入管理线程池线程间隔时间 */ private final static int PUT_DEFAULTPOOL_INTERVAL = 3; /** * 线程池保持对象 */ private static Map<String, ThreadPool> poolMap = new HashMap<String, ThreadPool>(); /** * 请求Request缓冲队列(缓存队列满时) */ public static final Queue<Runnable> msgQueue = new ConcurrentLinkedQueue<Runnable>(); static { // 访问请求Request缓存的调度线程 final Runnable accessBufferThread = new Runnable() { public void run() { // 查看是否有待定请求,如果有,则添加到线程池中 if (!msgQueue.isEmpty()) { Runnable r = msgQueue.poll(); logger.error("缓存队列线程放入默认线程池执行" + r); getThreadPool("defaultThreadPool").execute(r); } } }; // 调度线程池 final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // 定时调度 scheduler.scheduleAtFixedRate(accessBufferThread, 0, PUT_DEFAULTPOOL_INTERVAL, TimeUnit.SECONDS); } /** * 根据key取得对应线程池实例 * */ public static ThreadPool getThreadPool(String key) { ThreadPool obj = null; if (StringUtils.isNotBlank(key)) { obj = poolMap.get(key); if (obj == null) { synchronized (ThreadPoolFactory.class) { if (obj == null) { logger.error("线程池" + key + "不存在,已被创建"); obj = new ThreadPool(key); poolMap.put(key, obj); } } } else { logger.info("线程池" + key + "存在,已返回"); } } return obj; } /** * 静态工厂不允许被实例化 */ private ThreadPoolFactory() { } }二.线程池类
/**
 * Named wrapper around a bounded {@link ThreadPoolExecutor}.
 *
 * <p>Tasks the executor rejects are handed to {@code ThreadPoolFactory.msgQueue} instead of
 * being dropped.
 */
public class ThreadPool {

    /** Number of core threads kept by the executor. */
    private final static int CORE_POOL_SIZE = 5;

    /** Upper bound on the number of threads. */
    private final static int MAX_POOL_SIZE = 100;

    /** Idle time, in seconds, after which threads above the core size are released. */
    private final static int KEEP_ALIVE_TIME = 180;

    /** Capacity of the executor's work queue. */
    private final static int WORK_QUEUE_SIZE = 5000;

    /** Logging component. */
    private final Log logger = LogFactory.getLog(ThreadPool.class);

    /** Name of this pool; used in log output. */
    private String name;

    /** Invoked by the executor when it cannot accept a task; parks the task in the shared queue. */
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            logger.error(name + "已满, 线程被放入公共队列中等待执行 " + r);
            if (ThreadPoolFactory.msgQueue.offer(r)) {
                return;
            }
            logger.error("放入等待队列失败,系统发生严重错误");
        }
    };

    /** The wrapped executor. */
    private ThreadPoolExecutor threadPool;

    /**
     * Creates a pool with the fixed sizing constants of this class.
     *
     * @param name pool name
     */
    ThreadPool(String name) {
        String settings = "CORE_POOL_SIZE=" + CORE_POOL_SIZE
                + ", MAX_POOL_SIZE=" + MAX_POOL_SIZE
                + ", KEEP_ALIVE_TIME=" + KEEP_ALIVE_TIME
                + ", WORK_QUEUE_SIZE=" + WORK_QUEUE_SIZE;
        logger.error(settings);

        this.name = name;

        ArrayBlockingQueue<Runnable> backlog = new ArrayBlockingQueue<Runnable>(WORK_QUEUE_SIZE);
        this.threadPool = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, backlog, this.handler);
    }

    /**
     * Submits a task to the wrapped executor.
     *
     * @param task the task to run
     */
    public void execute(Runnable task) {
        this.threadPool.execute(task);
    }
}

3.场景应用
//日志类(使用本地缓存实现线程隔离) public class LogManager { private static ThreadPoolManager threadPool = ThreadPoolManager.getInstance("logManager"); private static ThreadLocal local = new ThreadLocal(); /** * 初始化 */ static { initialize(); } /** * 初始化 * @see [类、类#方法、类#成员] */ public static void initialize() { SystemLogBean log = new SystemLogBean(); local.set(log); setOperstatus("0"); } /** * 返回logbean对象 * @return * @see [类、类#方法、类#成员] */ public static SystemLogBean getLog() { SystemLogBean log = (SystemLogBean) local.get(); if (log == null) { log = new SystemLogBean(); setLog(log); setOperstatus("1"); } return log; } public static String getSessionId() { SystemLogBean log = getLog(); return log.getSessionid(); } public static String getReqSeq() { SystemLogBean log = getLog(); return log.getReq_seq(); } public static void setReqSeq(String reqSeq) { SystemLogBean log = getLog(); log.setReq_seq(reqSeq); setLog(log); } /** * 获取对象 * @return */ public static Object getObject() { SystemLogBean log = getLog(); return log.getObject(); } /** * 设置对象 * @param obj */ public static void setObject(Object obj) { SystemLogBean log = getLog(); log.setObject(obj); setLog(log); } /** * 设置手机号 * @param telnum */ public static void setTelnum(String telnum) { SystemLogBean log = getLog(); log.setTelnum(telnum); setLog(log); } /** * 存入日志对象 * @param log * @see [类、类#方法、类#成员] */ public static void setLog(SystemLogBean log) { local.set(log); } public static void setProxyIp(String proxyIp) { SystemLogBean log = getLog(); log.setProxyIp(proxyIp); setLog(log); } /** * <一句话功能简述>设置array转发ip <功能详细描述> * @param forwardIp * @see [类、类#方法、类#成员] */ public static void setForwardIp(String forwardIp) { SystemLogBean log = getLog(); log.setForwardIp(forwardIp); setLog(log); } /** * <一句话功能简述>设置用户浏览器信息 <功能详细描述> * @param userAgent * @see [类、类#方法、类#成员] */ public static void setUserAgent(String userAgent) { SystemLogBean log = getLog(); log.setUserAgent(subStrByte(userAgent, 150)); setLog(log); } public static String 
getUserAgent() { SystemLogBean log = getLog(); return log.getUserAgent(); } /** * 设置业务类型 * @param value * @see [类、类#方法、类#成员] */ public static void setBiz_definition_id(String biz_definition_id) { SystemLogBean log = getLog(); log.setBiz_definition_id(subStrByte(biz_definition_id, 32)); setLog(log); } /** * 设置操作记录信息 * @param describe * @see [类、类#方法、类#成员] */ public static void setDescribe(String describe) { SystemLogBean log = getLog(); describe = subStrByte(describe, 512); log.setDescribe(describe); setLog(log); } /** * 增加操作记录信息 * @param describe * @see [类、类#方法、类#成员] */ public static void appendDescribe(String describe) { SystemLogBean log = getLog(); describe = (log.getDescribe() == null ? "" : log.getDescribe() + "~") + describe; describe = subStrByte(describe, 768); log.setDescribe(describe); setLog(log); } /** * 增加异常信息 * @param describe * @see [类、类#方法、类#成员] */ public static void appendExceptionInfo(Exception e) { SystemLogBean log = getLog(); // 保存错误堆栈信息 String message = e.toString() + "<br/>"; StackTraceElement[] trace = e.getStackTrace(); for (int i = 0; i < trace.length; i++) { message += "\tat " + trace[i].toString() + "<br/>"; } message = (log.getExceptionInfo() == null ? "" : log.getExceptionInfo() + "~") + message; message = subStrByte(message, 4000); log.setExceptionInfo(message); setLog(log); } /** * 增加请求参数 * @param describe * @see [类、类#方法、类#成员] */ public static void appendRequestParam(String params) { if (params != null && params.trim().length() > 0) { SystemLogBean log = getLog(); params = (log.getRequest_param() == null ? "" : log.getRequest_param() + "~") + params; params = subStrByte(params, 3900); log.setRequest_param(params); setLog(log); } } /** * 增加记录响应参数 * @param describe * @see [类、类#方法、类#成员] */ public static void appendResponseParam(String params) { if (params != null && params.trim().length() > 0) { SystemLogBean log = getLog(); params = (log.getResponse_param() == null ? 
"" : log.getResponse_param() + "~") + params; params = subStrByte(params, 3900); log.setResponse_param(params); setLog(log); } } /** * 增加请求流水号记录 * @param isrelaid * @see [类、类#方法、类#成员] */ public static void appendChannelRela(ChannelRelaBean channelReala) { if (null != channelReala) { SystemLogBean log = getLog(); log.getRealaList().add(channelReala); setLog(log); } } /** * 增加记录接口ID * @param describe * @see [类、类#方法、类#成员] */ public static void appendProcessCode(String processCode) { SystemLogBean log = getLog(); processCode = (log.getProcess_code() == null ? "" : log.getProcess_code() + "~") + processCode; log.setProcess_code(processCode); setLog(log); } /** * 设置操作结果状态 * @param value * @see [类、类#方法、类#成员] */ public static void setOperstatus(String value) { SystemLogBean log = getLog(); log.setOperstatus(value); setLog(log); } /** * 设置seesionid * @param value * @see [类、类#方法、类#成员] */ public static void setSessionid(String value) { SystemLogBean log = getLog(); log.setSessionid(value); setLog(log); } /** * 设置biz_begindate */ public static void setBiz_begindate(String value) { SystemLogBean log = getLog(); log.setBiz_begindate(value); setLog(log); } /** * 设置biz_enddate */ public static void setBiz_endate(String value) { SystemLogBean log = getLog(); log.setBiz_enddate(value); setLog(log); } /** * 设置action_url */ public static void setAction_url(String value) { SystemLogBean log = getLog(); log.setAction_url(value); setLog(log); } /** * 设置interface_param */ public static void setInterface_param(String interfaceParam) { SystemLogBean log = getLog(); interfaceParam = subStrByte(interfaceParam, 2000); log.setInterface_param(interfaceParam); setLog(log); } /** * 增加interfaceParam * @param interfaceParam */ public static void appendInterface_param(String interfaceParam) { SystemLogBean log = getLog(); interfaceParam = (log.getInterface_param() == null ? 
"" : log.getInterface_param() + "~") + interfaceParam; log.setInterface_param(subStrByte(interfaceParam, 2000)); setLog(log); } /** * 设置retcode */ public static void setRetcode(String value) { SystemLogBean log = getLog(); log.setRetcode(value); setLog(log); } /** * 设置resultMsg */ public static void setResultmsg(String resultMsg) { SystemLogBean log = getLog(); resultMsg = subStrByte(resultMsg, 2000); log.setResultmsg(resultMsg); setLog(log); } /** * 增加resultMsg * @param resultMsg */ public static void appendResultmsg(String resultMsg) { SystemLogBean log = getLog(); resultMsg = (log.getResultmsg() == null ? "" : log.getResultmsg() + "~") + resultMsg; log.setResultmsg(subStrByte(resultMsg, 2000)); setLog(log); } /** * 设置ncode(如果有的话) */ public static void setNcode(String value) { SystemLogBean log = getLog(); log.setNcode(value); setLog(log); } /** * 设置call_startdate(如果有的话) */ public static void setCall_startdate(String value) { SystemLogBean log = getLog(); log.setCall_startdate(value); setLog(log); } /** * 设置ncode(如果有的话) */ public static void setCall_enddate(String value) { SystemLogBean log = getLog(); log.setCall_enddate(value); setLog(log); } public static String getIpAdd() { SystemLogBean log = getLog(); return log.getIpaddr(); } /** * 写日志 */ public static void write(boolean write) { SystemLogBean log = getLog(); log.setWrite(write); setLog(log); } /** * 写入日志表 * @see [类、类#方法、类#成员] */ public static void logging() { SystemLogBean log = (SystemLogBean) local.get(); // 设置业务类型的业务可以写入操作日志 if (log.isWrite() && log.getBiz_definition_id() != null && !"".equals(log.getBiz_definition_id())) { threadPool.addTask(new LogWriter(log)); } } /** * 实例销毁 * @see [类、类#方法、类#成员] */ public static void destory() { local.remove(); } /** * 获取客户端IP地址 * @return String 客户端IP地址 */ public static String getIpAdd(HttpServletRequest request) { String ip = request.getHeader("x-forwarded-for"); // 在反向代理 X-Forwarded-For中获取客户端真实的IP地址 if (null != ip && 0 < ip.length() && 
!"unknown".equalsIgnoreCase(ip)) { String[] ipArr = ip.split(","); for (int i = 0; i < ipArr.length; i++) { if (!"unknown".equalsIgnoreCase(ipArr[i]) && !"".equals(ipArr[i].trim())) { ip = ipArr[i]; break; } } } if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) { ip = request.getHeader("Proxy-Client-IP"); } if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) { ip = request.getHeader("WL-Proxy-Client-IP"); } if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) { ip = request.getRemoteAddr(); } return ip; } /** * <一句话功能简述>获取代理ip <功能详细描述> * @param request * @return * @see [类、类#方法、类#成员] */ public static String getProxyIp(HttpServletRequest request) { String ip = request.getHeader("x-forwarded-for"); return ip; } /** * <一句话功能简述>获取array转发ip <功能详细描述> * @param request * @return * @see [类、类#方法、类#成员] */ public static String getForwardIp(HttpServletRequest request) { String ip = request.getRemoteAddr(); return ip; } /** * 按字节截取字符串 * @param str * @param subSLength * @return */ public static String subStrByte(String str, int subLength) { if (StringUtils.isBlank(str)) return ""; byte[] data; try { data = str.getBytes("GBK"); if (data.length < subLength) { return str; } return new String(ArrayUtils.subarray(data, 0, subLength - 1), "GBK"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ""; } } public static void setSystemLogBean(SystemLogBean log) { setLog(log); } } ======================================================================================= //线程池管理类 public class ThreadPoolManager { private static Hashtable<String, ThreadPoolManager> tb = new Hashtable<String, ThreadPoolManager>(); // 线程池维护线程的最少数量 private final static int CORE_POOL_SIZE = 30; // 线程池维护线程的最大数量 private final static int MAX_POOL_SIZE = 60; // 线程池维护线程所允许的空闲时间 private final static int KEEP_ALIVE_TIME = 180; // 线程池所使用的缓冲队列大小 private final static int WORK_QUEUE_SIZE = 10; // 请求Request缓冲队列 public Queue<Runnable> msgQueue = new 
LinkedList<Runnable>(); // 访问请求Request缓存的调度线程 final Runnable accessBufferThread = new Runnable() { public void run() { // 查看是否有待定请求,如果有,则添加到线程池中 if (hasMoreAcquire()) { System.out.println("有待定请求,添加到线程池中..."); // SearchTask task = ( SearchTask ) msgQueue.poll(); threadPool.execute(msgQueue.poll()); } } }; // handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序 final RejectedExecutionHandler handler = new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // System.out.println(r + " request 放入队列中重新等待执行 " + r); msgQueue.offer(r); } }; // 管理线程池 final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler); // 调度线程池 final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // 定时调度 final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(accessBufferThread, 0, 1, TimeUnit.SECONDS); /** * 根据key取得对应实例 * * @param key * @return */ public static synchronized ThreadPoolManager getInstance(String key) { ThreadPoolManager obj = tb.get(key); if (obj == null) { // System.out.println("new thread pool :" + key); obj = new ThreadPoolManager(); tb.put(key, obj); //new ThreadPoolManager().poolMonitor(); } return obj; } private ThreadPoolManager() { } private boolean hasMoreAcquire() { //System.err.println("待执行数量:"+msgQueue.size()); return !msgQueue.isEmpty(); } public void addTask(Runnable task) { //System.err.println("目前线程池活动线程数量:"+threadPool.getActiveCount()); // System.out.println("准备处理线程..."+task); threadPool.execute(task); //System.out.println("目前线程池活动线程数量:"+threadPool.getActiveCount()); // System.out.println("threadPool.getActiveCount():"+threadPool.getActiveCount()+" , MAX_POOL_SIZE:"+MAX_POOL_SIZE+", QUENE_SIZE:"+threadPool.getQueue().size()); } /** * 判断线程池是否已满 * @return true 已满,false 未满 */ public boolean isPoolFull(){ // 
System.out.println("threadPool.getActiveCount():"+threadPool.getActiveCount()+" , MAX_POOL_SIZE:"+MAX_POOL_SIZE+", QUENE_SIZE:"+threadPool.getQueue().size()); return threadPool.getActiveCount()>=MAX_POOL_SIZE; } public ThreadPoolExecutor getThreadPool(){ return threadPool; } /* private void poolMonitor(){ new Thread() { public void run() { while(true){ try { sleep(1*1000L); ThreadPoolExecutor t=ThreadPoolManager.getInstance("tmallNotify").getThreadPool(); System.out.println("monitor>>>threadPool.getActiveCount():"+t.getActiveCount()+" , MAX_POOL_SIZE:"+MAX_POOL_SIZE+", QUENE_SIZE:"+t.getQueue().size()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }.start(); }*/ public <V> Future<V> submit(Callable<V> task) { return threadPool.submit(task); } }