diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1bd543b64..9f29cceaa 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -90,6 +90,10 @@ mark_as_advanced(PARSEC_SCHED_DEPS_MASK)
 option(PARSEC_SCHED_DEPS_MASK
   "Use a complete bitmask to track the dependencies, instead of a counter -- increase the debugging features, but limits to a maximum of 30 input dependencies" ON)
 
+mark_as_advanced(PARSEC_HAVE_HYPERTHREAD_SCHEDULER)
+option(PARSEC_HAVE_HYPERTHREAD_SCHEDULER
+  "Double the number of threads, half will compute, the other half is in charge of scheduling, lookups, and all the boring stuff" OFF)
+
 ### Distributed engine parameters
 mark_as_advanced(PARSEC_DIST_THREAD PARSEC_DIST_PRIORITIES)
 option(PARSEC_DIST_WITH_MPI
diff --git a/parsec/CMakeLists.txt b/parsec/CMakeLists.txt
index 6553cd369..99f367470 100644
--- a/parsec/CMakeLists.txt
+++ b/parsec/CMakeLists.txt
@@ -116,6 +116,11 @@ if( PARSEC_PROF_GRAPHER )
 endif( PARSEC_PROF_GRAPHER )
 
 add_subdirectory(data_dist)
+
+if( PARSEC_HAVE_HYPERTHREAD_SCHEDULER )
+  add_definitions(-DPARSEC_HYPERTHREAD_SCHEDULER -DPARSEC_HYPERTHREAD_ROUND_ROBIN)
+endif( PARSEC_HAVE_HYPERTHREAD_SCHEDULER )
+
 #
 # Setup targets
 #
diff --git a/parsec/include/parsec/execution_stream.h b/parsec/include/parsec/execution_stream.h
index e63bdcdda..247e68cbc 100644
--- a/parsec/include/parsec/execution_stream.h
+++ b/parsec/include/parsec/execution_stream.h
@@ -17,6 +17,7 @@
 #include "parsec/mempool.h"
 #include "parsec/profiling.h"
 #include "parsec/class/barrier.h"
+#include "parsec/class/list.h"
 
 #ifdef PINS_ENABLE
 #include "parsec/mca/pins/pins.h"
@@ -29,6 +30,15 @@
 
 BEGIN_C_DECLS
 
+/**
+ * @brief A container for interaction between scheduler and worker thread
+ */
+typedef struct parsec_shared_information_s parsec_shared_information_t;
+/**
+ * @brief Descriptor of one task travelling between the scheduler and its worker thread
+ */
+typedef struct parsec_loctask_s parsec_loctask_t;
+
 /**
  * Computational Thread-specific structure
  */
@@ -65,6 +75,9 @@ struct parsec_execution_stream_s {
                                                          *   we use these mempools */
     parsec_thread_mempool_t *dependencies_mempool; /**< If using hashtables to store dependencies
                                                     *   those are allocated using this mempool */
+#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
+    parsec_shared_information_t *shared;           /**< Mailbox shared with the paired worker thread */
+#endif
 };
 
 /**
@@ -159,6 +172,25 @@ struct parsec_context_s {
 
 #define PARSEC_THREAD_IS_MASTER(eu) ( ((eu)->th_id == 0) && ((eu)->virtual_process->vp_id == 0) )
 
+struct parsec_shared_information_s {
+    parsec_barrier_t *barrier;          /* two-party barrier used while the worker thread starts */
+    /* the address pointer needs to be volatile, not the content pointed to */
+    parsec_loctask_t *volatile input;   /* scheduler -> worker slot: next task to execute, NULL when free */
+    parsec_loctask_t *volatile output;  /* worker -> scheduler slot: last executed task, NULL when free */
+    parsec_loctask_t *freelist; /* simple lifo no lock */
+    volatile int keepgoing;             /* polled by the worker loop, cleared by the scheduler to stop it */
+    int submitted;                      /* reset to 0 at startup */
+};
+
+struct parsec_loctask_s {
+    parsec_list_item_t super;
+    int rc;                         /* return code of the task execution, -1 until it has run */
+    int distance;                   /* scheduling distance, reused if the task must be rescheduled */
+    parsec_task_t *task;            /* the offloaded task */
+    parsec_execution_stream_t *es;  /* execution stream the task is executed on */
+    parsec_loctask_t *next;         /* link in the freelist */
+};
+
 END_C_DECLS
 
 #endif  /* PARSEC_EXECUTION_UNIT_H_HAS_BEEN_INCLUDED */
