Java多线程队列实现
项目中某些请求频率过高,需要放到队列中排队执行
本示例使用多线程去消费指定队列数据
调用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
public static void main(String[] args) { Handler<TestTheardQueue, Map> fun = new Handler<TestTheardQueue, Map>() { @Override public void handler(Map event) { System.out.println(event.get("key")); try { Thread.sleep(10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; //方式1 使用默认线程池创建队列,功能函数是定义好的变量 QueueChannel<Map> channel = QueueChannel.build("test1", fun); Map<String, String> map = new HashMap<String, String>(); for(int i=0; i<1500; i++) { map = new HashMap<String, String>(); map.put("key", "第" + i + "个====test1" ); channel.offerLast(map); } //方式1 使用独立线程池(线程上限为3)创建队列,功能函数是定义好的变量 QueueChannel<Map> channel2 = QueueChannel.build("test2", fun, 3); Map<String, String> map2 = new HashMap<String, String>(); for(int i=0; i<1500; i++) { map2 = new HashMap<String, String>(); map2.put("key", "第" + i + "个====test2" ); channel2.offerLast(map2); } //方式1 使用默认线程池创建队列,功能函数在参数中定义 QueueChannel<Map> channel3 = QueueChannel.build( "test3", new Handler<TestTheardQueue, Map>(){ @Override public void handler(Map event) { System.out.println(event.get("key")); } }, 3); Map<String, String> map3 = new HashMap<String, String>(); for(int i=0; i<1500; i++) { map3 = new HashMap<String, String>(); map3.put("key", "第" + i + "个====test3" ); channel3.offerLast(map3); } class TestHandler extends Handler<TestTheardQueue, Map> { @Override public void handler(Map event) { System.out.println(event.get("key")); } } //方式1 使用默认线程池创建队列,功能函数可以是一个继承Handler的类 QueueChannel<Map> channel4 = QueueChannel.build("test4", new TestHandler()); Map<String, String> map4 = new HashMap<String, String>(); for(int i=0; i<150; i++) { map4 = new HashMap<String, String>(); map4.put("key", "第" + i + "个====test4" ); channel4.offerLast(map4); } } |
实现类:
QueueChannel.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
package net.code2048.common.theadQueue; import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import net.code2048.common.function.Handler; /** * 多线程队列服务 * @author 破晓(www.code2048.net) */ public class QueueChannel<D> { /** 所有队列 */ private static Map<String, QueueChannel<?>> allQueue = new HashMap<String, QueueChannel<?>>(); /** 默认线程池 */ private static ThreadFactory defaultThreadFactory = new ThreadFactory(4); /** * 获取队列实例 * @param key * @return */ @SuppressWarnings("unchecked") public static <D> QueueChannel<D> getInstance(String key) { if(!allQueue.containsKey(key)) allQueue.put(key, new QueueChannel<D>()); return (QueueChannel<D>) allQueue.get(key); } /** * 构建队列 * @param key * @param fun 队列执行内容 * @return */ public static <D> QueueChannel<D> build(String key, Handler<?, D> fun) { return build(key, fun, false); } /** * 构建队列 * @param key * @param fun 队列执行内容 * @param prior 是否优先执行 * @return */ public static <D> QueueChannel<D> build(String key, Handler<?, D> fun, boolean prior) { QueueChannel<D> channel = QueueChannel.getInstance(key); channel.callBack = fun; channel.factory = defaultThreadFactory; channel.factory.addChannel(channel, prior); channel._Check = true; return channel; } /** * 构建队列,该队列拥有独占线程池 * @param key * @param fun 队列执行内容 * @param theardCount 独占线程池的线程数量 * @return */ public static <D> QueueChannel<D> build(String key, Handler<?, D> fun, int theardCount) { QueueChannel<D> channel = QueueChannel.getInstance(key); channel.callBack = fun; channel.factory = new ThreadFactory(theardCount); channel.factory.addChannel(channel, false); channel._Check = true; return channel; } public Handler<?, D> callBack; private ThreadFactory factory; /** 双向堵塞队列 */ private BlockingDeque<D> queues = new LinkedBlockingDeque<D>(); /** 队列是否有效 */ private boolean _Check = false; /** * 队列是否有效 * @return */ public boolean isCheck() { return _Check; } private QueueChannel() { } /** * 
将数据添加到表头部,返回状态true,false * @param message */ public void offerFirst(D message) { if(!_Check) throw new IllegalAccessError("请先使用QueueChannel.build 构建实例"); queues.offerFirst(message); factory.execute(); } /** * 将数据添加到表尾部,返回状态true,false * @param message */ public void offerLast(D message){ if(!_Check) throw new IllegalAccessError("请先使用QueueChannel.build 构建实例"); queues.offerLast(message); factory.execute(); } public int size(){ return queues.size(); } public boolean callBack(TheadComsume theadComsume) throws Exception { // System.out.println("线程工作:" + theadComsume.hashCode() + "======" + queues.size()); if(queues.size() > 0) { D message = queues.takeFirst(); callBack.handler(message); return true; } return false; } /** * 取出队列第一个元素并从队列中移除, * @return */ public D takeFirst(){ D message= null; try { message = queues.takeFirst(); int size = queues.size(); if(size > 0) System.err.println(size+"队列长度"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return message; } } |
ThreadFactory.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
package net.code2048.common.theadQueue; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * * @author 破晓(www.code2048.net) * */ public class ThreadFactory { private List<QueueChannel<?>> queueChannel = new ArrayList<QueueChannel<?>>(); private List<TheadComsume> theads = new ArrayList<TheadComsume>(); private ExecutorService executorService = Executors.newFixedThreadPool(16); public ThreadFactory(int count) { TheadComsume item; for(int i=0; i<count; i++) { item = new TheadComsume(this); // theads.add(item); executorService.execute(item); } } /** * 给线程添加可执行队列 * @param channel 队列 * @param prior 是否优先执行 */ public synchronized void addChannel(QueueChannel<?> channel, boolean prior) { if(prior) queueChannel.add(0, channel); else queueChannel.add(channel); } /** * 获取非空队列,如果所有队列都是空则返回第一个队列 * @param theadComsume * @return * @throws Exception */ public synchronized QueueChannel<?> getChannel(TheadComsume theadComsume) throws Exception { for(QueueChannel<?> item : queueChannel) { if(item.callBack(theadComsume)) return item; } return null; } /** * 回收执行完的线程 * @param item */ public void push(TheadComsume item) { // System.out.println("回收线程:" + item.hashCode()); theads.add(item); } public void execute() { if(theads.size() > 0) { TheadComsume item = theads.remove(0); // System.out.println("激活线程:" + item.hashCode()); item.redo(); } } } |
TheadComsume.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
package net.code2048.common.theadQueue; import org.apache.log4j.Logger; /** * * @author 破晓(www.code2048.net) * */ public class TheadComsume implements Runnable { private final static Logger log = Logger.getLogger(TheadComsume.class); private ThreadFactory _parent; public TheadComsume(ThreadFactory parent) { _parent = parent; } public synchronized void redo() { notify(); } @Override public void run() { try { synchronized(this) { while (true) { QueueChannel<?> channel = _parent.getChannel(this); if(channel == null) { _parent.push(this); wait(); } } } } catch (Exception e) { log.error("目标线程:"+hashCode(), e); e.printStackTrace(); } } } |
其中 net.code2048.common.function.Handler 可以查看下面的文章
在这里贴一下:
Handler.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
package net.code2048.common.function; /** * * @author 破晓(www.code2048.net) * * @param <P> * @param <D> */ public abstract class Handler<P,D> extends Function<Object,P> { public Handler(P parent) { super(parent); } public Handler() { super(null); } @SuppressWarnings("unchecked") @Override public Object apply(Object[] argArray) { handler((D) argArray[0]); return null; } public abstract void handler(D event); } |
Function.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
package net.code2048.common.function; /** * * @author 破晓(www.code2048.net) * * @param <T> * @param <P> */ public abstract class Function<T, P> implements IFunction<T, P> { public Function() { this(null); } protected P _parent; public Function(P parent) { _parent = parent; } @Override public abstract T apply(Object[] argArray); @Override public T call(Object... args) { return apply(args); } @Override public P getParent() { return _parent; } } |
IFunction.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
package net.code2048.common.function;

/**
 * Contract for a callable object that carries a reference to a parent object.
 *
 * @author 破晓(www.code2048.net)
 *
 * @param <T> result type
 * @param <P> type of the parent object
 */
public interface IFunction<T, P> {

    /**
     * Invokes the function with the arguments packed in an array.
     *
     * @param argArray the arguments
     * @return the result
     */
    T apply(Object[] argArray);

    /**
     * Invokes the function with a variable number of arguments.
     *
     * @param args the arguments
     * @return the result
     */
    T call(Object... args);

    /**
     * Returns the parent object associated with this function.
     *
     * @return the parent object
     */
    P getParent();
}