diff --git a/api/src/main/java/org/xnio/LockFreeMultiQueue.java b/api/src/main/java/org/xnio/LockFreeMultiQueue.java index 283c6dbe0b..3263d08cf3 100644 --- a/api/src/main/java/org/xnio/LockFreeMultiQueue.java +++ b/api/src/main/java/org/xnio/LockFreeMultiQueue.java @@ -29,12 +29,16 @@ public class LockFreeMultiQueue implements BlockingQueue { private AtomicInteger writeSeqIO; private AtomicInteger writeSeqAero; private int capacity; - private boolean threadAffinity; + private boolean threadAffinityIO; + private boolean threadAffinityWorker; private StringBuffer stringBuffer; + private int aeroStartID; - public LockFreeMultiQueue(int queueCount, boolean threadAffinity, int queueCapacity) { + public LockFreeMultiQueue(int queueCount, boolean threadAffinityIO, boolean threadAffinityWorker, int queueCapacity, int aeroStartID) { this.capacity = queueCount; - this.threadAffinity = threadAffinity; + this.threadAffinityIO = threadAffinityIO; + this.threadAffinityWorker = threadAffinityWorker; + this.aeroStartID = aeroStartID; manyToManyConcurrentArrayQueues = new ArrayList<>(); for (int c = 0; c < capacity; c++) { manyToManyConcurrentArrayQueues.add( @@ -45,7 +49,7 @@ public LockFreeMultiQueue(int queueCount, boolean threadAffinity, int queueCapac writeThreadMap = new ConcurrentHashMap<>(); readSeq = new AtomicInteger(0); writeSeqIO = new AtomicInteger(0); - writeSeqAero = new AtomicInteger(8); + writeSeqAero = new AtomicInteger(aeroStartID); stringBuffer = new StringBuffer(); logQueueSizes(); } @@ -203,10 +207,10 @@ private ManyToManyConcurrentArrayQueue getWriteQueue() { if(!writeThreadMap.containsKey(tid)) { if(Thread.currentThread().getName().contains("nioEventLoopGroup")) { writeThreadMap.put(tid, writeSeqAero.getAndIncrement()); - writeSeqAero.compareAndSet(12, 8); + writeSeqAero.compareAndSet(capacity, aeroStartID); } else { writeThreadMap.put(tid, writeSeqIO.getAndIncrement()); - writeSeqIO.compareAndSet(8, 0); + writeSeqIO.compareAndSet(aeroStartID, 0); } index = writeThreadMap.get(tid); acquireAndLogIfRequired(tid, true); @@ -224,15 +228,20 @@ private ManyToManyConcurrentArrayQueue getWriteQueue() { private void acquireAndLogIfRequired(Long tid, boolean isWrite) { Map threadMap = isWrite ? writeThreadMap : readThreadMap; String mapType = isWrite ? "writeThreadMap" : "readThreadMap"; - if (threadAffinity) { + + if(isWrite && threadAffinityIO) { + AffinityLock affinityLock = AffinityLock.acquireLock(true); + logger.info( + Thread.currentThread().getName() + " : Assigned " + mapType + " thread id : " + tid + + " : queue id : " + threadMap.get(tid) + " : cpu id : " + affinityLock.cpuId()); + stringBuffer.append(affinityLock.cpuId()); + stringBuffer.append(' '); + } + else if((!isWrite) && threadAffinityWorker) { AffinityLock affinityLock = AffinityLock.acquireLock(true); logger.info( Thread.currentThread().getName() + " : Assigned " + mapType + " thread id : " + tid + " : queue id : " + threadMap.get(tid) + " : cpu id : " + affinityLock.cpuId()); - if (isWrite) { - stringBuffer.append(affinityLock.cpuId()); - stringBuffer.append(' '); - } } else { logger.info( diff --git a/api/src/main/java/org/xnio/Options.java b/api/src/main/java/org/xnio/Options.java index 7f75da548e..2dc3d004ca 100644 --- a/api/src/main/java/org/xnio/Options.java +++ b/api/src/main/java/org/xnio/Options.java @@ -564,12 +564,22 @@ private Options() {} public static final Option WATCHER_POLL_INTERVAL = Option.simple(Options.class, "WATCHER_POLL_INTERVAL", Integer.class); /** - * thread affinity default false + * thread affinity io default false */ - public static final Option THREAD_AFFINITY = Option.simple(Options.class, "THREAD_AFFINITY", Boolean.class); + public static final Option THREAD_AFFINITY_IO = Option.simple(Options.class, "THREAD_AFFINITY_IO", Boolean.class); + + /** + * thread affinity worker default false + */ + public static final Option THREAD_AFFINITY_WORKER = Option.simple(Options.class, "THREAD_AFFINITY_WORKER", Boolean.class); /** * size of one2one lock free queue */ public static final Option QUEUE_SIZE = Option.simple(Options.class, "QUEUE_SIZE", Integer.class); + + /** + * aero thread start id + */ + public static final Option AERO_THREAD_START_ID = Option.simple(Options.class, "AERO_THREAD_START_ID", Integer.class); } diff --git a/api/src/main/java/org/xnio/XnioWorker.java b/api/src/main/java/org/xnio/XnioWorker.java index 9c0981ca15..a811082218 100644 --- a/api/src/main/java/org/xnio/XnioWorker.java +++ b/api/src/main/java/org/xnio/XnioWorker.java @@ -111,8 +111,10 @@ protected XnioWorker(final Xnio xnio, final ThreadGroup threadGroup, final Optio name = workerName; taskQueue = new LockFreeMultiQueue<>( optionMap.get(Options.WORKER_TASK_MAX_THREADS, 16), - optionMap.get(Options.THREAD_AFFINITY, false), - optionMap.get(Options.QUEUE_SIZE, 10000) + optionMap.get(Options.THREAD_AFFINITY_IO, false), + optionMap.get(Options.THREAD_AFFINITY_WORKER, false), + optionMap.get(Options.QUEUE_SIZE, 10000), + optionMap.get(Options.AERO_THREAD_START_ID, 8) ); this.coreSize = optionMap.get(Options.WORKER_TASK_CORE_THREADS, 4); final boolean markThreadAsDaemon = optionMap.get(Options.THREAD_DAEMON, false);