diff --git a/parsec/interfaces/ptg/ptg-compiler/jdf2c.c b/parsec/interfaces/ptg/ptg-compiler/jdf2c.c
index b3f9382d5..e58a38598 100644
--- a/parsec/interfaces/ptg/ptg-compiler/jdf2c.c
+++ b/parsec/interfaces/ptg/ptg-compiler/jdf2c.c
@@ -5328,6 +5328,13 @@ static void jdf_generate_code_hook(const jdf_t *jdf,
     }
 
     coutput("%s\n", body->external_code);
+
+    if( profile_on ) {  /* close the task trace right after the body, in the thread that ran it */
+        coutput("  PARSEC_TASK_PROF_TRACE(es->es_profile,\n"
+                "                         this_task->taskpool->profiling_array[2*this_task->task_class->task_class_id+1],\n"
+                "                         (parsec_task_t*)this_task);\n");
+    }
+
     if( !JDF_COMPILER_GLOBAL_ARGS.noline ) {
         coutput("#line %d \"%s\"\n", cfile_lineno+1, jdf_cfilename);
     }
@@ -5388,7 +5395,7 @@ jdf_generate_code_complete_hook(const jdf_t *jdf,
         }
     }
 
-    if( profile_on ) {
+    if( 0 && profile_on ) {  /* disabled: the end-of-task trace is now generated by jdf_generate_code_hook */
         coutput("  PARSEC_TASK_PROF_TRACE(es->es_profile,\n"
                 "                         this_task->taskpool->profiling_array[2*this_task->task_class->task_class_id+1],\n"
                 "                         (parsec_task_t*)this_task);\n");
diff --git a/parsec/parsec.c b/parsec/parsec.c
index 89489e891..59886c8e8
100644
--- a/parsec/parsec.c
+++ b/parsec/parsec.c
@@ -28,6 +28,7 @@
 #include "parsec/utils/output.h"
 #include "parsec/data_internal.h"
 #include "parsec/class/list.h"
+#include "parsec/class/fifo.h"
 #include "parsec/scheduling.h"
 #include "parsec/class/barrier.h"
 #include "parsec/remote_dep.h"
@@ -167,12 +168,40 @@ typedef struct __parsec_temporary_thread_initialization_t {
     int bindto_ht;
     parsec_barrier_t*  barrier;       /*< the barrier used to synchronize for the
                                        *   local VP data construction. */
+#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
+    parsec_execution_stream_t* es;    /*< stream of the scheduler thread, handed to its worker */
+#endif
 } __parsec_temporary_thread_initialization_t;
 
 static int parsec_parse_binding_parameter(const char* option, parsec_context_t* context,
-                                      __parsec_temporary_thread_initialization_t* startup);
+                                          __parsec_temporary_thread_initialization_t* startup);
 static int parsec_parse_comm_binding_parameter(const char* option, parsec_context_t* context);
 
+#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
+/**
+ * Entry point of the worker (compute) thread paired with a scheduler thread.
+ * It binds itself next to the scheduler (hyperthread index bindto_ht + 1),
+ * meets the scheduler on the shared barrier once it is done reading startup,
+ * then runs __parsec_work_wait() and returns its result.
+ */
+static void* __parsec_worker_init( __parsec_temporary_thread_initialization_t* startup )
+{
+    parsec_execution_stream_t* es = startup->es;
+    int bindto = startup->bindto;
+    int bindto_ht = (-1 == startup->bindto_ht) ? 1 : startup->bindto_ht+1;
+
+    parsec_bindthread(bindto, bindto_ht);
+    PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Bind worker thread %i.%i on core %i [HT %i]",
+                         es->virtual_process->vp_id, es->th_id,
+                         bindto, bindto_ht);
+
+    parsec_barrier_wait(es->shared->barrier);
+    void* ret = (void*)(long)__parsec_work_wait(es);
+
+    return ret;
+}
+#endif
+
 static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* startup )
 {
     parsec_execution_stream_t* es;
@@ -181,10 +210,11 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s
     /* don't use PARSEC_THREAD_IS_MASTER, it is too early and we cannot yet allocate the es struct */
     if( (0 != startup->virtual_process->vp_id) || (0 != startup->th_id) || parsec_runtime_bind_main_thread ) {
         /* Bind to the specified CORE */
+        if( -1 == startup->bindto_ht ) startup->bindto_ht = 0;
         parsec_bindthread(startup->bindto, startup->bindto_ht);
         PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Bind thread %i.%i on core %i [HT %i]",
-                            startup->virtual_process->vp_id, startup->th_id,
-                            startup->bindto, startup->bindto_ht);
+                             startup->virtual_process->vp_id, startup->th_id,
+                             startup->bindto, startup->bindto_ht);
     } else {
         PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Don't bind the main thread %i.%i",
                              startup->virtual_process->vp_id, startup->th_id);
