diff --git a/CMakeLists.txt b/CMakeLists.txt index b3eb7b11b..ac2ac0820 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -90,6 +90,10 @@ mark_as_advanced(PARSEC_SCHED_DEPS_MASK) option(PARSEC_SCHED_DEPS_MASK "Use a complete bitmask to track the dependencies, instead of a counter -- increase the debugging features, but limits to a maximum of 30 input dependencies" ON) +mark_as_advanced(PARSEC_HAVE_HYPERTHREAD_SCHEDULER) +option(PARSEC_HAVE_HYPERTHREAD_SCHEDULER + "Double the number of threads, half will compute, the other half is in charge of scheduling, lookups, and all the boring stuff" ON) + ### Distributed engine parameters mark_as_advanced(PARSEC_DIST_THREAD PARSEC_DIST_PRIORITIES) option(PARSEC_DIST_WITH_MPI diff --git a/parsec/CMakeLists.txt b/parsec/CMakeLists.txt index aba1a33aa..4f0f6e84e 100644 --- a/parsec/CMakeLists.txt +++ b/parsec/CMakeLists.txt @@ -103,6 +103,11 @@ if( PARSEC_PROF_GRAPHER ) endif( PARSEC_PROF_GRAPHER ) add_subdirectory(data_dist) + +if( PARSEC_HAVE_HYPERTHREAD_SCHEDULER ) + add_definitions(-DPARSEC_HYPERTHREAD_SCHEDULER -DPARSEC_HYPERTHREAD_ROUND_ROBIN) +endif( PARSEC_HAVE_HYPERTHREAD_SCHEDULER ) + # # Setup targets # diff --git a/parsec/include/parsec/execution_stream.h b/parsec/include/parsec/execution_stream.h index a6967a15f..d1885ac5b 100644 --- a/parsec/include/parsec/execution_stream.h +++ b/parsec/include/parsec/execution_stream.h @@ -17,6 +17,7 @@ #include "parsec/mempool.h" #include "parsec/profiling.h" #include "parsec/class/barrier.h" +#include "parsec/class/list.h" #ifdef PINS_ENABLE #include "parsec/mca/pins/pins.h" @@ -29,6 +30,15 @@ BEGIN_C_DECLS +/** + * @brief A container for interaction between scheduler and worker thread + */ +typedef struct parsec_ht_team_s parsec_ht_team_t; +/** + * @brief A container for exchanging task and task progress status between Master and Worker hardware threads + */ +typedef struct parsec_offload_task_s parsec_offload_task_t; + /** * Computational Thread-specific structure */ @@ -65,6 +75,10 @@ struct parsec_execution_stream_s { * we use these mempools */ parsec_thread_mempool_t *dependencies_mempool; /**< If using hashtables to store dependencies * those are allocated using this mempool */ +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + parsec_ht_team_t *team; /**< Structure handling the shared information, the barrier and, + the different queues required for the team of HT */ +#endif }; /** @@ -157,6 +171,27 @@ struct parsec_context_s { #define PARSEC_THREAD_IS_MASTER(eu) ( ((eu)->th_id == 0) && ((eu)->virtual_process->vp_id == 0) ) +struct parsec_ht_team_s { + parsec_barrier_t *barrier; + int nb_workers; + int current_worker; + /* the address pointer needs to be volatile, not the content pointed to */ + parsec_offload_task_t *volatile* input; + parsec_offload_task_t *volatile* output; + parsec_offload_task_t *freelist; /* simple lifo no lock */ + int keepgoing; + int submitted; +}; + +struct parsec_offload_task_s { + parsec_list_item_t super; + int rc; + int distance; + parsec_task_t *task; + parsec_execution_stream_t *es; + parsec_offload_task_t *next; +}; + END_C_DECLS #endif /* PARSEC_EXECUTION_UNIT_H_HAS_BEEN_INCLUDED */ diff --git a/parsec/interfaces/ptg/ptg-compiler/jdf2c.c b/parsec/interfaces/ptg/ptg-compiler/jdf2c.c index ca7d99de1..943f16c43 100644 --- a/parsec/interfaces/ptg/ptg-compiler/jdf2c.c +++ b/parsec/interfaces/ptg/ptg-compiler/jdf2c.c @@ -5316,6 +5316,13 @@ static void jdf_generate_code_hook(const jdf_t *jdf, } coutput("%s\n", body->external_code); + + if( profile_on ) { + coutput(" PARSEC_TASK_PROF_TRACE(es->es_profile,\n" + " this_task->taskpool->profiling_array[2*this_task->task_class->task_class_id+1],\n" + " this_task);\n"); + } + if( !JDF_COMPILER_GLOBAL_ARGS.noline ) { coutput("#line %d \"%s\"\n", cfile_lineno+1, jdf_cfilename); } @@ -5376,7 +5383,7 @@ jdf_generate_code_complete_hook(const jdf_t *jdf, } } - if( profile_on ) { + if( 0 && profile_on ) { coutput(" PARSEC_TASK_PROF_TRACE(es->es_profile,\n" " this_task->taskpool->profiling_array[2*this_task->task_class->task_class_id+1],\n" " (parsec_task_t*)this_task);\n"); diff --git a/parsec/parsec.c b/parsec/parsec.c index 57b08db89..4999321ca 100644 --- a/parsec/parsec.c +++ b/parsec/parsec.c @@ -28,6 +28,7 @@ #include "parsec/utils/output.h" #include "parsec/data_internal.h" #include "parsec/class/list.h" +#include "parsec/class/fifo.h" #include "parsec/scheduling.h" #include "parsec/class/barrier.h" #include "parsec/remote_dep.h" @@ -167,12 +168,34 @@ typedef struct __parsec_temporary_thread_initialization_t { int bindto_ht; parsec_barrier_t* barrier; /*< the barrier used to synchronize for the * local VP data construction. */ +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + parsec_execution_stream_t* es; +#endif } __parsec_temporary_thread_initialization_t; static int parsec_parse_binding_parameter(const char* option, parsec_context_t* context, - __parsec_temporary_thread_initialization_t* startup); + __parsec_temporary_thread_initialization_t* startup); static int parsec_parse_comm_binding_parameter(const char* option, parsec_context_t* context); +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) +static void* __parsec_worker_init( __parsec_temporary_thread_initialization_t* startup ) +{ + parsec_execution_stream_t* es = startup->es; + int bindto = startup->bindto; + int bindto_ht = (-1 == startup->bindto_ht) ? 1 : startup->bindto_ht; + + parsec_bindthread(bindto, bindto_ht); + PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Bind worker thread %i.%i on core %i [HT %i]", + es->virtual_process->vp_id, es->th_id, + bindto, bindto_ht); + + parsec_barrier_wait(es->team->barrier); + void* ret = (void*)(long)__parsec_work_wait(es, bindto_ht-1); + + return ret; +} +#endif + static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* startup ) { parsec_execution_stream_t* es; @@ -181,10 +204,11 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s /* don't use PARSEC_THREAD_IS_MASTER, it is too early and we cannot yet allocate the es struct */ if( (0 != startup->virtual_process->vp_id) || (0 != startup->th_id) || parsec_runtime_bind_main_thread ) { /* Bind to the specified CORE */ + if( -1 == startup->bindto_ht ) startup->bindto_ht = 0; parsec_bindthread(startup->bindto, startup->bindto_ht); PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Bind thread %i.%i on core %i [HT %i]", - startup->virtual_process->vp_id, startup->th_id, - startup->bindto, startup->bindto_ht); + startup->virtual_process->vp_id, startup->th_id, + startup->bindto, startup->bindto_ht); } else { PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Don't bind the main thread %i.%i", startup->virtual_process->vp_id, startup->th_id); @@ -233,6 +257,72 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s offsetof(parsec_hashable_dependency_t, mempool_owner), vp->nb_cores); } + +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + int ht_activated = 0, parsec_runtime_hardware_threads = 2, i; + pthread_t* workers = NULL; + es->team = NULL; + parsec_mca_param_reg_int_name("runtime", "hardware_threads", "Total number of hardware threads to use; 1 Manager, N-1 Workers", + false, false, parsec_runtime_hardware_threads, &parsec_runtime_hardware_threads); + + parsec_debug_verbose(3, parsec_debug_output, "Requesting %d hardware thread(s) for ES %d", parsec_runtime_hardware_threads, es->th_id); + int available_ht = parsec_hwloc_allow_ht(parsec_runtime_hardware_threads); + + if( 1 < available_ht) { + parsec_debug_verbose(2, parsec_debug_output, "Activating %d (requested %d) hardware thread(s) for ES %d", available_ht, parsec_runtime_hardware_threads, es->th_id); + ht_activated = 1; + posix_memalign((void**)&es->team, 64, sizeof(parsec_ht_team_t)); + es->team->nb_workers = available_ht -1; + posix_memalign((void**)&es->team->input, 64, es->team->nb_workers * sizeof(parsec_offload_task_t)); + posix_memalign((void**)&es->team->output, 64, es->team->nb_workers * sizeof(parsec_offload_task_t)); + for (i = 0; i < es->team->nb_workers; ++i) { + es->team->input[i] = NULL; + es->team->output[i] = NULL; + } + es->team->keepgoing = 1; + es->team->submitted = 0; + es->team->freelist = NULL; + es->team->current_worker = 0; + es->team->barrier = (parsec_barrier_t*)malloc(sizeof(parsec_barrier_t)); + parsec_barrier_init(es->team->barrier, NULL, es->team->nb_workers+1); + + for (i = 0; i < 5 * es->team->nb_workers; ++i) { /* we cannot have more than 5 tasks in the pipeline */ + parsec_offload_task_t *t = (parsec_offload_task_t*)calloc(1, sizeof(parsec_offload_task_t)); + t->next = es->team->freelist; + es->team->freelist = t; + } + + pthread_attr_t thread_attr; + pthread_attr_init(&thread_attr); + pthread_attr_setscope(&thread_attr, PTHREAD_SCOPE_SYSTEM); + + startup->es = es; + workers = (pthread_t*)malloc(es->team->nb_workers * sizeof(pthread_t)); + + /* Let's duplicate the starup struct for each team worker*/ + __parsec_temporary_thread_initialization_t *ht_startup = + (__parsec_temporary_thread_initialization_t*)calloc(es->team->nb_workers, sizeof(__parsec_temporary_thread_initialization_t)); + for (i = 0; i < es->team->nb_workers; ++i) { /* we cannot have more than 5 tasks in the pipeline */ + ht_startup[i].bindto = startup->bindto; + ht_startup[i].bindto_ht = i+1; + ht_startup[i].es = es; + /* Summon your team of interns */ + pthread_create( workers+i, + &thread_attr, + (void* (*)(void*))__parsec_worker_init, + (void*)ht_startup+i); + } + + parsec_barrier_wait(es->team->barrier); + + free(ht_startup); + } + else { + parsec_fatal("PaRSEC has been compiled with hyperthreading capabilities.\nThis machine does not support hyperthread, recompile without the PARSEC_HAVE_HYPERTHREAD_SCHEDULER.\n"); + } + +#endif + /* Synchronize with the other threads */ parsec_barrier_wait(startup->barrier); @@ -272,9 +362,21 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s if( PARSEC_THREAD_IS_MASTER(es) ) { return NULL; } - void *ret = (void*)(long)__parsec_context_wait(es); PARSEC_PAPI_SDE_THREAD_FINI(); + +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + if( ht_activated ) { + /* Clean worker thread body */ + es->team->keepgoing = 0; + for (i = 0; i < es->team->nb_workers; ++i) { + void *work_ret = NULL; + pthread_join(workers[i], &work_ret); + } + free(workers); + free(es->team); + } +#endif return ret; } @@ -296,11 +398,12 @@ static void parsec_vp_init( parsec_vp_t *vp, startup[t].th_id = t; startup[t].virtual_process = vp; startup[t].bindto = -1; - startup[t].bindto_ht = -1; + startup[t].bindto_ht = 0; startup[t].barrier = barrier; pi = vpmap_get_nb_cores_affinity(vp->vp_id, t); - if( 1 == pi ) + if( 1 == pi ) { vpmap_get_core_affinity(vp->vp_id, t, &startup[t].bindto, &startup[t].bindto_ht); + } else if( 1 < pi ) parsec_warning("multiple core to bind on... for now, do nothing"); //TODO: what does that mean? } @@ -662,14 +765,14 @@ parsec_context_t* parsec_init( int nb_cores, int* pargc, char** pargv[] ) 0, NULL, &queue_remove_begin, &queue_remove_end); # endif /* PARSEC_PROF_TRACE_SCHEDULING_EVENTS */ -#if defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET) +# if defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET) parsec_profiling_add_dictionary_keyword( "ARENA_MEMORY", "fill:#B9B243", sizeof(size_t), "size{int64_t}", &arena_memory_alloc_key, &arena_memory_free_key); parsec_profiling_add_dictionary_keyword( "ARENA_ACTIVE_SET", "fill:#B9B243", sizeof(size_t), "size{int64_t}", &arena_memory_used_key, &arena_memory_unused_key); -#endif /* defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET) */ +# endif /* defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET) */ parsec_profiling_add_dictionary_keyword( "TASK_MEMORY", "fill:#B9B243", sizeof(size_t), "size{int64_t}", &task_memory_alloc_key, &task_memory_free_key); diff --git a/parsec/parsec_internal.h b/parsec/parsec_internal.h index 4cf690fa7..a4f483d7b 100644 --- a/parsec/parsec_internal.h +++ b/parsec/parsec_internal.h @@ -15,6 +15,7 @@ #include "parsec/data.h" #include "parsec/class/list_item.h" #include "parsec/class/parsec_hash_table.h" +#include "parsec/class/fifo.h" #include "parsec/parsec_description_structures.h" #include "parsec/profiling.h" #include "parsec/mempool.h" diff --git a/parsec/scheduling.c b/parsec/scheduling.c index b509f717f..4cca9a8b4 100644 --- a/parsec/scheduling.c +++ b/parsec/scheduling.c @@ -424,6 +424,131 @@ int __parsec_complete_execution( parsec_execution_stream_t *es, return rc; } +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) +static int __parsec_make_task_progress(parsec_execution_stream_t* es, + parsec_task_t* task, + int rc, + int distance) +{ + switch(rc) { + case PARSEC_HOOK_RETURN_DONE: /* This execution succeeded */ + task->status = PARSEC_TASK_STATUS_COMPLETE; + __parsec_complete_execution( es, task ); + char task_string[MAX_TASK_STRLEN]; + (void)parsec_task_snprintf(task_string, MAX_TASK_STRLEN, task); + PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[M%d] Completing Task %s", + es->th_id, task_string); + return 1; + break; + case PARSEC_HOOK_RETURN_AGAIN: /* Reschedule later */ + task->status = PARSEC_TASK_STATUS_HOOK; + if(0 == task->priority) { + SET_LOWEST_PRIORITY(task, parsec_execution_context_priority_comparator); + } else + task->priority /= 10; /* demote the task */ + PARSEC_LIST_ITEM_SINGLETON(task); + __parsec_schedule(es, task, distance + 1); + task = NULL; + break; + case PARSEC_HOOK_RETURN_ASYNC: /* The task is outside our reach we should not + * even try to change it's state, the completion + * will be triggered asynchronously. */ + break; + case PARSEC_HOOK_RETURN_NEXT: /* Try next variant [if any] */ + case PARSEC_HOOK_RETURN_DISABLE: /* Disable the device, something went wrong */ + case PARSEC_HOOK_RETURN_ERROR: /* Some other major error happened */ + assert( 0 ); /* Internal error: invalid return value */ + } + return 0; +} +#endif + +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) +static int __parsec_drain_worker_events(parsec_execution_stream_t* es, int w) +{ + /* Master side */ + parsec_ht_team_t* team = es->team; + /* if hyperthread scheduler, here, we pull events from the shared dedicated fifo to complete tasks */ + if (team->output[w]) { + parsec_offload_task_t *event = team->output[w]; + if(event) { + team->output[w] = NULL; /* atomic cas event for null */ + __parsec_make_task_progress(es, event->task, event->rc, event->distance); + event->next = team->freelist; + team->freelist = event; + } + } + return PARSEC_SUCCESS; +} +#endif + +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) +static int __parsec_offload_task( parsec_execution_stream_t* es, + parsec_task_t* task, + int distance) +{ + parsec_ht_team_t* team = es->team; + struct timespec r; + r.tv_sec = 0; + r.tv_nsec = 10; + /* Task goes away, consider async and pull events later to make it progress */ + parsec_offload_task_t *t = team->freelist; + team->freelist = t->next; + t->next = NULL; + t->rc = -1; + t->task = task; + t->es = es; + t->distance = distance; + /* Is there a task in the pipeline? */ + while (NULL != team->input[team->current_worker]) { + /* Yes, there is one, let's try to flush the output pipeline in case we are bottlenecking the worker */ + __parsec_drain_worker_events(es, team->current_worker); + /* 'nough work done, let's sleep */ + nanosleep(&r, NULL); + __asm__ __volatile__(""); + team->current_worker = (team->current_worker+1)%team->nb_workers; + } + /* Worker made room into input pipeline, let's push the next task, t */ + char task_string[MAX_TASK_STRLEN]; + (void)parsec_task_snprintf(task_string, MAX_TASK_STRLEN, task); + PARSEC_DEBUG_VERBOSE(20, parsec_debug_output, "[M%d] Offloading Task %s to Hardware Thread %d", + es->th_id, task_string, team->current_worker); + team->input[team->current_worker] = t; + return PARSEC_HOOK_RETURN_ASYNC; +} +#endif + +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) +int __parsec_work_wait( parsec_execution_stream_t* es, int id ) +{ + char task_string[MAX_TASK_STRLEN]; + /* Worker side */ + parsec_ht_team_t* team = es->team; + struct timespec rqtp; + rqtp.tv_sec = 0; + rqtp.tv_nsec = 100; + + do { + parsec_offload_task_t *task = team->input[id]; + if (task != NULL) { + team->input[id] = NULL; + (void)parsec_task_snprintf(task_string, MAX_TASK_STRLEN, task->task); + PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "[W%i/%d] Executing Task %s", + es->th_id, id, task_string); + task->rc = __parsec_execute( task->es, task->task ); + while (NULL != team->output[id]) { + nanosleep(&rqtp, NULL); + __asm__ __volatile__(""); + } + team->output[id] = task; + continue; + } + nanosleep(&rqtp, NULL); + } while(team->keepgoing); + return PARSEC_SUCCESS; +} +#endif + int __parsec_task_progress( parsec_execution_stream_t* es, parsec_task_t* task, int distance) @@ -439,9 +564,25 @@ int __parsec_task_progress( parsec_execution_stream_t* es, } switch(rc) { case PARSEC_HOOK_RETURN_DONE: { - if(task->status <= PARSEC_TASK_STATUS_HOOK) { + if (task->status == PARSEC_TASK_STATUS_COMPLETE ) { + /* In some cases (Startup tasks), they can return from prepare input as completed */ + PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[M%d] Completing %s: %p", + es->th_id, task->task_class->name, task); + __parsec_complete_execution( es, task ); + break; + } + else if(task->status <= PARSEC_TASK_STATUS_HOOK) { +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + rc = __parsec_offload_task(es, task, distance); +#else rc = __parsec_execute( es, task ); +#endif /* PARSEC_HYPERTHREAD_SCHEDULER */ } +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + int v; + for (v = 0; v < es->team->nb_workers; ++v) + __parsec_drain_worker_events(es, es->team->current_worker); +#else /* We're good to go ... */ switch(rc) { case PARSEC_HOOK_RETURN_DONE: /* This execution succeeded */ @@ -467,6 +608,7 @@ int __parsec_task_progress( parsec_execution_stream_t* es, assert( 0 ); /* Internal error: invalid return value */ } break; +#endif /* PARSEC_HYPERTHREAD_SCHEDULER */ } case PARSEC_HOOK_RETURN_ASYNC: /* The task is outside our reach we should not * even try to change it's state, the completion @@ -565,9 +707,13 @@ int __parsec_context_wait( parsec_execution_stream_t* es ) rc = __parsec_task_progress(es, task, distance); (void)rc; /* for now ignore the return value */ - nbiterations++; } +#if defined(PARSEC_HYPERTHREAD_SCHEDULER) + int v; + for (v = 0; v < es->team->nb_workers; ++v) + __parsec_drain_worker_events(es, v); +#endif /* PARSEC_HYPERTHREAD_SCHEDULER */ } parsec_rusage_per_es(es, true); diff --git a/parsec/scheduling.h b/parsec/scheduling.h index a134f7e92..5d3794de6 100644 --- a/parsec/scheduling.h +++ b/parsec/scheduling.h @@ -76,6 +76,7 @@ int __parsec_reschedule(parsec_execution_stream_t* es, */ int __parsec_context_wait(parsec_execution_stream_t* es); +int __parsec_work_wait(parsec_execution_stream_t* es, int ht); /** * Execute the body of the task associated to the context. */ diff --git a/parsec/vpmap.c b/parsec/vpmap.c index 9a7730ca5..ea3bd4435 100644 --- a/parsec/vpmap.c +++ b/parsec/vpmap.c @@ -102,7 +102,7 @@ static void vpmap_get_core_affinity_parameters(int vp, int thread, int *cores, i #endif /* PARSEC_HAVE_HWLOC */ *cores = (vp * nbcores * nbht) + thread; if (nbht > 1 ) { - *ht = (*cores) % nbht; + *ht = thread / nbcores; } else { *ht = -1; } @@ -378,7 +378,11 @@ int vpmap_init_from_flat(int _nbcores) #endif /* defined(PARSEC_HAVE_HWLOC) */ nbvp = 1; - nbcores = _nbcores/nbht; +#if !defined(PARSEC_HYPERTHREAD_ROUND_ROBIN) + nbcores = _nbcores/nbht; /* Fill a core with hyperthreads before proceeding to the next one */ +#else + nbcores = _nbcores; /* Round robin distribution of hyperthreads */ +#endif nbthreadspervp = _nbcores; vpmap_nb_total_threads = nbvp * nbthreadspervp;