Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions api/src/main/java/org/xnio/LockFreeMultiQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ public class LockFreeMultiQueue<T> implements BlockingQueue<T> {
private AtomicInteger writeSeqIO;
private AtomicInteger writeSeqAero;
private int capacity;
private boolean threadAffinity;
private boolean threadAffinityIO;
private boolean threadAffinityWorker;
private StringBuffer stringBuffer;
private int aeroStartID;

public LockFreeMultiQueue(int queueCount, boolean threadAffinity, int queueCapacity) {
public LockFreeMultiQueue(int queueCount, boolean threadAffinityIO, boolean threadAffinityWorker, int queueCapacity, int aeroStartID) {
this.capacity = queueCount;
this.threadAffinity = threadAffinity;
this.threadAffinityIO = threadAffinityIO;
this.threadAffinityWorker = threadAffinityWorker;
this.aeroStartID = aeroStartID;
manyToManyConcurrentArrayQueues = new ArrayList<>();
for (int c = 0; c < capacity; c++) {
manyToManyConcurrentArrayQueues.add(
Expand All @@ -45,7 +49,7 @@ public LockFreeMultiQueue(int queueCount, boolean threadAffinity, int queueCapac
writeThreadMap = new ConcurrentHashMap<>();
readSeq = new AtomicInteger(0);
writeSeqIO = new AtomicInteger(0);
writeSeqAero = new AtomicInteger(8);
writeSeqAero = new AtomicInteger(aeroStartID);
stringBuffer = new StringBuffer();
logQueueSizes();
}
Expand Down Expand Up @@ -203,10 +207,10 @@ private ManyToManyConcurrentArrayQueue<T> getWriteQueue() {
if(!writeThreadMap.containsKey(tid)) {
if(Thread.currentThread().getName().contains("nioEventLoopGroup")) {
writeThreadMap.put(tid, writeSeqAero.getAndIncrement());
writeSeqAero.compareAndSet(12, 8);
writeSeqAero.compareAndSet(capacity, aeroStartID);
} else {
writeThreadMap.put(tid, writeSeqIO.getAndIncrement());
writeSeqIO.compareAndSet(8, 0);
writeSeqIO.compareAndSet(aeroStartID, 0);
}
index = writeThreadMap.get(tid);
acquireAndLogIfRequired(tid, true);
Expand All @@ -224,15 +228,20 @@ private ManyToManyConcurrentArrayQueue<T> getWriteQueue() {
private void acquireAndLogIfRequired(Long tid, boolean isWrite) {
Map<Long, Integer> threadMap = isWrite ? writeThreadMap : readThreadMap;
String mapType = isWrite ? "writeThreadMap" : "readThreadMap";
if (threadAffinity) {

if(isWrite && threadAffinityIO) {
AffinityLock affinityLock = AffinityLock.acquireLock(true);
logger.info(
Thread.currentThread().getName() + " : Assigned " + mapType + " thread id : " + tid
+ " : queue id : " + threadMap.get(tid) + " : cpu id : " + affinityLock.cpuId());
stringBuffer.append(affinityLock.cpuId());
stringBuffer.append(' ');
}
else if((!isWrite) && threadAffinityWorker) {
AffinityLock affinityLock = AffinityLock.acquireLock(true);
logger.info(
Thread.currentThread().getName() + " : Assigned " + mapType + " thread id : " + tid
+ " : queue id : " + threadMap.get(tid) + " : cpu id : " + affinityLock.cpuId());
if (isWrite) {
stringBuffer.append(affinityLock.cpuId());
stringBuffer.append(' ');
}
}
else {
logger.info(
Expand Down
14 changes: 12 additions & 2 deletions api/src/main/java/org/xnio/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -564,12 +564,22 @@ private Options() {}
public static final Option<Integer> WATCHER_POLL_INTERVAL = Option.simple(Options.class, "WATCHER_POLL_INTERVAL", Integer.class);

/**
* thread affinity default false
* thread affinity io default false
*/
public static final Option<Boolean> THREAD_AFFINITY = Option.simple(Options.class, "THREAD_AFFINITY", Boolean.class);
public static final Option<Boolean> THREAD_AFFINITY_IO = Option.simple(Options.class, "THREAD_AFFINITY_IO", Boolean.class);

/**
* thread affinity worker default false
*/
public static final Option<Boolean> THREAD_AFFINITY_WORKER = Option.simple(Options.class, "THREAD_AFFINITY_WORKER", Boolean.class);

/**
* size of one2one lock free queue
*/
public static final Option<Integer> QUEUE_SIZE = Option.simple(Options.class, "QUEUE_SIZE", Integer.class);

/**
* aero thread start id
*/
public static final Option<Integer> AERO_THREAD_START_ID = Option.simple(Options.class, "AERO_THREAD_START_ID", Integer.class);
}
6 changes: 4 additions & 2 deletions api/src/main/java/org/xnio/XnioWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ protected XnioWorker(final Xnio xnio, final ThreadGroup threadGroup, final Optio
name = workerName;
taskQueue = new LockFreeMultiQueue<>(
optionMap.get(Options.WORKER_TASK_MAX_THREADS, 16),
optionMap.get(Options.THREAD_AFFINITY, false),
optionMap.get(Options.QUEUE_SIZE, 10000)
optionMap.get(Options.THREAD_AFFINITY_IO, false),
optionMap.get(Options.THREAD_AFFINITY_WORKER, false),
optionMap.get(Options.QUEUE_SIZE, 10000),
optionMap.get(Options.AERO_THREAD_START_ID, 8)
);
this.coreSize = optionMap.get(Options.WORKER_TASK_CORE_THREADS, 4);
final boolean markThreadAsDaemon = optionMap.get(Options.THREAD_DAEMON, false);
Expand Down