@@ -233,6 +263,62 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s
                                 offsetof(parsec_hashable_dependency_t, mempool_owner),
                                 vp->nb_cores);
     }
+
+#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
+    int ht_activated = 0;
+    pthread_t worker;
+    es->shared = NULL;
+
+    if( 1 < parsec_hwloc_allow_ht(2) ) {
+        ht_activated = 1;
+        /* Scheduler/worker mailbox, allocated on a 64 bytes boundary */
+        if( 0 != posix_memalign((void**)&es->shared, 64, sizeof(parsec_shared_information_t)) ) {
+            parsec_fatal("Unable to allocate the scheduler/worker shared information.\n");
+        }
+        es->shared->barrier = (parsec_barrier_t*)malloc(sizeof(parsec_barrier_t));
+        if( NULL == es->shared->barrier ) {
+            parsec_fatal("Unable to allocate the scheduler/worker barrier.\n");
+        }
+        parsec_barrier_init(es->shared->barrier, NULL, 2);
+        es->shared->input = NULL;
+        es->shared->output = NULL;
+        es->shared->keepgoing = 1;
+        es->shared->submitted = 0;
+        es->shared->freelist = NULL;
+
+        int i;
+        for (i = 0; i < 5; ++i) { /* we cannot have more than 5 tasks in the pipeline */
+            parsec_loctask_t *t = (parsec_loctask_t*)calloc(1, sizeof(parsec_loctask_t));
+            if( NULL == t ) {
+                parsec_fatal("Unable to allocate the scheduler/worker task descriptors.\n");
+            }
+            t->next = es->shared->freelist;
+            es->shared->freelist = t;
+        }
+
+        pthread_attr_t thread_attr;
+        pthread_attr_init(&thread_attr);
+        pthread_attr_setscope(&thread_attr, PTHREAD_SCOPE_SYSTEM);
+
+        startup->es = es;
+        /* Summon your intern */
+        if( 0 != pthread_create( &worker,
+                                 &thread_attr,
+                                 (void* (*)(void*))__parsec_worker_init,
+                                 (void*)startup) ) {
+            parsec_fatal("Unable to create the worker thread.\n");
+        }
+        pthread_attr_destroy(&thread_attr);
+
+        /* The worker reads startup before it reaches this barrier */
+        parsec_barrier_wait(es->shared->barrier);
+    }
+    else {
+        parsec_fatal("PaRSEC has been compiled with hyperthreading capabilities.\nThis machine does not support hyperthread, recompile without the PARSEC_HAVE_HYPERTHREAD_SCHEDULER.\n");
+    }
+
+#endif
+
     /* Synchronize with the other threads */
     parsec_barrier_wait(startup->barrier);
 
@@ -275,6 +361,26 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s
 
     void *ret = (void*)(long)__parsec_context_wait(es);
     PARSEC_PAPI_SDE_THREAD_FINI();
+
+#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
+    if( ht_activated ) {
+        /* Clean worker thread body */
+        es->shared->keepgoing = 0;
+        void *work_ret = NULL;
+        pthread_join(worker, &work_ret);
+
+        /* The worker has been joined: release everything allocated at startup */
+        while( NULL != es->shared->freelist ) {
+            parsec_loctask_t *t = es->shared->freelist;
+            es->shared->freelist = t->next;
+            free(t);
+        }
+        parsec_barrier_destroy(es->shared->barrier);
+        free(es->shared->barrier);
+        free(es->shared);
+        es->shared = NULL;
+    }
+#endif
     return ret;
 }
 
