From 3404d3b4f324303809a89ce0e74f3d13aa39ee51 Mon Sep 17 00:00:00 2001 From: RT Date: Fri, 5 Apr 2019 12:42:48 +0530 Subject: [PATCH 1/3] making aero thread start id configurable --- api/src/main/java/org/xnio/LockFreeMultiQueue.java | 10 ++++++---- api/src/main/java/org/xnio/Options.java | 5 +++++ api/src/main/java/org/xnio/XnioWorker.java | 3 ++- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/api/src/main/java/org/xnio/LockFreeMultiQueue.java b/api/src/main/java/org/xnio/LockFreeMultiQueue.java index 283c6dbe0b..528cb8835a 100644 --- a/api/src/main/java/org/xnio/LockFreeMultiQueue.java +++ b/api/src/main/java/org/xnio/LockFreeMultiQueue.java @@ -31,10 +31,12 @@ public class LockFreeMultiQueue implements BlockingQueue { private int capacity; private boolean threadAffinity; private StringBuffer stringBuffer; + private int aeroStartID; - public LockFreeMultiQueue(int queueCount, boolean threadAffinity, int queueCapacity) { + public LockFreeMultiQueue(int queueCount, boolean threadAffinity, int queueCapacity, int aeroStartID) { this.capacity = queueCount; this.threadAffinity = threadAffinity; + this.aeroStartID = aeroStartID; manyToManyConcurrentArrayQueues = new ArrayList<>(); for (int c = 0; c < capacity; c++) { manyToManyConcurrentArrayQueues.add( @@ -45,7 +47,7 @@ public LockFreeMultiQueue(int queueCount, boolean threadAffinity, int queueCapac writeThreadMap = new ConcurrentHashMap<>(); readSeq = new AtomicInteger(0); writeSeqIO = new AtomicInteger(0); - writeSeqAero = new AtomicInteger(8); + writeSeqAero = new AtomicInteger(aeroStartID); stringBuffer = new StringBuffer(); logQueueSizes(); } @@ -203,10 +205,10 @@ private ManyToManyConcurrentArrayQueue getWriteQueue() { if(!writeThreadMap.containsKey(tid)) { if(Thread.currentThread().getName().contains("nioEventLoopGroup")) { writeThreadMap.put(tid, writeSeqAero.getAndIncrement()); - writeSeqAero.compareAndSet(12, 8); + writeSeqAero.compareAndSet(capacity/2, aeroStartID); } else { writeThreadMap.put(tid, writeSeqIO.getAndIncrement()); - writeSeqIO.compareAndSet(8, 0); + writeSeqIO.compareAndSet(aeroStartID, 0); } index = writeThreadMap.get(tid); acquireAndLogIfRequired(tid, true); diff --git a/api/src/main/java/org/xnio/Options.java b/api/src/main/java/org/xnio/Options.java index 7f75da548e..80dd8df6f2 100644 --- a/api/src/main/java/org/xnio/Options.java +++ b/api/src/main/java/org/xnio/Options.java @@ -572,4 +572,9 @@ private Options() {} * size of one2one lock free queue */ public static final Option QUEUE_SIZE = Option.simple(Options.class, "QUEUE_SIZE", Integer.class); + + /** + * aero thread start id + */ + public static final Option AERO_THREAD_START_ID = Option.simple(Options.class, "AERO_THREAD_START_ID", Integer.class); } diff --git a/api/src/main/java/org/xnio/XnioWorker.java b/api/src/main/java/org/xnio/XnioWorker.java index 9c0981ca15..983d2cec88 100644 --- a/api/src/main/java/org/xnio/XnioWorker.java +++ b/api/src/main/java/org/xnio/XnioWorker.java @@ -112,7 +112,8 @@ protected XnioWorker(final Xnio xnio, final ThreadGroup threadGroup, final Optio taskQueue = new LockFreeMultiQueue<>( optionMap.get(Options.WORKER_TASK_MAX_THREADS, 16), optionMap.get(Options.THREAD_AFFINITY, false), - optionMap.get(Options.QUEUE_SIZE, 10000) + optionMap.get(Options.QUEUE_SIZE, 10000), + optionMap.get(Options.AERO_THREAD_START_ID, 8) ); this.coreSize = optionMap.get(Options.WORKER_TASK_CORE_THREADS, 4); final boolean markThreadAsDaemon = optionMap.get(Options.THREAD_DAEMON, false); From 96f7c728f523194bef9b1f73adb5c8ea10d645bd Mon Sep 17 00:00:00 2001 From: RT Date: Fri, 5 Apr 2019 13:17:21 +0530 Subject: [PATCH 2/3] separate thread affinity io/worker --- .../java/org/xnio/LockFreeMultiQueue.java | 23 ++++++++++++------- api/src/main/java/org/xnio/Options.java | 9 ++++++-- api/src/main/java/org/xnio/XnioWorker.java | 3 ++- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/api/src/main/java/org/xnio/LockFreeMultiQueue.java b/api/src/main/java/org/xnio/LockFreeMultiQueue.java index 528cb8835a..0a915e5a54 100644 --- a/api/src/main/java/org/xnio/LockFreeMultiQueue.java +++ b/api/src/main/java/org/xnio/LockFreeMultiQueue.java @@ -29,13 +29,15 @@ public class LockFreeMultiQueue implements BlockingQueue { private AtomicInteger writeSeqIO; private AtomicInteger writeSeqAero; private int capacity; - private boolean threadAffinity; + private boolean threadAffinityIO; + private boolean threadAffinityWorker; private StringBuffer stringBuffer; private int aeroStartID; - public LockFreeMultiQueue(int queueCount, boolean threadAffinity, int queueCapacity, int aeroStartID) { + public LockFreeMultiQueue(int queueCount, boolean threadAffinityIO, boolean threadAffinityWorker, int queueCapacity, int aeroStartID) { this.capacity = queueCount; - this.threadAffinity = threadAffinity; + this.threadAffinityIO = threadAffinityIO; + this.threadAffinityWorker = threadAffinityWorker; this.aeroStartID = aeroStartID; manyToManyConcurrentArrayQueues = new ArrayList<>(); for (int c = 0; c < capacity; c++) { @@ -226,15 +228,20 @@ private ManyToManyConcurrentArrayQueue getWriteQueue() { private void acquireAndLogIfRequired(Long tid, boolean isWrite) { Map threadMap = isWrite ? writeThreadMap : readThreadMap; String mapType = isWrite ? "writeThreadMap" : "readThreadMap"; - if (threadAffinity) { + + if(isWrite && threadAffinityIO) { + AffinityLock affinityLock = AffinityLock.acquireLock(true); + logger.info( + Thread.currentThread().getName() + " : Assigned " + mapType + " thread id : " + tid + + " : queue id : " + threadMap.get(tid) + " : cpu id : " + affinityLock.cpuId()); + stringBuffer.append(affinityLock.cpuId()); + stringBuffer.append(' '); + } + else if((!isWrite) && threadAffinityWorker) { AffinityLock affinityLock = AffinityLock.acquireLock(true); logger.info( Thread.currentThread().getName() + " : Assigned " + mapType + " thread id : " + tid + " : queue id : " + threadMap.get(tid) + " : cpu id : " + affinityLock.cpuId()); - if (isWrite) { - stringBuffer.append(affinityLock.cpuId()); - stringBuffer.append(' '); - } } else { logger.info( diff --git a/api/src/main/java/org/xnio/Options.java b/api/src/main/java/org/xnio/Options.java index 80dd8df6f2..2dc3d004ca 100644 --- a/api/src/main/java/org/xnio/Options.java +++ b/api/src/main/java/org/xnio/Options.java @@ -564,9 +564,14 @@ private Options() {} public static final Option WATCHER_POLL_INTERVAL = Option.simple(Options.class, "WATCHER_POLL_INTERVAL", Integer.class); /** - * thread affinity default false + * thread affinity io default false */ - public static final Option THREAD_AFFINITY = Option.simple(Options.class, "THREAD_AFFINITY", Boolean.class); + public static final Option THREAD_AFFINITY_IO = Option.simple(Options.class, "THREAD_AFFINITY_IO", Boolean.class); + + /** + * thread affinity worker default false + */ + public static final Option THREAD_AFFINITY_WORKER = Option.simple(Options.class, "THREAD_AFFINITY_WORKER", Boolean.class); /** * size of one2one lock free queue diff --git a/api/src/main/java/org/xnio/XnioWorker.java b/api/src/main/java/org/xnio/XnioWorker.java index 983d2cec88..a811082218 100644 --- a/api/src/main/java/org/xnio/XnioWorker.java +++ b/api/src/main/java/org/xnio/XnioWorker.java @@ -111,7 +111,8 @@ protected XnioWorker(final Xnio xnio, final ThreadGroup threadGroup, final Optio name = workerName; taskQueue = new LockFreeMultiQueue<>( optionMap.get(Options.WORKER_TASK_MAX_THREADS, 16), - optionMap.get(Options.THREAD_AFFINITY, false), + optionMap.get(Options.THREAD_AFFINITY_IO, false), + optionMap.get(Options.THREAD_AFFINITY_WORKER, false), optionMap.get(Options.QUEUE_SIZE, 10000), optionMap.get(Options.AERO_THREAD_START_ID, 8) ); From 05580290635abac365946ec7cfe07bc2f6fdab37 Mon Sep 17 00:00:00 2001 From: Raghu Teja Date: Fri, 5 Apr 2019 14:45:28 +0530 Subject: [PATCH 3/3] Update LockFreeMultiQueue.java --- api/src/main/java/org/xnio/LockFreeMultiQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/main/java/org/xnio/LockFreeMultiQueue.java b/api/src/main/java/org/xnio/LockFreeMultiQueue.java index 0a915e5a54..3263d08cf3 100644 --- a/api/src/main/java/org/xnio/LockFreeMultiQueue.java +++ b/api/src/main/java/org/xnio/LockFreeMultiQueue.java @@ -207,7 +207,7 @@ private ManyToManyConcurrentArrayQueue getWriteQueue() { if(!writeThreadMap.containsKey(tid)) { if(Thread.currentThread().getName().contains("nioEventLoopGroup")) { writeThreadMap.put(tid, writeSeqAero.getAndIncrement()); - writeSeqAero.compareAndSet(capacity/2, aeroStartID); + writeSeqAero.compareAndSet(capacity, aeroStartID); } else { writeThreadMap.put(tid, writeSeqIO.getAndIncrement()); writeSeqIO.compareAndSet(aeroStartID, 0);