From 6fe7d0e86c20f5553d353a4ded852c460defd282 Mon Sep 17 00:00:00 2001 From: Damien Genet Date: Tue, 2 Oct 2018 14:11:21 -0400 Subject: [PATCH 1/3] Hyperthreading with one pointer to exchange task Signed-off-by: Damien Genet --- CMakeLists.txt | 4 + parsec/CMakeLists.txt | 5 + parsec/include/parsec/execution_stream.h | 32 +++++ parsec/interfaces/ptg/ptg-compiler/jdf2c.c | 9 +- parsec/parsec.c | 95 +++++++++++++-- parsec/parsec_internal.h | 1 + parsec/scheduling.c | 135 ++++++++++++++++++++- parsec/scheduling.h | 1 + parsec/vpmap.c | 8 +- 9 files changed, 277 insertions(+), 13 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b3eb7b11b..ac2ac0820 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -90,6 +90,10 @@ mark_as_advanced(PARSEC_SCHED_DEPS_MASK) option(PARSEC_SCHED_DEPS_MASK "Use a complete bitmask to track the dependencies, instead of a counter -- increase the debugging features, but limits to a maximum of 30 input dependencies" ON) +mark_as_advanced(PARSEC_HAVE_HYPERTHREAD_SCHEDULER) +option(PARSEC_HAVE_HYPERTHREAD_SCHEDULER + "Double the number of threads, half will compute, the other half is in charge of scheduling, lookups, and all the boring stuff" ON) + ### Distributed engine parameters mark_as_advanced(PARSEC_DIST_THREAD PARSEC_DIST_PRIORITIES) option(PARSEC_DIST_WITH_MPI diff --git a/parsec/CMakeLists.txt b/parsec/CMakeLists.txt index aba1a33aa..4f0f6e84e 100644 --- a/parsec/CMakeLists.txt +++ b/parsec/CMakeLists.txt @@ -103,6 +103,11 @@ if( PARSEC_PROF_GRAPHER ) endif( PARSEC_PROF_GRAPHER ) add_subdirectory(data_dist) + +if( PARSEC_HAVE_HYPERTHREAD_SCHEDULER ) + add_definitions(-DPARSEC_HYPERTHREAD_SCHEDULER -DPARSEC_HYPERTHREAD_ROUND_ROBIN) +endif( PARSEC_HAVE_HYPERTHREAD_SCHEDULER ) + # # Setup targets # diff --git a/parsec/include/parsec/execution_stream.h b/parsec/include/parsec/execution_stream.h index a6967a15f..cacc42e60 100644 --- a/parsec/include/parsec/execution_stream.h +++ b/parsec/include/parsec/execution_stream.h @@ -17,6 +17,7 @@ #include "parsec/mempool.h" #include "parsec/profiling.h" #include "parsec/class/barrier.h" +#include "parsec/class/list.h" #ifdef PINS_ENABLE #include "parsec/mca/pins/pins.h" @@ -29,6 +30,15 @@ BEGIN_C_DECLS +/** + * @brief A container for interaction between scheduler and worker thread + */ +typedef struct parsec_shared_information_s parsec_shared_information_t; +/** + * @brief A container for interaction between scheduler and worker thread + */ +typedef struct parsec_loctask_s parsec_loctask_t; + /** * Computational Thread-specific structure */ @@ -65,6 +75,9 @@ struct parsec_execution_stream_s { * we use these mempools */ parsec_thread_mempool_t *dependencies_mempool; /**< If using hashtables to store dependencies * those are allocated using this mempool */ +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + parsec_shared_information_t *shared; +#endif }; /** @@ -157,6 +170,25 @@ struct parsec_context_s { #define PARSEC_THREAD_IS_MASTER(eu) ( ((eu)->th_id == 0) && ((eu)->virtual_process->vp_id == 0) ) +struct parsec_shared_information_s { + parsec_barrier_t *barrier; + /* the address pointer needs to be volatile, not the content pointed to */ + parsec_loctask_t *volatile input; + parsec_loctask_t *volatile output; + parsec_loctask_t *freelist; /* simple lifo no lock */ + int keepgoing; + int submitted; +}; + +struct parsec_loctask_s { + parsec_list_item_t super; + int rc; + int distance; + parsec_task_t *task; + parsec_execution_stream_t *es; + parsec_loctask_t *next; +}; + END_C_DECLS #endif /* PARSEC_EXECUTION_UNIT_H_HAS_BEEN_INCLUDED */ diff --git a/parsec/interfaces/ptg/ptg-compiler/jdf2c.c b/parsec/interfaces/ptg/ptg-compiler/jdf2c.c index ca7d99de1..943f16c43 100644 --- a/parsec/interfaces/ptg/ptg-compiler/jdf2c.c +++ b/parsec/interfaces/ptg/ptg-compiler/jdf2c.c @@ -5316,6 +5316,13 @@ static void jdf_generate_code_hook(const jdf_t *jdf, } coutput("%s\n", body->external_code); + + if( profile_on ) { + coutput(" PARSEC_TASK_PROF_TRACE(es->es_profile,\n" + " this_task->taskpool->profiling_array[2*this_task->task_class->task_class_id+1],\n" + " this_task);\n"); + } + if( !JDF_COMPILER_GLOBAL_ARGS.noline ) { coutput("#line %d \"%s\"\n", cfile_lineno+1, jdf_cfilename); } @@ -5376,7 +5383,7 @@ jdf_generate_code_complete_hook(const jdf_t *jdf, } } - if( profile_on ) { + if( 0 && profile_on ) { coutput(" PARSEC_TASK_PROF_TRACE(es->es_profile,\n" " this_task->taskpool->profiling_array[2*this_task->task_class->task_class_id+1],\n" " (parsec_task_t*)this_task);\n"); diff --git a/parsec/parsec.c b/parsec/parsec.c index 57b08db89..276849a8f 100644 --- a/parsec/parsec.c +++ b/parsec/parsec.c @@ -28,6 +28,7 @@ #include "parsec/utils/output.h" #include "parsec/data_internal.h" #include "parsec/class/list.h" +#include "parsec/class/fifo.h" #include "parsec/scheduling.h" #include "parsec/class/barrier.h" #include "parsec/remote_dep.h" @@ -167,12 +168,34 @@ typedef struct __parsec_temporary_thread_initialization_t { int bindto_ht; parsec_barrier_t* barrier; /*< the barrier used to synchronize for the * local VP data construction. */ +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + parsec_execution_stream_t* es; +#endif } __parsec_temporary_thread_initialization_t; static int parsec_parse_binding_parameter(const char* option, parsec_context_t* context, - __parsec_temporary_thread_initialization_t* startup); + __parsec_temporary_thread_initialization_t* startup); static int parsec_parse_comm_binding_parameter(const char* option, parsec_context_t* context); +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) +static void* __parsec_worker_init( __parsec_temporary_thread_initialization_t* startup ) +{ + parsec_execution_stream_t* es = startup->es; + int bindto = startup->bindto; + int bindto_ht = (-1 == startup->bindto_ht) ? 1 : startup->bindto_ht+1; + + parsec_bindthread(bindto, bindto_ht); + PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Bind worker thread %i.%i on core %i [HT %i]", + es->virtual_process->vp_id, es->th_id, + bindto, bindto_ht); + + parsec_barrier_wait(es->shared->barrier); + void* ret = (void*)(long)__parsec_work_wait(es); + + return ret; +} +#endif + static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* startup ) { parsec_execution_stream_t* es; @@ -181,10 +204,11 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s /* don't use PARSEC_THREAD_IS_MASTER, it is too early and we cannot yet allocate the es struct */ if( (0 != startup->virtual_process->vp_id) || (0 != startup->th_id) || parsec_runtime_bind_main_thread ) { /* Bind to the specified CORE */ + if( -1 == startup->bindto_ht ) startup->bindto_ht = 0; parsec_bindthread(startup->bindto, startup->bindto_ht); PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Bind thread %i.%i on core %i [HT %i]", - startup->virtual_process->vp_id, startup->th_id, - startup->bindto, startup->bindto_ht); + startup->virtual_process->vp_id, startup->th_id, + startup->bindto, startup->bindto_ht); } else { PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Don't bind the main thread %i.%i", startup->virtual_process->vp_id, startup->th_id); @@ -233,6 +257,50 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s offsetof(parsec_hashable_dependency_t, mempool_owner), vp->nb_cores); } + +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + int ht_activated = 0; + pthread_t worker; + es->shared = NULL; + + if( 1 < parsec_hwloc_allow_ht(2) ) { + ht_activated = 1; + /* es->shared = (parsec_shared_information_t*)malloc(sizeof(parsec_shared_information_t)); */ + posix_memalign((void**)&es->shared, 64, sizeof(parsec_shared_information_t)); + es->shared->barrier = (parsec_barrier_t*)malloc(sizeof(parsec_barrier_t)); + parsec_barrier_init(es->shared->barrier, NULL, 2); + es->shared->input = NULL; + es->shared->output = NULL; + es->shared->keepgoing = 1; + es->shared->submitted = 0; + es->shared->freelist = NULL; + + int i; + for (i = 0; i < 5; ++i) { /* we cannot have more than 5 tasks in the pipeline */ + parsec_loctask_t *t = (parsec_loctask_t*)calloc(1, sizeof(parsec_loctask_t)); + t->next = es->shared->freelist; + es->shared->freelist = t; + } + + pthread_attr_t thread_attr; + pthread_attr_init(&thread_attr); + pthread_attr_setscope(&thread_attr, PTHREAD_SCOPE_SYSTEM); + + startup->es = es; + /* Summon your intern */ + pthread_create( &worker, + &thread_attr, + (void* (*)(void*))__parsec_worker_init, + (void*)startup); + + parsec_barrier_wait(es->shared->barrier); + } + else { + parsec_fatal("PaRSEC has been compiled with hyperthreading capabilities.\nThis machine does not support hyperthread, recompile without the PARSEC_HAVE_HYPERTHREAD_SCHEDULER.\n"); + } + +#endif + /* Synchronize with the other threads */ parsec_barrier_wait(startup->barrier); @@ -272,9 +340,19 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s if( PARSEC_THREAD_IS_MASTER(es) ) { return NULL; } - void *ret = (void*)(long)__parsec_context_wait(es); PARSEC_PAPI_SDE_THREAD_FINI(); + +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + if( ht_activated ) { + /* Clean worker thread body */ + es->shared->keepgoing = 0; + void *work_ret = NULL; + pthread_join(worker, &work_ret); + + free(es->shared); + } +#endif return ret; } @@ -296,11 +374,12 @@ static void parsec_vp_init( parsec_vp_t *vp, startup[t].th_id = t; startup[t].virtual_process = vp; startup[t].bindto = -1; - startup[t].bindto_ht = -1; + startup[t].bindto_ht = 0; startup[t].barrier = barrier; pi = vpmap_get_nb_cores_affinity(vp->vp_id, t); - if( 1 == pi ) + if( 1 == pi ) { vpmap_get_core_affinity(vp->vp_id, t, &startup[t].bindto, &startup[t].bindto_ht); + } else if( 1 < pi ) parsec_warning("multiple core to bind on... for now, do nothing"); //TODO: what does that mean? } @@ -662,14 +741,14 @@ parsec_context_t* parsec_init( int nb_cores, int* pargc, char** pargv[] ) 0, NULL, &queue_remove_begin, &queue_remove_end); # endif /* PARSEC_PROF_TRACE_SCHEDULING_EVENTS */ -#if defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET) +# if defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET) parsec_profiling_add_dictionary_keyword( "ARENA_MEMORY", "fill:#B9B243", sizeof(size_t), "size{int64_t}", &arena_memory_alloc_key, &arena_memory_free_key); parsec_profiling_add_dictionary_keyword( "ARENA_ACTIVE_SET", "fill:#B9B243", sizeof(size_t), "size{int64_t}", &arena_memory_used_key, &arena_memory_unused_key); -#endif /* defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET) */ +# endif /* defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET) */ parsec_profiling_add_dictionary_keyword( "TASK_MEMORY", "fill:#B9B243", sizeof(size_t), "size{int64_t}", &task_memory_alloc_key, &task_memory_free_key); diff --git a/parsec/parsec_internal.h b/parsec/parsec_internal.h index 4cf690fa7..a4f483d7b 100644 --- a/parsec/parsec_internal.h +++ b/parsec/parsec_internal.h @@ -15,6 +15,7 @@ #include "parsec/data.h" #include "parsec/class/list_item.h" #include "parsec/class/parsec_hash_table.h" +#include "parsec/class/fifo.h" #include "parsec/parsec_description_structures.h" #include "parsec/profiling.h" #include "parsec/mempool.h" diff --git a/parsec/scheduling.c b/parsec/scheduling.c index b509f717f..f9a8bb442 100644 --- a/parsec/scheduling.c +++ b/parsec/scheduling.c @@ -424,6 +424,116 @@ int __parsec_complete_execution( parsec_execution_stream_t *es, return rc; } +static int __parsec_make_task_progress(parsec_execution_stream_t* es, + parsec_task_t* task, + int rc, + int distance) +{ + switch(rc) { + case PARSEC_HOOK_RETURN_DONE: /* This execution succeeded */ + task->status = PARSEC_TASK_STATUS_COMPLETE; + __parsec_complete_execution( es, task ); + PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[M%d] Completed %s, task: %p", + es->th_id, task->task_class->name, task); + return 1; + break; + case PARSEC_HOOK_RETURN_AGAIN: /* Reschedule later */ + task->status = PARSEC_TASK_STATUS_HOOK; + if(0 == task->priority) { + SET_LOWEST_PRIORITY(task, parsec_execution_context_priority_comparator); + } else + task->priority /= 10; /* demote the task */ + PARSEC_LIST_ITEM_SINGLETON(task); + __parsec_schedule(es, task, distance + 1); + task = NULL; + break; + case PARSEC_HOOK_RETURN_ASYNC: /* The task is outside our reach we should not + * even try to change it's state, the completion + * will be triggered asynchronously. */ + break; + case PARSEC_HOOK_RETURN_NEXT: /* Try next variant [if any] */ + case PARSEC_HOOK_RETURN_DISABLE: /* Disable the device, something went wrong */ + case PARSEC_HOOK_RETURN_ERROR: /* Some other major error happened */ + assert( 0 ); /* Internal error: invalid return value */ + } + return 0; +} + +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) +static int __parsec_offload_task( parsec_execution_stream_t* es, + parsec_task_t* task, + int distance) +{ + parsec_shared_information_t* shared = es->shared; + struct timespec r; + r.tv_sec = 0; + r.tv_nsec = 10; + /* Task goes away, consider async and pull events later to make it progress */ + parsec_loctask_t *t = shared->freelist; + shared->freelist = t->next; + t->next = NULL; + t->rc = rc; + t->task = task; + t->es = es; + t->distance = distance; + /* Is there a task in the pipeline? */ + while (NULL != shared->input) { + /* Yes, there is one, let's try to flush the output pipeline in case we are bottlenecking the worker */ + __parsec_drain_worker_events(es); + /* 'nough work done, let's sleep */ + nanosleep(&r, NULL); + __asm__ __volatile__(""); + } + /* Worker made room into input pipeline, let's push the next task, t */ + shared->input = t; +} +#endif + +static int __parsec_drain_worker_events(parsec_execution_stream_t* es) +{ + /* Master side */ + parsec_shared_information_t* shared = es->shared; + /* if hyperthread scheduler, here, we pull events from the shared dedicated fifo to complete tasks */ + if (shared->output) { + parsec_loctask_t *event = shared->output; + if(event) { + shared->output = NULL; /* atomic cas event for null */ + __parsec_make_task_progress(es, event->task, event->rc, event->distance); + /* parsec_list_nolock_push_front(shared->freelist, (parsec_list_item_t*)event); */ + event->next = shared->freelist; + shared->freelist = event; + } + } + return PARSEC_SUCCESS; +} + +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) +int __parsec_work_wait( parsec_execution_stream_t* es ) +{ + /* Worker side */ + parsec_shared_information_t* shared = es->shared; + struct timespec rqtp; + rqtp.tv_sec = 0; + rqtp.tv_nsec = 100; + + do { + parsec_loctask_t *task = shared->input; + if (task != NULL) { + shared->input = NULL; + task->rc = __parsec_execute( task->es, task->task ); + while (NULL != shared->output) { + nanosleep(&rqtp, NULL); + __asm__ __volatile__(""); + } + shared->output = task; + continue; + } + nanosleep(&rqtp, NULL); + } while(shared->keepgoing); + return PARSEC_SUCCESS; +} +#endif + int __parsec_task_progress( parsec_execution_stream_t* es, parsec_task_t* task, int distance) @@ -439,9 +549,23 @@ int __parsec_task_progress( parsec_execution_stream_t* es, } switch(rc) { case PARSEC_HOOK_RETURN_DONE: { - if(task->status <= PARSEC_TASK_STATUS_HOOK) { + if (task->status == PARSEC_TASK_STATUS_COMPLETE ) { + /* In some cases (Startup tasks), they can return from prepare input as completed */ + PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[M%d] Completing %s: %p", + es->th_id, task->task_class->name, task); + __parsec_complete_execution( es, task ); + break; + } + else if(task->status <= PARSEC_TASK_STATUS_HOOK) { +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + __parsec_offload_task(es, task, rc); +#else rc = __parsec_execute( es, task ); +#endif /* PARSEC_HYPERTHREAD_SCHEDULER */ } +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + __parsec_drain_worker_events(es); +#else /* We're good to go ... */ switch(rc) { case PARSEC_HOOK_RETURN_DONE: /* This execution succeeded */ @@ -467,6 +591,7 @@ int __parsec_task_progress( parsec_execution_stream_t* es, assert( 0 ); /* Internal error: invalid return value */ } break; +#endif /* PARSEC_HYPERTHREAD_SCHEDULER */ } case PARSEC_HOOK_RETURN_ASYNC: /* The task is outside our reach we should not * even try to change it's state, the completion @@ -491,6 +616,10 @@ int __parsec_task_progress( parsec_execution_stream_t* es, int __parsec_context_wait( parsec_execution_stream_t* es ) { +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + parsec_shared_information_t* shared = es->shared; +#endif + uint64_t misses_in_a_row; parsec_context_t* parsec_context = es->virtual_process->parsec_context; int32_t my_barrier_counter = parsec_context->__parsec_internal_finalization_counter; @@ -565,9 +694,11 @@ int __parsec_context_wait( parsec_execution_stream_t* es ) rc = __parsec_task_progress(es, task, distance); (void)rc; /* for now ignore the return value */ - nbiterations++; } +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + __parsec_drain_worker_events(es); +#endif /* PARSEC_HYPERTHREAD_SCHEDULER */ } parsec_rusage_per_es(es, true); diff --git a/parsec/scheduling.h b/parsec/scheduling.h index a134f7e92..853c6b10a 100644 --- a/parsec/scheduling.h +++ b/parsec/scheduling.h @@ -76,6 +76,7 @@ int __parsec_reschedule(parsec_execution_stream_t* es, */ int __parsec_context_wait(parsec_execution_stream_t* es); +int __parsec_work_wait(parsec_execution_stream_t* es); /** * Execute the body of the task associated to the context. */ diff --git a/parsec/vpmap.c b/parsec/vpmap.c index 9a7730ca5..ea3bd4435 100644 --- a/parsec/vpmap.c +++ b/parsec/vpmap.c @@ -102,7 +102,7 @@ static void vpmap_get_core_affinity_parameters(int vp, int thread, int *cores, i #endif /* PARSEC_HAVE_HWLOC */ *cores = (vp * nbcores * nbht) + thread; if (nbht > 1 ) { - *ht = (*cores) % nbht; + *ht = thread / nbcores; } else { *ht = -1; } @@ -378,7 +378,11 @@ int vpmap_init_from_flat(int _nbcores) #endif /* defined(PARSEC_HAVE_HWLOC) */ nbvp = 1; - nbcores = _nbcores/nbht; +#if !defined(PARSEC_HYPERTHREAD_ROUND_ROBIN) + nbcores = _nbcores/nbht; /* Fill a core with hyperthreads before proceeding to the next one */ +#else + nbcores = _nbcores; /* Round robin distribution of hyperthreads */ +#endif nbthreadspervp = _nbcores; vpmap_nb_total_threads = nbvp * nbthreadspervp; From e6d62c385852d924407d99c87d7ce92829dacee3 Mon Sep 17 00:00:00 2001 From: Damien Genet Date: Wed, 3 Oct 2018 11:35:18 -0400 Subject: [PATCH 2/3] hyperthreading working, surprisingly Signed-off-by: Damien Genet --- parsec/scheduling.c | 63 ++++++++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/parsec/scheduling.c b/parsec/scheduling.c index f9a8bb442..99ede1d94 100644 --- a/parsec/scheduling.c +++ b/parsec/scheduling.c @@ -424,6 +424,7 @@ int __parsec_complete_execution( parsec_execution_stream_t *es, return rc; } +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) static int __parsec_make_task_progress(parsec_execution_stream_t* es, parsec_task_t* task, int rc, @@ -433,8 +434,10 @@ static int __parsec_make_task_progress(parsec_execution_stream_t* es, case PARSEC_HOOK_RETURN_DONE: /* This execution succeeded */ task->status = PARSEC_TASK_STATUS_COMPLETE; __parsec_complete_execution( es, task ); - PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[M%d] Completed %s, task: %p", - es->th_id, task->task_class->name, task); + char task_string[MAX_TASK_STRLEN]; + (void)parsec_task_snprintf(task_string, MAX_TASK_STRLEN, task); + PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[M%d] Completing Task %s", + es->th_id, task_string); return 1; break; case PARSEC_HOOK_RETURN_AGAIN: /* Reschedule later */ @@ -458,6 +461,27 @@ static int __parsec_make_task_progress(parsec_execution_stream_t* es, } return 0; } +#endif + +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) +static int __parsec_drain_worker_events(parsec_execution_stream_t* es) +{ + /* Master side */ + parsec_shared_information_t* shared = es->shared; + /* if hyperthread scheduler, here, we pull events from the shared dedicated fifo to complete tasks */ + if (shared->output) { + parsec_loctask_t *event = shared->output; + if(event) { + shared->output = NULL; /* atomic cas event for null */ + __parsec_make_task_progress(es, event->task, event->rc, event->distance); + /* parsec_list_nolock_push_front(shared->freelist, (parsec_list_item_t*)event); */ + event->next = shared->freelist; + shared->freelist = event; + } + } + return PARSEC_SUCCESS; +} +#endif #if defined(PARSEC_HYPERTHREAD_SCHEDULER) static int __parsec_offload_task( parsec_execution_stream_t* es, @@ -472,7 +496,7 @@ static int __parsec_offload_task( parsec_execution_stream_t* es, parsec_loctask_t *t = shared->freelist; shared->freelist = t->next; t->next = NULL; - t->rc = rc; + t->rc = -1; t->task = task; t->es = es; t->distance = distance; @@ -485,28 +509,15 @@ static int __parsec_offload_task( parsec_execution_stream_t* es, __asm__ __volatile__(""); } /* Worker made room into input pipeline, let's push the next task, t */ + char task_string[MAX_TASK_STRLEN]; + (void)parsec_task_snprintf(task_string, MAX_TASK_STRLEN, task); + PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[M%d] Offloading Task %s", + es->th_id, task_string); shared->input = t; + return PARSEC_HOOK_RETURN_ASYNC; } #endif -static int __parsec_drain_worker_events(parsec_execution_stream_t* es) -{ - /* Master side */ - parsec_shared_information_t* shared = es->shared; - /* if hyperthread scheduler, here, we pull events from the shared dedicated fifo to complete tasks */ - if (shared->output) { - parsec_loctask_t *event = shared->output; - if(event) { - shared->output = NULL; /* atomic cas event for null */ - __parsec_make_task_progress(es, event->task, event->rc, event->distance); - /* parsec_list_nolock_push_front(shared->freelist, (parsec_list_item_t*)event); */ - event->next = shared->freelist; - shared->freelist = event; - } - } - return PARSEC_SUCCESS; -} - #if defined(PARSEC_HYPERTHREAD_SCHEDULER) int __parsec_work_wait( parsec_execution_stream_t* es ) { @@ -520,6 +531,10 @@ int __parsec_work_wait( parsec_execution_stream_t* es ) parsec_loctask_t *task = shared->input; if (task != NULL) { shared->input = NULL; + char task_string[MAX_TASK_STRLEN]; + (void)parsec_task_snprintf(task_string, MAX_TASK_STRLEN, task->task); + PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[W%d] Executing Task %s", + es->th_id, task_string); task->rc = __parsec_execute( task->es, task->task ); while (NULL != shared->output) { nanosleep(&rqtp, NULL); @@ -558,7 +573,7 @@ int __parsec_task_progress( parsec_execution_stream_t* es, } else if(task->status <= PARSEC_TASK_STATUS_HOOK) { #if defined(PARSEC_HYPERTHREAD_SCHEDULER) - __parsec_offload_task(es, task, rc); + rc = __parsec_offload_task(es, task, distance); #else rc = __parsec_execute( es, task ); #endif /* PARSEC_HYPERTHREAD_SCHEDULER */ @@ -616,10 +631,6 @@ int __parsec_task_progress( parsec_execution_stream_t* es, int __parsec_context_wait( parsec_execution_stream_t* es ) { -#if defined(PARSEC_HYPERTHREAD_SCHEDULER) - parsec_shared_information_t* shared = es->shared; -#endif - uint64_t misses_in_a_row; parsec_context_t* parsec_context = es->virtual_process->parsec_context; int32_t my_barrier_counter = parsec_context->__parsec_internal_finalization_counter; From cf49610b6610016d8953f43dd2a476bc77fe1f28 Mon Sep 17 00:00:00 2001 From: Damien Genet Date: Wed, 3 Oct 2018 21:29:50 -0400 Subject: [PATCH 3/3] Adding: variable number of hardware threads through MCA param runtime_hardware_threads 1 Master HT, N-1 Workers (per ES) Signed-off-by: Damien Genet --- parsec/include/parsec/execution_stream.h | 23 +++--- parsec/parsec.c | 90 +++++++++++++++--------- parsec/scheduling.c | 62 ++++++++-------- parsec/scheduling.h | 2 +- 4 files changed, 104 insertions(+), 73 deletions(-) diff --git a/parsec/include/parsec/execution_stream.h b/parsec/include/parsec/execution_stream.h index cacc42e60..d1885ac5b 100644 --- a/parsec/include/parsec/execution_stream.h +++ b/parsec/include/parsec/execution_stream.h @@ -33,11 +33,11 @@ BEGIN_C_DECLS /** * @brief A container for interaction between scheduler and worker thread */ -typedef struct parsec_shared_information_s parsec_shared_information_t; +typedef struct parsec_ht_team_s parsec_ht_team_t; /** - * @brief A container for interaction between scheduler and worker thread + * @brief A container for exchanging task and task progress status between Master and Worker hardware threads */ -typedef struct parsec_loctask_s parsec_loctask_t; +typedef struct parsec_offload_task_s parsec_offload_task_t; /** * Computational Thread-specific structure @@ -76,7 +76,8 @@ struct parsec_execution_stream_s { parsec_thread_mempool_t *dependencies_mempool; /**< If using hashtables to store dependencies * those are allocated using this mempool */ #if defined(PARSEC_HYPERTHREAD_SCHEDULER) - parsec_shared_information_t *shared; + parsec_ht_team_t *team; /**< Structure handling the shared information, the barrier and, + the different queues required for the team of HT */ #endif }; @@ -170,23 +171,25 @@ struct parsec_context_s { #define PARSEC_THREAD_IS_MASTER(eu) ( ((eu)->th_id == 0) && ((eu)->virtual_process->vp_id == 0) ) -struct parsec_shared_information_s { +struct parsec_ht_team_s { parsec_barrier_t *barrier; + int nb_workers; + int current_worker; /* the address pointer needs to be volatile, not the content pointed to */ - parsec_loctask_t *volatile input; - parsec_loctask_t *volatile output; - parsec_loctask_t *freelist; /* simple lifo no lock */ + parsec_offload_task_t *volatile* input; + parsec_offload_task_t *volatile* output; + parsec_offload_task_t *freelist; /* simple lifo no lock */ int keepgoing; int submitted; }; -struct parsec_loctask_s { +struct parsec_offload_task_s { parsec_list_item_t super; int rc; int distance; parsec_task_t *task; parsec_execution_stream_t *es; - parsec_loctask_t *next; + parsec_offload_task_t *next; }; END_C_DECLS diff --git a/parsec/parsec.c b/parsec/parsec.c index 276849a8f..4999321ca 100644 --- a/parsec/parsec.c +++ b/parsec/parsec.c @@ -182,15 +182,15 @@ static void* __parsec_worker_init( __parsec_temporary_thread_initialization_t* s { parsec_execution_stream_t* es = startup->es; int bindto = startup->bindto; - int bindto_ht = (-1 == startup->bindto_ht) ? 1 : startup->bindto_ht+1; + int bindto_ht = (-1 == startup->bindto_ht) ? 1 : startup->bindto_ht; parsec_bindthread(bindto, bindto_ht); PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Bind worker thread %i.%i on core %i [HT %i]", es->virtual_process->vp_id, es->th_id, bindto, bindto_ht); - parsec_barrier_wait(es->shared->barrier); - void* ret = (void*)(long)__parsec_work_wait(es); + parsec_barrier_wait(es->team->barrier); + void* ret = (void*)(long)__parsec_work_wait(es, bindto_ht-1); return ret; } @@ -259,27 +259,37 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s } #if defined(PARSEC_HYPERTHREAD_SCHEDULER) - int ht_activated = 0; - pthread_t worker; - es->shared = NULL; + int ht_activated = 0, parsec_runtime_hardware_threads = 2, i; + pthread_t* workers = NULL; + es->team = NULL; + parsec_mca_param_reg_int_name("runtime", "hardware_threads", "Total number of hardware threads to use; 1 Manager, N-1 Workers", + false, false, parsec_runtime_hardware_threads, &parsec_runtime_hardware_threads); - if( 1 < parsec_hwloc_allow_ht(2) ) { + parsec_debug_verbose(3, parsec_debug_output, "Requesting %d hardware thread(s) for ES %d", parsec_runtime_hardware_threads, es->th_id); + int available_ht = parsec_hwloc_allow_ht(parsec_runtime_hardware_threads); + + if( 1 < available_ht) { + parsec_debug_verbose(2, parsec_debug_output, "Activating %d (requested %d) hardware thread(s) for ES %d", available_ht, parsec_runtime_hardware_threads, es->th_id); ht_activated = 1; - /* es->shared = (parsec_shared_information_t*)malloc(sizeof(parsec_shared_information_t)); */ - posix_memalign((void**)&es->shared, 64, sizeof(parsec_shared_information_t)); - es->shared->barrier = (parsec_barrier_t*)malloc(sizeof(parsec_barrier_t)); - parsec_barrier_init(es->shared->barrier, NULL, 2); - es->shared->input = NULL; - es->shared->output = NULL; - es->shared->keepgoing = 1; - es->shared->submitted = 0; - es->shared->freelist = NULL; - - int i; - for (i = 0; i < 5; ++i) { /* we cannot have more than 5 tasks in the pipeline */ - parsec_loctask_t *t = (parsec_loctask_t*)calloc(1, sizeof(parsec_loctask_t)); - t->next = es->shared->freelist; - es->shared->freelist = t; + posix_memalign((void**)&es->team, 64, sizeof(parsec_ht_team_t)); + es->team->nb_workers = available_ht -1; + posix_memalign((void**)&es->team->input, 64, es->team->nb_workers * sizeof(parsec_offload_task_t)); + posix_memalign((void**)&es->team->output, 64, es->team->nb_workers * sizeof(parsec_offload_task_t)); + for (i = 0; i < es->team->nb_workers; ++i) { + es->team->input[i] = NULL; + es->team->output[i] = NULL; + } + es->team->keepgoing = 1; + es->team->submitted = 0; + es->team->freelist = NULL; + es->team->current_worker = 0; + es->team->barrier = (parsec_barrier_t*)malloc(sizeof(parsec_barrier_t)); + parsec_barrier_init(es->team->barrier, NULL, es->team->nb_workers+1); + + for (i = 0; i < 5 * es->team->nb_workers; ++i) { /* we cannot have more than 5 tasks in the pipeline */ + parsec_offload_task_t *t = (parsec_offload_task_t*)calloc(1, sizeof(parsec_offload_task_t)); + t->next = es->team->freelist; + es->team->freelist = t; } pthread_attr_t thread_attr; @@ -287,13 +297,25 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s pthread_attr_setscope(&thread_attr, PTHREAD_SCOPE_SYSTEM); startup->es = es; - /* Summon your intern */ - pthread_create( &worker, - &thread_attr, - (void* (*)(void*))__parsec_worker_init, - (void*)startup); + workers = (pthread_t*)malloc(es->team->nb_workers * sizeof(pthread_t)); + + /* Let's duplicate the starup struct for each team worker*/ + __parsec_temporary_thread_initialization_t *ht_startup = + (__parsec_temporary_thread_initialization_t*)calloc(es->team->nb_workers, sizeof(__parsec_temporary_thread_initialization_t)); + for (i = 0; i < es->team->nb_workers; ++i) { /* we cannot have more than 5 tasks in the pipeline */ + ht_startup[i].bindto = startup->bindto; + ht_startup[i].bindto_ht = i+1; + ht_startup[i].es = es; + /* Summon your team of interns */ + pthread_create( workers+i, + &thread_attr, + (void* (*)(void*))__parsec_worker_init, + (void*)ht_startup+i); + } - parsec_barrier_wait(es->shared->barrier); + parsec_barrier_wait(es->team->barrier); + + free(ht_startup); } else { parsec_fatal("PaRSEC has been compiled with hyperthreading capabilities.\nThis machine does not support hyperthread, recompile without the PARSEC_HAVE_HYPERTHREAD_SCHEDULER.\n"); @@ -346,11 +368,13 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s #if defined(PARSEC_HYPERTHREAD_SCHEDULER) if( ht_activated ) { /* Clean worker thread body */ - es->shared->keepgoing = 0; - void *work_ret = NULL; - pthread_join(worker, &work_ret); - - free(es->shared); + es->team->keepgoing = 0; + for (i = 0; i < es->team->nb_workers; ++i) { + void *work_ret = NULL; + pthread_join(workers[i], &work_ret); + } + free(workers); + free(es->team); } #endif return ret; diff --git a/parsec/scheduling.c b/parsec/scheduling.c index 99ede1d94..4cca9a8b4 100644 --- a/parsec/scheduling.c +++ b/parsec/scheduling.c @@ -464,19 +464,18 @@ static int __parsec_make_task_progress(parsec_execution_stream_t* es, #endif #if defined(PARSEC_HYPERTHREAD_SCHEDULER) -static int __parsec_drain_worker_events(parsec_execution_stream_t* es) +static int __parsec_drain_worker_events(parsec_execution_stream_t* es, int w) { /* Master side */ - parsec_shared_information_t* shared = es->shared; + parsec_ht_team_t* team = es->team; /* if hyperthread scheduler, here, we pull events from the shared dedicated fifo to complete tasks */ - if (shared->output) { - parsec_loctask_t *event = shared->output; + if (team->output[w]) { + parsec_offload_task_t *event = team->output[w]; if(event) { - shared->output = NULL; /* atomic cas event for null */ + team->output[w] = NULL; /* atomic cas event for null */ __parsec_make_task_progress(es, event->task, event->rc, event->distance); - /* parsec_list_nolock_push_front(shared->freelist, (parsec_list_item_t*)event); */ - event->next = shared->freelist; - shared->freelist = event; + event->next = team->freelist; + team->freelist = event; } } return PARSEC_SUCCESS; @@ -486,65 +485,66 @@ static int __parsec_drain_worker_events(parsec_execution_stream_t* es) #if defined(PARSEC_HYPERTHREAD_SCHEDULER) static int __parsec_offload_task( parsec_execution_stream_t* es, parsec_task_t* task, - int distance) + int distance) { - parsec_shared_information_t* shared = es->shared; + parsec_ht_team_t* team = es->team; struct timespec r; r.tv_sec = 0; r.tv_nsec = 10; /* Task goes away, consider async and pull events later to make it progress */ - parsec_loctask_t *t = shared->freelist; - shared->freelist = t->next; + parsec_offload_task_t *t = team->freelist; + team->freelist = t->next; t->next = NULL; t->rc = -1; t->task = task; t->es = es; t->distance = distance; /* Is there a task in the pipeline? */ - while (NULL != shared->input) { + while (NULL != team->input[team->current_worker]) { /* Yes, there is one, let's try to flush the output pipeline in case we are bottlenecking the worker */ - __parsec_drain_worker_events(es); + __parsec_drain_worker_events(es, team->current_worker); /* 'nough work done, let's sleep */ nanosleep(&r, NULL); __asm__ __volatile__(""); + team->current_worker = (team->current_worker+1)%team->nb_workers; } /* Worker made room into input pipeline, let's push the next task, t */ char task_string[MAX_TASK_STRLEN]; (void)parsec_task_snprintf(task_string, MAX_TASK_STRLEN, task); - PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[M%d] Offloading Task %s", - es->th_id, task_string); - shared->input = t; + PARSEC_DEBUG_VERBOSE(20, parsec_debug_output, "[M%d] Offloading Task %s to Hardware Thread %d", + es->th_id, task_string, team->current_worker); + team->input[team->current_worker] = t; return PARSEC_HOOK_RETURN_ASYNC; } #endif #if defined(PARSEC_HYPERTHREAD_SCHEDULER) -int __parsec_work_wait( parsec_execution_stream_t* es ) +int __parsec_work_wait( parsec_execution_stream_t* es, int id ) { + char task_string[MAX_TASK_STRLEN]; /* Worker side */ - parsec_shared_information_t* shared = es->shared; + parsec_ht_team_t* team = es->team; struct timespec rqtp; rqtp.tv_sec = 0; rqtp.tv_nsec = 100; do { - parsec_loctask_t *task = shared->input; + parsec_offload_task_t *task = team->input[id]; if (task != NULL) { - shared->input = NULL; - char task_string[MAX_TASK_STRLEN]; + team->input[id] = NULL; (void)parsec_task_snprintf(task_string, MAX_TASK_STRLEN, task->task); - PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[W%d] Executing Task %s", - es->th_id, task_string); + PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "[W%i/%d] Executing Task %s", + es->th_id, id, task_string); task->rc = __parsec_execute( task->es, task->task ); - while (NULL != shared->output) { + while (NULL != team->output[id]) { nanosleep(&rqtp, NULL); __asm__ __volatile__(""); } - shared->output = task; + team->output[id] = task; continue; } nanosleep(&rqtp, NULL); - } while(shared->keepgoing); + } while(team->keepgoing); return PARSEC_SUCCESS; } #endif @@ -579,7 +579,9 @@ int __parsec_task_progress( parsec_execution_stream_t* es, #endif /* PARSEC_HYPERTHREAD_SCHEDULER */ } #if defined(PARSEC_HYPERTHREAD_SCHEDULER) - __parsec_drain_worker_events(es); + int v; + for (v = 0; v < es->team->nb_workers; ++v) + __parsec_drain_worker_events(es, es->team->current_worker); #else /* We're good to go ... */ switch(rc) { @@ -708,7 +710,9 @@ int __parsec_context_wait( parsec_execution_stream_t* es ) nbiterations++; } #if defined(PARSEC_HYPERTHREAD_SCHEDULER) - __parsec_drain_worker_events(es); + int v; + for (v = 0; v < es->team->nb_workers; ++v) + __parsec_drain_worker_events(es, v); #endif /* PARSEC_HYPERTHREAD_SCHEDULER */ } diff --git a/parsec/scheduling.h b/parsec/scheduling.h index 853c6b10a..5d3794de6 100644 --- a/parsec/scheduling.h +++ b/parsec/scheduling.h @@ -76,7 +76,7 @@ int __parsec_reschedule(parsec_execution_stream_t* es, */ int __parsec_context_wait(parsec_execution_stream_t* es); -int __parsec_work_wait(parsec_execution_stream_t* es); +int __parsec_work_wait(parsec_execution_stream_t* es, int ht); /** * Execute the body of the task associated to the context. */