@@ -296,11 +402,12 @@ static void parsec_vp_init( parsec_vp_t *vp,
         startup[t].th_id = t;
         startup[t].virtual_process = vp;
         startup[t].bindto = -1;
-        startup[t].bindto_ht = -1;
+        startup[t].bindto_ht = 0;
         startup[t].barrier = barrier;
         pi = vpmap_get_nb_cores_affinity(vp->vp_id, t);
-        if( 1 == pi )
+        if( 1 == pi ) {
             vpmap_get_core_affinity(vp->vp_id, t, &startup[t].bindto, &startup[t].bindto_ht);
+        }
         else if( 1 < pi )
             parsec_warning("multiple core to bind on... for now, do nothing"); //TODO: what does that mean?
     }
@@ -662,14 +769,14 @@ parsec_context_t* parsec_init( int nb_cores, int* pargc, char** pargv[] )
                                                 0, NULL,
                                                 &queue_remove_begin, &queue_remove_end);
 #  endif /* PARSEC_PROF_TRACE_SCHEDULING_EVENTS */
-#if defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET)
+#  if defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET)
         parsec_profiling_add_dictionary_keyword( "ARENA_MEMORY", "fill:#B9B243",
                                                 sizeof(size_t), "size{int64_t}",
                                                 &arena_memory_alloc_key, &arena_memory_free_key);
         parsec_profiling_add_dictionary_keyword( "ARENA_ACTIVE_SET", "fill:#B9B243",
                                                 sizeof(size_t), "size{int64_t}",
                                                 &arena_memory_used_key, &arena_memory_unused_key);
-#endif  /* defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET) */
+#  endif  /* defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET) */
         parsec_profiling_add_dictionary_keyword( "TASK_MEMORY", "fill:#B9B243",
                                                 sizeof(size_t), "size{int64_t}",
                                                 &task_memory_alloc_key, &task_memory_free_key);
diff --git a/parsec/parsec_internal.h b/parsec/parsec_internal.h
index 4cf690fa7..a4f483d7b 100644
--- a/parsec/parsec_internal.h
+++ b/parsec/parsec_internal.h
@@ -15,6 +15,7 @@
 #include "parsec/data.h"
 #include "parsec/class/list_item.h"
 #include "parsec/class/parsec_hash_table.h"
+#include "parsec/class/fifo.h"
 #include "parsec/parsec_description_structures.h"
 #include "parsec/profiling.h"
 #include "parsec/mempool.h"
diff --git a/parsec/scheduling.c b/parsec/scheduling.c
index b509f717f..99ede1d94 100644
--- a/parsec/scheduling.c
+++ b/parsec/scheduling.c
@@ -424,6 +424,161 @@ int __parsec_complete_execution( parsec_execution_stream_t *es,
     return rc;
 }
 
+#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
+/**
+ * Scheduler side: act on the return code rc that the worker thread
+ * obtained for task (complete it, reschedule it, or leave it alone).
+ *
+ * @return 1 when the task has been completed, 0 otherwise.
+ */
+static int __parsec_make_task_progress(parsec_execution_stream_t* es,
+                                       parsec_task_t* task,
+                                       int rc,
+                                       int distance)
+{
+    switch(rc) {
+    case PARSEC_HOOK_RETURN_DONE: {  /* This execution succeeded */
+        /* Log first: the task is not touched once handed to the completion */
+        char task_string[MAX_TASK_STRLEN];
+        (void)parsec_task_snprintf(task_string, MAX_TASK_STRLEN, task);
+        PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[M%d] Completing Task %s",
+                             es->th_id, task_string);
+        task->status = PARSEC_TASK_STATUS_COMPLETE;
+        __parsec_complete_execution( es, task );
+        return 1;
+    }
+    case PARSEC_HOOK_RETURN_AGAIN:   /* Reschedule later */
+        task->status = PARSEC_TASK_STATUS_HOOK;
+        if(0 == task->priority) {
+            SET_LOWEST_PRIORITY(task, parsec_execution_context_priority_comparator);
+        } else
+            task->priority /= 10;  /* demote the task */
+        PARSEC_LIST_ITEM_SINGLETON(task);
+        __parsec_schedule(es, task, distance + 1);
+        break;
+    case PARSEC_HOOK_RETURN_ASYNC:   /* The task is outside our reach we should not
+                                      * even try to change it's state, the completion
+                                      * will be triggered asynchronously. */
+        break;
+    case PARSEC_HOOK_RETURN_NEXT:    /* Try next variant [if any] */
+    case PARSEC_HOOK_RETURN_DISABLE: /* Disable the device, something went wrong */
+    case PARSEC_HOOK_RETURN_ERROR:   /* Some other major error happened */
+        assert( 0 ); /* Internal error: invalid return value */
+    }
+    return 0;
+}
+#endif
+
+#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
+/**
+ * Scheduler side: retire the task published by the worker in the output
+ * slot, if any, and push its descriptor back onto the freelist.
+ *
+ * @return PARSEC_SUCCESS, always.
+ */
+static int __parsec_drain_worker_events(parsec_execution_stream_t* es)
+{
+    parsec_shared_information_t* shared = es->shared;
+    /* Read the volatile slot only once, then work on the local copy */
+    parsec_loctask_t *event = shared->output;
+
+    if( NULL != event ) {
+        shared->output = NULL;  /* the worker may publish its next result */
+        __parsec_make_task_progress(es, event->task, event->rc, event->distance);
+        event->next = shared->freelist;
+        shared->freelist = event;
+    }
+    return PARSEC_SUCCESS;
+}
+#endif
+
+#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
+/**
+ * Scheduler side: hand task over to the worker thread through the input slot.
+ * Waits, draining the worker results meanwhile, until that slot is free.
+ *
+ * @return PARSEC_HOOK_RETURN_ASYNC: the task makes progress later, from
+ *         __parsec_drain_worker_events().
+ */
+static int __parsec_offload_task( parsec_execution_stream_t* es,
+                                  parsec_task_t* task,
+                                  int distance)
+{
+    parsec_shared_information_t* shared = es->shared;
+    struct timespec r;
+    r.tv_sec = 0;
+    r.tv_nsec = 10;
+    /* No free descriptor: retire worker results until one is recycled */
+    while( NULL == shared->freelist ) {
+        __parsec_drain_worker_events(es);
+        if( NULL == shared->freelist ) {
+            nanosleep(&r, NULL);
+        }
+    }
+    /* Task goes away, consider async and pull events later to make it progress */
+    parsec_loctask_t *t = shared->freelist;
+    shared->freelist = t->next;
+    t->next = NULL;
+    t->rc = -1;
+    t->task = task;
+    t->es = es;
+    t->distance = distance;
+    /* Is there a task in the pipeline? */
+    while (NULL != shared->input) {
+        /* Yes, there is one, let's try to flush the output pipeline in case we are bottlenecking the worker */
+        __parsec_drain_worker_events(es);
+        /* 'nough work done, let's sleep */
+        nanosleep(&r, NULL);
+        __asm__ __volatile__("");
+    }
+    /* Worker made room into input pipeline, let's push the next task, t */
+    char task_string[MAX_TASK_STRLEN];
+    (void)parsec_task_snprintf(task_string, MAX_TASK_STRLEN, task);
+    PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[M%d] Offloading Task %s",
+                         es->th_id, task_string);
+    shared->input = t;
+    return PARSEC_HOOK_RETURN_ASYNC;
+}
+#endif
+
+#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
+/**
+ * Worker side: main loop of the compute thread. Takes the descriptor posted
+ * in the input slot, executes its task, then publishes the descriptor in the
+ * output slot once that slot is free. Loops until shared->keepgoing is cleared.
+ *
+ * @return PARSEC_SUCCESS, always.
+ */
+int __parsec_work_wait( parsec_execution_stream_t* es )
+{
+    parsec_shared_information_t* shared = es->shared;
+    struct timespec rqtp;
+    rqtp.tv_sec = 0;
+    rqtp.tv_nsec = 100;
+
+    do {
+        parsec_loctask_t *task = shared->input;
+        if (task != NULL) {
+            shared->input = NULL;
+            char task_string[MAX_TASK_STRLEN];
+            (void)parsec_task_snprintf(task_string, MAX_TASK_STRLEN, task->task);
+            PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[W%d] Executing Task %s",
+                                 es->th_id, task_string);
+            task->rc = __parsec_execute( task->es, task->task );
+            /* Wait for the scheduler to pick up the previous result */
+            while (NULL != shared->output) {
+                nanosleep(&rqtp, NULL);
+                __asm__ __volatile__("");
+            }
+            shared->output = task;
+            continue;
+        }
+        nanosleep(&rqtp, NULL);
+    } while(shared->keepgoing);
+    return PARSEC_SUCCESS;
+}
+#endif
+
 int __parsec_task_progress( parsec_execution_stream_t* es,
                             parsec_task_t* task,
                             int distance)
@@ -439,9 +594,23 @@ int __parsec_task_progress( parsec_execution_stream_t* es,
     }
     switch(rc) {
     case PARSEC_HOOK_RETURN_DONE: {
-        if(task->status <= PARSEC_TASK_STATUS_HOOK) {
+        if (task->status == PARSEC_TASK_STATUS_COMPLETE ) {
+            /* In some cases (Startup tasks), they can return from prepare input as completed */
+            PARSEC_DEBUG_VERBOSE(2, parsec_debug_output, "[M%d] Completing %s: %p",
+                                 es->th_id, task->task_class->name, (void*)task);
+            __parsec_complete_execution( es, task );
+            break;
+        }
+        else if(task->status <= PARSEC_TASK_STATUS_HOOK) {
+#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
+            rc = __parsec_offload_task(es, task, distance);
+#else
             rc = __parsec_execute( es, task );
+#endif  /* PARSEC_HYPERTHREAD_SCHEDULER */
         }
+#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
+        __parsec_drain_worker_events(es);
+#else
         /* We're good to go ... */
         switch(rc) {
         case PARSEC_HOOK_RETURN_DONE:    /* This execution succeeded */
@@ -467,6 +636,7 @@ int __parsec_task_progress( parsec_execution_stream_t* es,
             assert( 0 ); /* Internal error: invalid return value */
         }
+#endif  /* PARSEC_HYPERTHREAD_SCHEDULER */
         break;
     }
     case PARSEC_HOOK_RETURN_ASYNC:   /* The task is outside our reach we should not
                                       * even try to change it's state, the completion
@@ -565,9 +735,11 @@ int __parsec_context_wait( parsec_execution_stream_t* es )
 
             rc = __parsec_task_progress(es, task, distance);
             (void)rc;  /* for now ignore the return value */
-
             nbiterations++;
         }
+#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
+        __parsec_drain_worker_events(es);
+#endif  /* PARSEC_HYPERTHREAD_SCHEDULER */
     }
 
     parsec_rusage_per_es(es, true);
diff --git a/parsec/scheduling.h b/parsec/scheduling.h
index a134f7e92..853c6b10a 100644
--- a/parsec/scheduling.h
+++ b/parsec/scheduling.h
@@ -76,6 +76,12 @@ int __parsec_reschedule(parsec_execution_stream_t* es,
  */
 int __parsec_context_wait(parsec_execution_stream_t* es);
 
+/**
+ * Main loop of a worker (compute) thread; only defined when PaRSEC is built
+ * with PARSEC_HYPERTHREAD_SCHEDULER. Executes the tasks offloaded on es.
+ */
+int __parsec_work_wait(parsec_execution_stream_t* es);
+
 /**
  * Execute the body of the task associated to the context.
*/ diff --git a/parsec/vpmap.c b/parsec/vpmap.c index 9a7730ca5..ea3bd4435 100644 --- a/parsec/vpmap.c +++ b/parsec/vpmap.c @@ -102,7 +102,7 @@ static void vpmap_get_core_affinity_parameters(int vp, int thread, int *cores, i #endif /* PARSEC_HAVE_HWLOC */ *cores = (vp * nbcores * nbht) + thread; if (nbht > 1 ) { - *ht = (*cores) % nbht; + *ht = thread / nbcores; } else { *ht = -1; } @@ -378,7 +378,11 @@ int vpmap_init_from_flat(int _nbcores) #endif /* defined(PARSEC_HAVE_HWLOC) */ nbvp = 1; - nbcores = _nbcores/nbht; +#if !defined(PARSEC_HYPERTHREAD_ROUND_ROBIN) + nbcores = _nbcores/nbht; /* Fill a core with hyperthreads before proceeding to the next one */ +#else + nbcores = _nbcores; /* Round robin distribution of hyperthreads */ +#endif nbthreadspervp = _nbcores; vpmap_nb_total_threads = nbvp * nbthreadspervp;