diff --git a/CMakeLists.txt b/CMakeLists.txt index 0c0fb8fbe..d55940945 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -144,7 +144,7 @@ if(PARSEC_DIST_WITH_MPI AND 0) message(FATAL_ERROR "PARSEC_DIST_WITH_MPI and PARSEC_DIST_WITH_OTHER are mutually exclusive, please select only one") endif() option(PARSEC_DIST_THREAD - "Use an extra thread to progress the data movements" ON) + "Use an extra thread to progress the data movements" OFF) option(PARSEC_DIST_PRIORITIES "Favor the communications that unlock the most prioritary tasks" ON) option(PARSEC_DIST_COLLECTIVES diff --git a/parsec/interfaces/superscalar/CMakeLists.txt b/parsec/interfaces/superscalar/CMakeLists.txt index e0f55d879..4ff096b0d 100644 --- a/parsec/interfaces/superscalar/CMakeLists.txt +++ b/parsec/interfaces/superscalar/CMakeLists.txt @@ -2,7 +2,9 @@ if( BUILD_PARSEC ) LIST(APPEND EXTRA_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/interfaces/superscalar/parsec_dtd_data_flush.c ${CMAKE_CURRENT_SOURCE_DIR}/interfaces/superscalar/overlap_strategies.c - ${CMAKE_CURRENT_SOURCE_DIR}/interfaces/superscalar/insert_function.c) + ${CMAKE_CURRENT_SOURCE_DIR}/interfaces/superscalar/insert_function.c + ${CMAKE_CURRENT_SOURCE_DIR}/interfaces/superscalar/parsec_dtd_broadcast.c + ${CMAKE_CURRENT_SOURCE_DIR}/interfaces/superscalar/collectives.c) INSTALL(FILES ${CMAKE_CURRENT_SOURCE_DIR}/interfaces/superscalar/insert_function.h diff --git a/parsec/interfaces/superscalar/collectives.c b/parsec/interfaces/superscalar/collectives.c new file mode 100644 index 000000000..1274695a6 --- /dev/null +++ b/parsec/interfaces/superscalar/collectives.c @@ -0,0 +1,195 @@ +/** + * Copyright (c) 2013-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + */ + +#include "parsec/class/lifo.h" +#include "parsec/parsec_internal.h" +#include "parsec/parsec_config.h" +#include "parsec/interfaces/superscalar/insert_function_internal.h" +#include "parsec/interfaces/superscalar/insert_function.h" + +#ifdef PARSEC_DTD_DIST_COLLECTIVES + +/** + * Create and return `parsec_remote_deps_t` structure associated with + * the broadcast of the a data to all the nodes set in the + * `dest_ranks` array. + **/ +parsec_remote_deps_t* parsec_dtd_create_remote_deps( + int myrank, int root, parsec_data_copy_t *data_copy, + parsec_arena_datatype_t *arenas_datatype, + int* dest_ranks, int num_dest_ranks) { + + parsec_remote_deps_t *deps = (parsec_remote_deps_t*)remote_deps_allocate(&parsec_remote_dep_context.freelist); + + assert(NULL != deps); + assert(NULL == deps->taskpool); + + deps->root = root; + deps->outgoing_mask |= (1 << 0); /* only 1 flow */ + deps->max_priority = 0; + + struct remote_dep_output_param_s* output = &deps->output[0]; + output->data.data = NULL; + output->data.arena = arenas_datatype->arena; + output->data.layout = arenas_datatype->opaque_dtt; + output->data.count = 1; + output->data.displ = 0; + output->priority = 0; + + if (myrank == root) { + // if my rank corresponds to the root for this broadcast then we + // add `data_copy` to the remote deps information + // `data.data`. Otherwise, we leave it to NULL. + output->data.data = data_copy; + } + + int _array_pos, _array_mask; + uint32_t dest_rank_idx; + if(myrank == root) { + // Loop through destination ranks in `dest_rank` array + for (dest_rank_idx = 0; dest_rank_idx < (uint32_t)num_dest_ranks; ++dest_rank_idx) { + + // Get rank from `dest_rank` array + uint32_t dest_rank = dest_ranks[dest_rank_idx]; + + // Skip if we are root + if(deps->root == dest_rank) continue; + + _array_pos = dest_rank / (8 * sizeof(uint32_t)); + _array_mask = 1 << (dest_rank % (8 * sizeof(uint32_t))); + + if( !(output->rank_bits[_array_pos] & _array_mask) ) { + output->rank_bits[_array_pos] |= _array_mask; + output->deps_mask |= (1 << 0); /* not used by DTD? */ + output->count_bits++; + } + } + } else{ + _array_pos = myrank / (8 * sizeof(uint32_t)); + _array_mask = 1 << (myrank % (8 * sizeof(uint32_t))); + + output->rank_bits[_array_pos] |= _array_mask; + output->deps_mask |= (1 << 0); /* not used by DTD? */ + output->count_bits++; + } + + return deps; +} + +/** + * Perform a broadcast for of the dtd tile `dtd_tile_root` from the + * root node associated with the rank `root` to the nodes with ranks + * set in the `dest_ranks` array. + **/ +void parsec_dtd_broadcast( + parsec_taskpool_t *taskpool, int root, + parsec_dtd_tile_t* dtd_tile_root, int arena_index, + //parsec_dtd_tile_t* bcast_keys_root, int bcast_arena_index, + int* dest_ranks, int num_dest_ranks) { + + parsec_dtd_tile_t* bcast_keys_root = NULL; + int bcast_arena_index = 15; + + + parsec_data_copy_t *parsec_data_copy; + int *data_ptr; + int key; + int bcast_id; + int myrank = taskpool->context->my_rank; + parsec_dtd_taskpool_t *dtd_tp = (parsec_dtd_taskpool_t *)taskpool; + + //bcast_keys_root = (parsec_dtd_tile_t *) parsec_thread_mempool_allocate( parsec_bcast_keys_tile_mempool->thread_mempools ); + bcast_keys_root = (parsec_dtd_tile_t *) malloc(sizeof(parsec_dtd_tile_t)); + SET_LAST_ACCESSOR(bcast_keys_root); + bcast_keys_root->dc = NULL; + bcast_keys_root->arena_index = -1; + bcast_keys_root->key = (uint64_t) bcast_id; + bcast_keys_root->rank = root; + bcast_keys_root->flushed = NOT_FLUSHED; + parsec_data_copy_t* new_data_copy = PARSEC_OBJ_NEW(parsec_data_copy_t); + + new_data_copy->coherency_state = PARSEC_DATA_COHERENCY_OWNED; + new_data_copy->device_private = malloc(sizeof(int)*2500); + bcast_keys_root->data_copy = new_data_copy; + + if(myrank == root) { + bcast_id = ( (1<<30) | (root << 18) | dtd_tp->bcast_id); + dtd_tp->bcast_id++; + + + parsec_data_copy = bcast_keys_root->data_copy; + data_ptr = (int*)parsec_data_copy_get_ptr(parsec_data_copy); + data_ptr[0] = bcast_id; + data_ptr[600] = num_dest_ranks; + for(int i = 0; i < num_dest_ranks; i++) { + data_ptr[dest_ranks[i]+1] = dtd_tp->send_task_id[dest_ranks[i]]++; + //pack the ranks at the end of the tiles as well + data_ptr[600+i+1] = dest_ranks[i]; + } + bcast_keys_root->ht_item.key = (parsec_key_t)((uintptr_t)data_ptr[0]); + + //fprintf(stderr, "on rank %d inserting key tile into bcast_keys_hash with key %ld num dest ranks %d\n", myrank, bcast_keys_root->ht_item.key, data_ptr[400]); + parsec_hash_table_insert(parsec_bcast_keys_hash, &bcast_keys_root->ht_item); + parsec_mfence(); /* Write */ + } + + // Retrieve DTD tile's data_copy + //parsec_data_copy_t *data_copy = dtd_tile_root->data_copy; + //parsec_data_copy_t *key_copy = bcast_keys_root->data_copy; + + // Create remote deps corresponding to the braodcast + /* + parsec_remote_deps_t *deps_0 = parsec_dtd_create_remote_deps( + myrank, root, data_copy, &parsec_dtd_arenas_datatypes[arena_index], + dest_ranks, num_dest_ranks); + parsec_remote_deps_t *deps_1 = parsec_dtd_create_remote_deps( + myrank, root, key_copy, &parsec_dtd_arenas_datatypes[bcast_arena_index], + dest_ranks, num_dest_ranks); + */ + parsec_task_t *bcast_task_root = parsec_dtd_taskpool_create_task( + taskpool, parsec_dtd_bcast_data_fn, 0, "bcast_data_fn", + PASSED_BY_REF, dtd_tile_root, PARSEC_INOUT | arena_index, + sizeof(int), &root, PARSEC_VALUE | PARSEC_AFFINITY, + PARSEC_DTD_ARG_END); + + parsec_dtd_task_t *dtd_bcast_task_root = (parsec_dtd_task_t *)bcast_task_root; + + // Set broadcast topology info + //deps_0->pending_ack = 0; + //dtd_bcast_task_root->deps_out = deps_0; + + if(myrank == root) { + dtd_bcast_task_root->ht_item.key = bcast_id; + dtd_bcast_task_root->super.locals[0].value = dtd_bcast_task_root->ht_item.key; + }else{ + bcast_id = ( (1<<28) | (root << 18) | dtd_tp->recv_task_id[root]++); + dtd_bcast_task_root->ht_item.key = bcast_id; + dtd_bcast_task_root->super.locals[0].value = dtd_bcast_task_root->ht_item.key; + } + + parsec_task_t *bcast_key_root = parsec_dtd_taskpool_create_task( + taskpool, parsec_dtd_bcast_key_fn, 0, "bcast_key_fn", + PASSED_BY_REF, bcast_keys_root, PARSEC_INOUT | bcast_arena_index, + sizeof(int), &root, PARSEC_VALUE | PARSEC_AFFINITY, + PARSEC_DTD_ARG_END); + parsec_dtd_task_t *dtd_bcast_key_root = (parsec_dtd_task_t *)bcast_key_root; + //deps_1->pending_ack = 0; + //dtd_bcast_key_root->deps_out = deps_1; + if(myrank == root) { + /* nothing here since the key is stored in the key array and will be updated before remote_dep_activate */ + }else{ + bcast_id = ( (1<<29) | (root << 18) | (dtd_tp->recv_task_id[root] -1)); + dtd_bcast_key_root->ht_item.key = bcast_id; + dtd_bcast_key_root->super.locals[0].value = dtd_bcast_key_root->ht_item.key; + } + /* Post the bcast of keys and ranks array */ + /* Post the bcast tasks for the actual data */ + parsec_insert_dtd_task(dtd_bcast_task_root); + //sleep(1); + parsec_insert_dtd_task(dtd_bcast_key_root); +} + +#endif diff --git a/parsec/interfaces/superscalar/insert_function.c b/parsec/interfaces/superscalar/insert_function.c index fbb59f51d..d1aeb9aff 100644 --- a/parsec/interfaces/superscalar/insert_function.c +++ b/parsec/interfaces/superscalar/insert_function.c @@ -65,6 +65,7 @@ int parsec_dtd_threshold_size = 4000; /**< Default threshold size of static int parsec_dtd_task_hash_table_size = 1<<16; /**< Default task hash table size */ static int parsec_dtd_tile_hash_table_size = 1<<16; /**< Default tile hash table size */ static int parsec_dtd_no_of_arenas_datatypes = 16; +static int parsec_dtd_bcast_tile_size = 50; int parsec_dtd_dump_traversal_info = 60; /**< Level for printing traversal info */ int parsec_dtd_dump_function_info = 50; /**< Level for printing function_structure info */ @@ -83,6 +84,8 @@ parsec_mempool_t *parsec_dtd_taskpool_mempool = NULL; /* Global mempool for all tiles */ parsec_mempool_t *parsec_dtd_tile_mempool = NULL; +parsec_hash_table_t* parsec_bcast_keys_hash; +parsec_mempool_t* parsec_bcast_keys_tile_mempool; /** * All the static functions should be declared before being defined. */ @@ -96,13 +99,11 @@ parsec_dtd_iterate_successors(parsec_execution_stream_t *es, uint32_t action_mask, parsec_ontask_function_t *ontask, void *ontask_arg); - static int parsec_dtd_release_deps(parsec_execution_stream_t *, parsec_task_t *, uint32_t, parsec_remote_deps_t *); - static parsec_hook_return_t complete_hook_of_dtd(parsec_execution_stream_t *, parsec_task_t *); @@ -214,6 +215,12 @@ parsec_dtd_enqueue_taskpool(parsec_taskpool_t *tp, void *data) /* The first taskclass of every taskpool is the flush taskclass */ parsec_dtd_create_task_class(dtd_tp, parsec_dtd_data_flush_sndrcv, "parsec_dtd_data_flush", 0, 0, 1); + /* The second taskclass of every taskpool is the bcast key array propagation taskclass */ + parsec_dtd_create_task_class(dtd_tp, parsec_dtd_bcast_key_fn, "parsec_dtd_bcast_key_fn", + 2, sizeof(int), 1); + /* The third taskclass of every taskpool is the bcast taskclass for tile data bcast */ + parsec_dtd_create_task_class(dtd_tp, parsec_dtd_bcast_data_fn, "parsec_dtd_bcast_data_fn", + 2, sizeof(int), 1); return 0; } @@ -247,6 +254,14 @@ void parsec_dtd_taskpool_constructor(parsec_dtd_taskpool_t *tp) tp->function_counter = 0; + tp->keys_hash_table = PARSEC_OBJ_NEW(parsec_hash_table_t); + for(nb = 1; nb < 16 && (1<keys_hash_table, + offsetof(dtd_hash_table_pointer_item_t, ht_item), + nb, + DTD_key_fns, + tp->keys_hash_table); + tp->task_hash_table = PARSEC_OBJ_NEW(parsec_hash_table_t); for(nb = 1; nb < 16 && (1<task_hash_table, @@ -448,6 +463,23 @@ parsec_dtd_lazy_init(void) 1/* no. of threads*/ ); parsec_dtd_arenas_datatypes = (parsec_arena_datatype_t *) calloc(parsec_dtd_no_of_arenas_datatypes, sizeof(parsec_arena_datatype_t)); + + parsec_bcast_keys_hash = PARSEC_OBJ_NEW(parsec_hash_table_t); + int nb; + for(nb = 1; nb < 16 && (1 << nb) < parsec_dtd_tile_hash_table_size; nb++) /* nothing */; + parsec_hash_table_init( parsec_bcast_keys_hash, + offsetof(parsec_dtd_tile_t, ht_item), + nb, + DTD_key_fns, + parsec_bcast_keys_hash); + parsec_bcast_keys_tile_mempool = (parsec_mempool_t*) malloc (sizeof(parsec_mempool_t)); + parsec_mempool_construct( parsec_bcast_keys_tile_mempool, + PARSEC_OBJ_CLASS(parsec_dtd_tile_t), sizeof(parsec_dtd_tile_t), + offsetof(parsec_dtd_tile_t, mempool_owner), + 1/* no. of threads*/ ); + parsec_matrix_add2arena_rect(&parsec_dtd_arenas_datatypes[15], + parsec_datatype_int32_t, + parsec_dtd_bcast_tile_size, parsec_dtd_bcast_tile_size, parsec_dtd_bcast_tile_size); } /* **************************************************************************** */ @@ -734,6 +766,7 @@ parsec_dtd_track_task( parsec_dtd_taskpool_t *tp, { dtd_hash_table_pointer_item_t *item = (dtd_hash_table_pointer_item_t *)parsec_thread_mempool_allocate(tp->hash_table_bucket_mempool->thread_mempools); + //fprintf(stderr, "tracking task with key value %ld on rank %d\n", key, tp->super.context->my_rank); parsec_hash_table_t *hash_table = tp->task_hash_table; item->ht_item.key = (parsec_key_t)key; @@ -760,6 +793,7 @@ parsec_dtd_untrack_task( parsec_dtd_taskpool_t *tp, parsec_hash_table_t *hash_table = tp->task_hash_table; void *value; + //fprintf(stderr, "untracking task with key value %ld on rank %d\n", key, tp->super.context->my_rank); dtd_hash_table_pointer_item_t *item = (dtd_hash_table_pointer_item_t *)parsec_hash_table_nolock_find( hash_table, (parsec_key_t)key ); if( NULL == item ) return NULL; @@ -949,6 +983,8 @@ parsec_dtd_tile_insert( uint64_t key, void parsec_dtd_tile_remove( parsec_data_collection_t *dc, uint64_t key ) { + if(dc == NULL) + return; parsec_hash_table_t *hash_table = (parsec_hash_table_t *)dc->tile_h_table; parsec_hash_table_remove( hash_table, (parsec_key_t)key ); @@ -1274,6 +1310,9 @@ parsec_dtd_taskpool_new(void) __tp->current_thread_id = 0; __tp->function_counter = 0; __tp->enqueue_flag = 0; + __tp->bcast_id = 0; + memset(__tp->send_task_id, 0, MAX_RANK_INFO*sizeof(int)*8*sizeof(int)); + memset(__tp->recv_task_id, 0, MAX_RANK_INFO*sizeof(int)*8*sizeof(int)); (void)parsec_taskpool_reserve_id((parsec_taskpool_t *) __tp); if( 0 < asprintf(&__tp->super.taskpool_name, "DTD Taskpool %d", @@ -1436,23 +1475,43 @@ dtd_release_dep_fct( parsec_execution_stream_t *es, parsec_release_dep_fct_arg_t *arg = (parsec_release_dep_fct_arg_t *)param; parsec_dtd_task_t *current_task = (parsec_dtd_task_t *)newcontext; int32_t not_ready = 1; - + #if defined(DISTRIBUTED) if( dst_rank != src_rank && src_rank == oldcontext->taskpool->context->my_rank) { assert( 0 == (arg->action_mask & PARSEC_ACTION_RECV_INIT_REMOTE_DEPS) ); + /* check that the desc task is a read task, then it should skip here */ + if(PARSEC_DTD_BCAST_DATA_TC_ID == oldcontext->task_class->task_class_id) { + if(PARSEC_INPUT == ((FLOW_OF(current_task, dep->dep_index))->op_type & PARSEC_GET_OP_TYPE)) { + return PARSEC_ITERATE_CONTINUE; + } + } if( arg->action_mask & PARSEC_ACTION_SEND_INIT_REMOTE_DEPS ) { if( parsec_dtd_not_sent_to_rank((parsec_dtd_task_t *)oldcontext, dep->belongs_to->flow_index, dst_rank) ) { struct remote_dep_output_param_s* output; int _array_pos, _array_mask; + /* On the sender side, update the key of the dep flow */ + parsec_dtd_task_t * real_parent_task = (parsec_dtd_task_t *)oldcontext; + #if !defined(PARSEC_DIST_COLLECTIVES) assert(src_rank == es->virtual_process->parsec_context->my_rank); #endif _array_pos = dst_rank / (8 * sizeof(uint32_t)); _array_mask = 1 << (dst_rank % (8 * sizeof(uint32_t))); PARSEC_ALLOCATE_REMOTE_DEPS_IF_NULL(arg->remote_deps, oldcontext, MAX_PARAM_COUNT); + if(real_parent_task->super.task_class->task_class_id != PARSEC_DTD_BCAST_KEY_TC_ID && + real_parent_task->super.task_class->task_class_id != PARSEC_DTD_BCAST_DATA_TC_ID) { + arg->remote_deps->bcast_keys[dep->dep_datatype_index] = 0; + arg->remote_deps->bcast_keys[dep->dep_datatype_index] |= src_rank<<18; + arg->remote_deps->bcast_keys[dep->dep_datatype_index] |= (FLOW_OF(real_parent_task, dep->belongs_to->flow_index))->msg_keys[dst_rank]; + } else { + arg->remote_deps->bcast_keys[dep->dep_datatype_index] = 0; + arg->remote_deps->bcast_keys[dep->dep_datatype_index] |= src_rank<<18; + arg->remote_deps->bcast_keys[dep->dep_datatype_index] |= (FLOW_OF(real_parent_task, dep->belongs_to->flow_index))->msg_keys[dst_rank]; + arg->remote_deps->bcast_keys[15] = 1; /* a flag to indicate bcast_keys has been set */ + } output = &arg->remote_deps->output[dep->dep_datatype_index]; assert( (-1 == arg->remote_deps->root) || (arg->remote_deps->root == src_rank) ); arg->remote_deps->root = src_rank; @@ -1553,6 +1612,7 @@ parsec_dtd_iterate_successors(parsec_execution_stream_t *es, action_mask, ontask, ontask_arg ); } + /* **************************************************************************** */ /** * Release dependencies after a task is done @@ -1607,6 +1667,7 @@ parsec_dtd_release_deps(parsec_execution_stream_t *es, if((action_mask & (1 << flow_index))) { if(!(track_flow & (1U << flow_index))) { uint64_t key = (((uint64_t)this_task->locals[0].value<<32) | (1U<locals[0].value, tp->super.context->my_rank); parsec_hash_table_lock_bucket(tp->task_hash_table, (parsec_key_t)key); this_dtd_task = parsec_dtd_find_task( tp, key ); assert(this_dtd_task != NULL); @@ -1753,6 +1814,12 @@ parsec_dtd_release_local_task( parsec_dtd_task_t *this_task ) if(PARSEC_DTD_FLUSH_TC_ID == this_task->super.task_class->task_class_id) { assert( current_flow == 0 ); parsec_dtd_tile_release( tile ); + } + if(PARSEC_DTD_BCAST_KEY_TC_ID == this_task->super.task_class->task_class_id) { + assert( current_flow == 0 ); + tile->flushed = FLUSHED; + parsec_dtd_tile_remove( tile->dc, tile->key ); + //parsec_dtd_tile_release( tile ); } } assert(this_task->super.super.super.obj_reference_count == 1); @@ -1801,6 +1868,12 @@ parsec_dtd_remote_task_release( parsec_dtd_task_t *this_task ) assert( current_flow == 0 ); parsec_dtd_tile_release( tile ); } + if(PARSEC_DTD_BCAST_KEY_TC_ID == this_task->super.task_class->task_class_id) { + assert( current_flow == 0 ); + tile->flushed = FLUSHED; + parsec_dtd_tile_remove( tile->dc, tile->key ); + //parsec_dtd_tile_release( tile ); + } } assert(this_task->super.super.super.obj_reference_count == 1); parsec_taskpool_t *tp = this_task->super.taskpool; @@ -1892,6 +1965,23 @@ static int datatype_lookup_of_dtd_task(parsec_execution_stream_t *es, return PARSEC_HOOK_RETURN_DONE; } +static int bcast_key_datatype_lookup_of_dtd_task(parsec_execution_stream_t *es, + const parsec_task_t *this_task, + uint32_t *flow_mask, parsec_dep_data_description_t *data) +{ + (void)es;(void)this_task; + data->count = 1; + data->displ = 0; + data->arena = NULL; + data->data = NULL; + data->layout = PARSEC_DATATYPE_NULL; + data->count = 0; + data->displ = 0; + (*flow_mask) = 0; /* nothing left */ + + return PARSEC_HOOK_RETURN_DONE; +} + /* This function creates relationship between two task function classes. * Arguments: - parsec taskpool (parsec_taskpool_t *) - parent master structure (parsec_task_class_t *) @@ -2016,9 +2106,17 @@ parsec_dtd_create_task_class( parsec_dtd_taskpool_t *__tp, parsec_dtd_funcptr_t* tc->nb_flows = flow_count; /* set to one so that prof_grpaher prints the task id properly */ tc->nb_parameters = 1; - tc->nb_locals = 1; + tc->nb_locals = 9; params[0] = &symb_dtd_taskid; locals[0] = &symb_dtd_taskid; + locals[1] = &symb_dtd_taskid; + locals[2] = &symb_dtd_taskid; + locals[3] = &symb_dtd_taskid; + locals[4] = &symb_dtd_taskid; + locals[5] = &symb_dtd_taskid; + locals[6] = &symb_dtd_taskid; + locals[7] = &symb_dtd_taskid; + locals[8] = &symb_dtd_taskid; tc->data_affinity = NULL; tc->initial_data = NULL; tc->final_data = (parsec_data_ref_fn_t *) NULL; @@ -2031,11 +2129,15 @@ parsec_dtd_create_task_class( parsec_dtd_taskpool_t *__tp, parsec_dtd_funcptr_t* *incarnations = (__parsec_chore_t *)dtd_chore; tc->find_deps = NULL; tc->iterate_successors = parsec_dtd_iterate_successors; + if(tc->task_class_id == PARSEC_DTD_BCAST_KEY_TC_ID) + tc->iterate_successors = parsec_dtd_bcast_key_iterate_successors; tc->iterate_predecessors = NULL; tc->release_deps = parsec_dtd_release_deps; tc->prepare_input = data_lookup_of_dtd_task; tc->prepare_output = output_data_of_dtd_task; - tc->get_datatype = (parsec_datatype_lookup_t *)datatype_lookup_of_dtd_task, + tc->get_datatype = (parsec_datatype_lookup_t *)datatype_lookup_of_dtd_task; + if(tc->task_class_id == PARSEC_DTD_BCAST_KEY_TC_ID) + tc->get_datatype = (parsec_datatype_lookup_t *)bcast_key_datatype_lookup_of_dtd_task; tc->complete_execution = complete_hook_of_dtd; tc->release_task = parsec_release_dtd_task_to_mempool; @@ -2224,9 +2326,37 @@ parsec_dtd_set_descendant(parsec_dtd_task_t *parent_task, uint8_t parent_flow_in parsec_dtd_remote_task_retain( real_parent_task ); } + /* On the receiver side, based on the previous parent key, update next recv key for dep flow */ + if(real_parent_task->super.task_class->task_class_id != PARSEC_DTD_BCAST_KEY_TC_ID && + real_parent_task->super.task_class->task_class_id != PARSEC_DTD_BCAST_DATA_TC_ID) { + if(real_parent_task->ht_item.key == 0xffffffff) { + real_parent_task->ht_item.key = 0; + real_parent_task->ht_item.key |= real_parent_task->rank<<18; + real_parent_task->ht_item.key |= tp->recv_task_id[real_parent_task->rank]++; + real_parent_task->super.locals[0].value = real_parent_task->ht_item.key; + } + } else { + /* parent is a collective, so ID is provided and we don't do anything here */ + } + uint64_t key = (((uint64_t)real_parent_task->ht_item.key)<<32) | (1U<task_hash_table, (parsec_key_t)key); parsec_remote_deps_t *dep = parsec_dtd_find_remote_dep( tp, key ); + if(real_parent_task->super.task_class->task_class_id == PARSEC_DTD_BCAST_DATA_TC_ID) { + parsec_hash_table_t *hash_table = tp->keys_hash_table; + dtd_hash_table_pointer_item_t *item = (dtd_hash_table_pointer_item_t *)parsec_hash_table_find( hash_table, (parsec_key_t)key ); + if(item) { + int* data_ptr = (int*)item->value; + parsec_dtd_untrack_task(tp, key); + parsec_hash_table_unlock_bucket(tp->task_hash_table, (parsec_key_t)key); + key = ((uint64_t)(data_ptr[0])<<32) | (1U<<0); + parsec_hash_table_lock_bucket(tp->task_hash_table, (parsec_key_t)key); + dep = parsec_dtd_find_task(tp, key); + real_parent_task->super.locals[0].value = real_parent_task->ht_item.key = data_ptr[0]; + //populate_remote_deps(data_ptr, real_parent_task->deps_out); + } + //fprintf(stderr, "inserting bcast data task and finding in hashtable with key %llu %d, result %p dep %p\n", key, real_parent_task->super.locals[0].value, item, dep); + } if( NULL == dep ) { if( !(flow->flags & TASK_INSERTED) ) { flow->flags |= TASK_INSERTED; @@ -2234,7 +2364,7 @@ parsec_dtd_set_descendant(parsec_dtd_task_t *parent_task, uint8_t parent_flow_in } } else { if( !(flow->flags & TASK_INSERTED) ) { - assert(dep->from == real_parent_task->rank); + //assert(dep->from == real_parent_task->rank); flow->flags |= TASK_INSERTED; parsec_dtd_untrack_remote_dep( tp, key ); #if defined(PARSEC_PROF_TRACE) @@ -2310,11 +2440,17 @@ parsec_dtd_create_and_initialize_task( parsec_dtd_taskpool_t *dtd_tp, assert(this_task->super.super.super.obj_reference_count == 1); this_task->orig_task = NULL; + this_task->super.taskpool = (parsec_taskpool_t*)dtd_tp; - this_task->ht_item.key = (parsec_key_t)(uintptr_t)(dtd_tp->task_id++); + /* this_task->ht_item.key = (parsec_key_t)(uintptr_t)(dtd_tp->task_id++); */ + this_task->ht_item.key = (uintptr_t)0xffffffff; + /* this is needed for grapher to work properly */ this_task->super.locals[0].value = (int)(uintptr_t)this_task->ht_item.key; - assert( (uintptr_t)this_task->super.locals[0].value == (uintptr_t)this_task->ht_item.key ); + //assert( (uintptr_t)this_task->super.locals[0].value == (uintptr_t)this_task->ht_item.key ); + for(int idx = 0; idx < 9; idx++) { + this_task->super.locals[idx].value = 0; + } this_task->super.task_class = tc; /** * +1 to make sure the task cannot be completed by the potential predecessors, @@ -2327,6 +2463,7 @@ parsec_dtd_create_and_initialize_task( parsec_dtd_taskpool_t *dtd_tp, this_task->super.priority = 0; this_task->super.chore_id = 0; this_task->super.status = PARSEC_TASK_STATUS_NONE; + memset(this_task->rank_bits, 0, MAX_RANK_INFO*sizeof(int)); int j; parsec_dtd_flow_info_t *flow; @@ -2448,6 +2585,8 @@ parsec_dtd_block_if_threshold_reached(parsec_dtd_taskpool_t *dtd_tp, int task_th if( dtd_tp->task_window_size < parsec_dtd_window_size ) { dtd_tp->task_window_size *= 2; } else { + if(dtd_tp->local_task_inserted>0) + //fprintf(stderr, "block function in rank %d with local task inserted %d\n", dtd_tp->super.context->my_rank, dtd_tp->local_task_inserted); parsec_execute_and_come_back(&dtd_tp->super, task_threshold); return 1; /* Indicating we blocked */ @@ -2486,6 +2625,7 @@ parsec_insert_dtd_task(parsec_task_t *__this_task) parsec_dtd_remote_task_retain( this_task ); } + /* In the next segment we resolve the dependencies of each flow */ for( flow_index = 0, tile = NULL, tile_op_type = 0; flow_index < tc->nb_flows; flow_index ++ ) { parsec_dtd_tile_user_t last_user, last_writer; @@ -2607,6 +2747,38 @@ parsec_insert_dtd_task(parsec_task_t *__this_task) tile_op_type, last_user.alive); } + if(last_writer.task->super.task_class->task_class_id != PARSEC_DTD_BCAST_KEY_TC_ID && + last_writer.task->super.task_class->task_class_id != PARSEC_DTD_BCAST_DATA_TC_ID) { + /* local parent and we are inserting a remote task, indicates it needs to send data */ + if(parsec_dtd_task_is_local(last_writer.task) && parsec_dtd_task_is_remote(this_task)) + { + int _array_pos, _array_mask; + _array_pos = this_task->rank / (8 * sizeof(int)); + _array_mask = 1 << (this_task->rank % (8 * sizeof(int))); + if(last_writer.task->rank_bits[_array_pos] & _array_mask) + { + //FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank] = last_writer.task->super.locals[5+this_task->rank%5].value; + FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank] = last_writer.task->send_id_storage[this_task->rank]; + } else + { + last_writer.task->rank_bits[_array_pos] |= _array_mask; + FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank] = dtd_tp->send_task_id[this_task->rank]++; + //last_writer.task->super.locals[5+this_task->rank%5].value = FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank]; + last_writer.task->send_id_storage[this_task->rank] = FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank]; + } + } + } else { + /* For bcast data task, if we have a remote write descendant, generate the p2p key for send */ + if(last_writer.task->super.task_class->task_class_id == PARSEC_DTD_BCAST_DATA_TC_ID && parsec_dtd_task_is_remote(this_task) + && parsec_dtd_task_is_local(last_writer.task)) { + if(PARSEC_INOUT == (tile_op_type & PARSEC_GET_OP_TYPE)) { + FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank] = dtd_tp->send_task_id[this_task->rank]++; + last_writer.task->super.locals[5].value = FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank]; + //fprintf(stderr, "BCAST_DATA %p last_writer.task on root %d with send ID to rank %d task %p as %d\n", last_writer.task, last_writer.task->rank, this_task->rank, this_task, last_writer.task->super.locals[5].value); + } + } + } + /* Are we using the same data multiple times for the same task? */ if(last_user.task == this_task) { satisfied_flow += 1; @@ -2683,6 +2855,36 @@ parsec_insert_dtd_task(parsec_task_t *__this_task) } } + + if(last_writer.task->super.task_class->task_class_id != PARSEC_DTD_BCAST_KEY_TC_ID && + last_writer.task->super.task_class->task_class_id != PARSEC_DTD_BCAST_DATA_TC_ID) { + /* local parent and we are inserting a remote task, indicates it needs to send data */ + if(parsec_dtd_task_is_local(last_writer.task) && parsec_dtd_task_is_remote(this_task)) + { + int _array_pos, _array_mask; + _array_pos = this_task->rank / (8 * sizeof(int)); + _array_mask = 1 << (this_task->rank % (8 * sizeof(int))); + if(last_writer.task->rank_bits[_array_pos] & _array_mask) + { + FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank] = last_writer.task->send_id_storage[this_task->rank]; + } else + { + last_writer.task->rank_bits[_array_pos] |= _array_mask; + FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank] = dtd_tp->send_task_id[this_task->rank]++; + last_writer.task->send_id_storage[this_task->rank] = FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank]; + } + } + } else { + /* For bcast data task, if we have a remote write descendant, generate the p2p key for send */ + if(last_writer.task->super.task_class->task_class_id == PARSEC_DTD_BCAST_DATA_TC_ID && parsec_dtd_task_is_remote(this_task) + && parsec_dtd_task_is_local(last_writer.task)) { + if(PARSEC_INOUT == (tile_op_type & PARSEC_GET_OP_TYPE)) { + FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank] = dtd_tp->send_task_id[this_task->rank]++; + last_writer.task->super.locals[5].value = FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank]; + //fprintf(stderr, "BCAST_DATA2222 %p last_writer.task on root %d with send ID to rank %d task %p as %d\n", last_writer.task, last_writer.task->rank, this_task->rank, this_task, last_writer.task->super.locals[5].value); + } + } + } /* we can avoid all the hash table crap if the last_writer is not alive */ if( put_in_chain ) { @@ -2752,6 +2954,9 @@ parsec_insert_dtd_task(parsec_task_t *__this_task) dtd_tp->local_task_inserted++; PARSEC_DEBUG_VERBOSE(parsec_dtd_dump_traversal_info, parsec_dtd_debug_output, "Task generated -> %s %d rank %d\n", this_task->super.task_class->name, this_task->ht_item.key, this_task->rank); + //if(this_task->rank == 1) { + //fprintf(stderr, "Task generated -> %s %d rank %d\n", this_task->super.task_class->name, this_task->ht_item.key, this_task->rank); + //} } /* Releasing every remote_task */ @@ -2762,6 +2967,28 @@ parsec_insert_dtd_task(parsec_task_t *__this_task) /* Increase the count of satisfied flows to counter-balance the increase in the * number of expected flows done during the task creation. */ satisfied_flow++; + + if(parsec_dtd_task_is_remote(this_task) && this_task->super.task_class->task_class_id == PARSEC_DTD_BCAST_KEY_TC_ID) { + parsec_dtd_flow_info_t *flow = FLOW_OF(this_task, 0); + uint64_t key = ((uint64_t)(this_task->super.locals[0].value)<<32) | (1U<<0); + parsec_hash_table_lock_bucket(dtd_tp->task_hash_table, (parsec_key_t)key); + parsec_remote_deps_t *dep = parsec_dtd_find_remote_dep( dtd_tp, key ); + //fprintf(stderr, "tracking remote task of key %d on rank %d\n", this_task->super.locals[0].value, dtd_tp->super.context->my_rank); + if( NULL == dep ) { + if( !(flow->flags & TASK_INSERTED) ) { + flow->flags |= TASK_INSERTED; + parsec_dtd_track_task( dtd_tp, key, this_task ); + } + } else { + if( !(flow->flags & TASK_INSERTED) ) { + flow->flags |= TASK_INSERTED; + parsec_dtd_untrack_remote_dep( dtd_tp, key ); + parsec_dtd_track_task( dtd_tp, key, this_task ); + remote_dep_dequeue_delayed_dep_release(dep); + } + } + parsec_hash_table_unlock_bucket(dtd_tp->task_hash_table, (parsec_key_t)key); + } #if defined(PARSEC_PROF_TRACE) if(parsec_dtd_profile_verbose) @@ -3063,3 +3290,10 @@ parsec_dtd_get_taskpool(parsec_task_t *this_task) { return this_task->taskpool; } + +int +parsec_dtd_rank_of_data(parsec_dc_t *dc, int i, int j) +{ + parsec_data_key_t key = dc->data_key(dc, i, j); + return dc->rank_of_key(dc, key); +} diff --git a/parsec/interfaces/superscalar/insert_function.h b/parsec/interfaces/superscalar/insert_function.h index 215fdf87b..9cc0798f3 100644 --- a/parsec/interfaces/superscalar/insert_function.h +++ b/parsec/interfaces/superscalar/insert_function.h @@ -15,10 +15,16 @@ #define PARSEC_INSERT_FUNCTION_H_HAS_BEEN_INCLUDED #include "parsec/runtime.h" +#include "parsec/parsec_internal.h" #include "parsec/data_distribution.h" BEGIN_C_DECLS +#ifdef PARSEC_DIST_COLLECTIVES +// Availability of collective operations with DTD interface. +#define PARSEC_DTD_DIST_COLLECTIVES +#endif + /** * @addtogroup DTD_INTERFACE * @{ @@ -96,6 +102,8 @@ extern parsec_arena_datatype_t *parsec_dtd_arenas_datatypes; extern int parsec_dtd_window_size; extern int parsec_dtd_threshold_size; +extern parsec_hash_table_t* parsec_bcast_keys_hash; +extern parsec_mempool_t* parsec_bcast_keys_tile_mempool; typedef struct parsec_dtd_tile_s parsec_dtd_tile_t; typedef struct parsec_dtd_task_s parsec_dtd_task_t; @@ -321,6 +329,30 @@ parsec_dtd_get_taskpool(parsec_task_t *this_task); int parsec_dtd_dequeue_taskpool(parsec_taskpool_t *tp); +#ifdef PARSEC_DTD_DIST_COLLECTIVES +/** + * Create and return `parsec_remote_deps_t` structure associated with + * the broadcast of the a data to all the nodes set in the + * `dest_ranks` array. + **/ + +parsec_remote_deps_t* parsec_dtd_create_remote_deps( + int myrank, int root, parsec_data_copy_t *data_copy, + parsec_arena_datatype_t *arenas_datatype, + int* dest_ranks, int num_dest_ranks); + +/** + * Perform a broadcast for of the dtd tile `dtd_tile_root` from the + * root node associated with the rank `root` to the nodes with ranks + * set in the `dest_ranks` array. + **/ + +void parsec_dtd_broadcast( + parsec_taskpool_t *taskpool, int root, + parsec_dtd_tile_t* dtd_tile_root, int arena_index, + int* dest_ranks, int num_dest_ranks); +#endif + /** * @} */ diff --git a/parsec/interfaces/superscalar/insert_function_internal.h b/parsec/interfaces/superscalar/insert_function_internal.h index bbf4ff1dd..3d34d5669 100644 --- a/parsec/interfaces/superscalar/insert_function_internal.h +++ b/parsec/interfaces/superscalar/insert_function_internal.h @@ -38,6 +38,9 @@ extern int parsec_dtd_debug_output; extern int parsec_dtd_dump_traversal_info; /**< For printing traversal info */ #define PARSEC_DTD_FLUSH_TC_ID ((uint8_t)0x00) +#define PARSEC_DTD_BCAST_KEY_TC_ID ((uint8_t)0x01) +#define PARSEC_DTD_BCAST_DATA_TC_ID ((uint8_t)0x02) +#define PARSEC_DTD_BCAST_TC_ID = ((PARSEC_DTD_BCAST_KEY_TC_ID) | (PARSEC_DTD_BCAST_DATA_TC_ID)) /* To flag the task we are trying to complete as a local one */ #define PARSEC_ACTION_COMPLETE_LOCAL_TASK 0x08000000 @@ -131,6 +134,7 @@ typedef struct parsec_dtd_flow_info_s { 4 release ownership even when the flow is of type R */ parsec_dtd_tile_t *tile; + int msg_keys[MAX_RANK_INFO*sizeof(int)*8]; /* enable user trimming, store dest rank send ID for a flow */ int rank_sent_to[MAX_RANK_INFO]; /* currently support 1024 nodes */ } parsec_dtd_flow_info_t; @@ -178,6 +182,8 @@ struct parsec_dtd_task_s { parsec_thread_mempool_t *mempool_owner; int32_t rank; int32_t flow_count; + int32_t rank_bits[MAX_RANK_INFO]; + int send_id_storage[MAX_RANK_INFO*sizeof(int)*8]; /* enable user trimming, store dest rank send ID for a task, same for all the flows in that task */ /* for testing PTG inserting task in DTD */ parsec_task_t *orig_task; }; @@ -235,8 +241,12 @@ struct parsec_dtd_taskpool_s { parsec_mempool_t *hash_table_bucket_mempool; parsec_hash_table_t *task_hash_table; parsec_hash_table_t *function_h_table; + parsec_hash_table_t *keys_hash_table; /* ring of initial ready tasks */ parsec_task_t **startup_list; + int bcast_id; + int send_task_id[MAX_RANK_INFO*sizeof(int)*8]; + int recv_task_id[MAX_RANK_INFO*sizeof(int)*8]; /* from here to end is for the testing interface */ struct hook_info actual_hook[PARSEC_DTD_NB_TASK_CLASSES]; }; @@ -279,6 +289,10 @@ typedef struct parsec_dtd_common_args_s { } parsec_dtd_common_args_t; /* Function prototypes */ +int parsec_dtd_bcast_key_fn( parsec_execution_stream_t *es, parsec_task_t *this_task); + +int parsec_dtd_bcast_data_fn( parsec_execution_stream_t *es, parsec_task_t *this_task); + void parsec_detach_all_dtd_taskpool_from_context( parsec_context_t *context ); @@ -420,6 +434,9 @@ parsec_dtd_tile_retain( parsec_dtd_tile_t *tile ); void parsec_dtd_tile_release( parsec_dtd_tile_t *tile ); +int +parsec_dtd_rank_of_data(parsec_dc_t *dc, int i, int j); + int parsec_dtd_data_flush_sndrcv(parsec_execution_stream_t *es, parsec_task_t *this_task); @@ -515,6 +532,16 @@ int parsec_dtd_iterator_arg_get_size(int first_arg, void *tile, int tile_op_type, void *cb_data); +void +parsec_dtd_bcast_key_iterate_successors(parsec_execution_stream_t *es, + const parsec_task_t *this_task, + uint32_t action_mask, + parsec_ontask_function_t *ontask, + void *ontask_arg); + +void +populate_remote_deps(int* data_ptr, parsec_remote_deps_t* remote_deps); + END_C_DECLS #endif /* INSERT_FUNCTION_INTERNAL_H_HAS_BEEN_INCLUDED */ diff --git a/parsec/interfaces/superscalar/overlap_strategies.c b/parsec/interfaces/superscalar/overlap_strategies.c index d1c852797..968f9da4b 100644 --- a/parsec/interfaces/superscalar/overlap_strategies.c +++ b/parsec/interfaces/superscalar/overlap_strategies.c @@ -20,6 +20,13 @@ #include "parsec/interfaces/superscalar/insert_function_internal.h" #include "parsec/utils/debug.h" +#define MIN(x, y) ( (x)<(y)?(x):(y) ) +static inline unsigned long exponential_backoff(uint64_t k) +{ + unsigned int n = MIN( 64, k ); + unsigned int r = (unsigned int) ((double)n * ((double)rand()/(double)RAND_MAX)); + return r * 5410; +} /***************************************************************************//** * * This function makes sure that nextinline descendant is really NULL @@ -146,13 +153,14 @@ parsec_dtd_ordering_correctly( parsec_execution_stream_t *es, parsec_dtd_task_t *current_task = (parsec_dtd_task_t *)this_task; int current_dep; parsec_dtd_task_t *current_desc = NULL; - int op_type_on_current_flow, desc_op_type, desc_flow_index; + int op_type_on_current_flow, desc_op_type, desc_flow_index, cur_desc_op_type; parsec_dtd_tile_t *tile; parsec_dep_t deps; parsec_release_dep_fct_arg_t *arg = (parsec_release_dep_fct_arg_t *)ontask_arg; parsec_dep_data_description_t data; int rank_src = 0, rank_dst = 0, vpid_dst=0; + parsec_dtd_flow_info_t* flow; /* finding for which flow we need to iterate successors of */ int flow_mask = action_mask; @@ -167,6 +175,105 @@ parsec_dtd_ordering_correctly( parsec_execution_stream_t *es, if( NULL == tile ) { continue; } + + if(PARSEC_DTD_BCAST_DATA_TC_ID == current_task->super.task_class->task_class_id) { + /* for the bcast data class, in addition to release the data to local deps tasks that will read the data + * propagate the data down to descendants as well */ + //if(current_task->deps_out != NULL) { + /* we have not propagate the remote deps yet, otherwise will be set to NULL */ + if(action_mask & PARSEC_ACTION_COMPLETE_LOCAL_TASK) { + if (parsec_dtd_task_is_local(current_task)) { + if(current_task->super.locals[3].value != 10086) { + parsec_remote_deps_t *deps = NULL; + PARSEC_ALLOCATE_REMOTE_DEPS_IF_NULL(deps, this_task, MAX_PARAM_COUNT); + deps->root = rank_src; + deps->outgoing_mask |= (1 << 0); /* only 1 flow */ + deps->max_priority = 0; + + struct remote_dep_output_param_s* output = &deps->output[0]; + output->data.data = current_task->super.data[0].data_out; + output->data.arena = parsec_dtd_arenas_datatypes[FLOW_OF(current_task, current_dep)->arena_index].arena; + output->data.layout = parsec_dtd_arenas_datatypes[FLOW_OF(current_task, current_dep)->arena_index].opaque_dtt; + output->data.count = 1; + output->data.displ = 0; + output->priority = 0; + + assert(NULL != current_task->super.data[current_dep].data_out); + parsec_dtd_tile_t *tile = NULL; + parsec_key_t key = (parsec_key_t)((uintptr_t)current_task->super.locals[0].value); + int count = 1; + struct timespec rqtp; + rqtp.tv_sec = 0; + + while(tile == NULL){ + count += 1; + tile = (parsec_dtd_tile_t *)parsec_hash_table_find(parsec_bcast_keys_hash, key); + //if(count %1000 == 0)fprintf(stderr, "bcast root task %p data with global key %d tile %p on rank %d\n", current_task, current_task->ht_item.key, tile, current_task->super.taskpool->context->my_rank); + //sleep(1); + if(count == 100) { + rqtp.tv_nsec = exponential_backoff(count); + nanosleep(&rqtp, NULL); + count = 0; + fprintf(stderr, "bcast root task %p data with global key %ld tile %p on rank %d\n", current_task, key, tile, current_task->super.taskpool->context->my_rank); + sleep(1); + } + } + int* data_ptr = (int*)parsec_data_copy_get_ptr(tile->data_copy); + populate_remote_deps(data_ptr, deps); + //current_task->deps_out->output[0].data.data = + // current_task->super.data[current_dep].data_out; + (void)parsec_atomic_fetch_inc_int32(¤t_task->super.data[current_dep].data_out->readers); + parsec_remote_dep_activate( + es, (parsec_task_t *)current_task, + deps, + deps->outgoing_mask); + //current_task->deps_out = NULL; + current_task->super.locals[3].value = 10086; + } + } + } else if(action_mask & PARSEC_ACTION_RELEASE_LOCAL_DEPS) { + /* current node is part of the broadcast operation, propagate downstream */ + //int root = current_task->deps_out->root; + if(current_task->super.locals[3].value != 10086) { + parsec_release_dep_fct_arg_t* arg = (parsec_release_dep_fct_arg_t*)ontask_arg; + parsec_remote_deps_t* deps = arg->remote_deps; + int root = deps->root; + int my_rank = current_task->super.taskpool->context->my_rank; + parsec_dtd_tile_t* item = NULL; + int count = 1; + struct timespec rqtp; + rqtp.tv_sec = 0; + + while(item == NULL) { + count += 1; + item = (parsec_dtd_tile_t *)parsec_hash_table_find( parsec_bcast_keys_hash, (parsec_key_t)((uintptr_t)current_task->super.locals[0].value)); + if(count == 100){ + fprintf(stderr, "bcast data continue on rank %d, from root %d, for task %p with key %d \n", my_rank, root, current_task, current_task->super.locals[0].value); + sleep(1); + rqtp.tv_nsec = exponential_backoff(count); + nanosleep(&rqtp, NULL); + count = 0; + } + } + int* data_ptr = (int*)item->data_copy->device_private; + populate_remote_deps(data_ptr, deps); + //parsec_hash_table_nolock_remove( parsec_bcast_keys_hash, (parsec_key_t)((uintptr_t)current_task->super.locals[0].value)); + + assert(NULL != current_task->super.data[current_dep].data_out); + + //current_task->deps_out->output[0].data.data = + // current_task->super.data[0].data_out; + //(void)parsec_atomic_fetch_inc_int32(¤t_task->super.data[current_dep].data_out->readers); + parsec_remote_dep_activate( + es, (parsec_task_t *)current_task, + deps, + deps->outgoing_mask); + //current_task->deps_out = NULL; + current_task->super.locals[3].value = 10086; + } + } + //} + } /* BCAST DATA propagation */ if( FLOW_OF(current_task, current_dep)->op_type & PARSEC_DONT_TRACK ) { /* User has instructed us not to track this data */ @@ -218,6 +325,7 @@ parsec_dtd_ordering_correctly( parsec_execution_stream_t *es, #if defined(PARSEC_DEBUG_ENABLE) assert(current_desc != NULL); #endif + /* setting data */ data.data = current_task->super.data[current_dep].data_out; @@ -227,6 +335,7 @@ parsec_dtd_ordering_correctly( parsec_execution_stream_t *es, data.displ = 0; desc_op_type = ((DESC_OF(current_task, current_dep))->op_type & PARSEC_GET_OP_TYPE); + cur_desc_op_type = ((DESC_OF(current_task, current_dep))->op_type & PARSEC_GET_OP_TYPE); desc_flow_index = (DESC_OF(current_task, current_dep))->flow_index; int get_out = 0, tmp_desc_flow_index, release_parent = 0; @@ -310,17 +419,29 @@ parsec_dtd_ordering_correctly( parsec_execution_stream_t *es, deps.ctl_gather_nb = NULL; deps.task_class_id = current_desc->super.task_class->task_class_id; deps.flow = current_desc->super.task_class->in[tmp_desc_flow_index]; - deps.dep_index = 0; /* it will not be used anywhere for DTD, so whatever */ + deps.dep_index = tmp_desc_flow_index; deps.belongs_to = current_task->super.task_class->out[current_dep]; deps.direct_data = NULL; deps.dep_datatype_index = current_dep; rank_dst = current_desc->rank; - ontask( es, (parsec_task_t *)current_desc, (parsec_task_t *)current_task, - &deps, &data, rank_src, rank_dst, vpid_dst, ontask_arg ); + //if((PARSEC_DTD_BCAST_DATA_TC_ID != current_task->super.task_class->task_class_id) || (PARSEC_OUTPUT == ((DESC_OF(current_task, current_dep))->op_type & PARSEC_GET_OP_TYPE) || PARSEC_INOUT == ((DESC_OF(current_task, current_dep))->op_type & PARSEC_GET_OP_TYPE))) { + ontask( es, (parsec_task_t *)current_desc, (parsec_task_t *)current_task, + &deps, &data, rank_src, rank_dst, vpid_dst, ontask_arg ); + //} vpid_dst = (vpid_dst+1) % current_task->super.taskpool->context->nb_vp; +#if defined(DISTRIBUTED) + if( (action_mask & PARSEC_ACTION_COMPLETE_LOCAL_TASK) && (NULL != arg->remote_deps) ) { + //if((PARSEC_DTD_BCAST_DATA_TC_ID != current_task->super.task_class->task_class_id) || (PARSEC_OUTPUT == (cur_desc_op_type & PARSEC_GET_OP_TYPE) || PARSEC_INOUT == (cur_desc_op_type & PARSEC_GET_OP_TYPE))) { + (void)parsec_atomic_fetch_inc_int32(¤t_task->super.data[current_dep].data_out->readers); + parsec_remote_dep_activate(es, (parsec_task_t *)current_task, arg->remote_deps, arg->remote_deps->outgoing_mask); + arg->remote_deps = NULL; + //} + } +#endif + /* releasing remote tasks that is a descendant of a local task */ if(action_mask & PARSEC_ACTION_RELEASE_LOCAL_DEPS) { if( parsec_dtd_task_is_remote(current_desc) && parsec_dtd_task_is_local(current_task) ) { @@ -332,15 +453,10 @@ parsec_dtd_ordering_correctly( parsec_execution_stream_t *es, } } } + + cur_desc_op_type = desc_op_type; } while (0 == get_out); -#if defined(DISTRIBUTED) - if( (action_mask & PARSEC_ACTION_COMPLETE_LOCAL_TASK) && (NULL != arg->remote_deps) ) { - (void)parsec_atomic_fetch_inc_int32(¤t_task->super.data[current_dep].data_out->readers); - parsec_remote_dep_activate(es, (parsec_task_t *)current_task, arg->remote_deps, arg->remote_deps->outgoing_mask); - arg->remote_deps = NULL; - } -#endif } } } diff --git a/parsec/interfaces/superscalar/parsec_dtd_broadcast.c b/parsec/interfaces/superscalar/parsec_dtd_broadcast.c new file mode 100644 index 000000000..2b6f15bd5 --- /dev/null +++ b/parsec/interfaces/superscalar/parsec_dtd_broadcast.c @@ -0,0 +1,268 @@ +/** + * Copyright (c) 2013-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + */ + +#include "parsec/parsec_config.h" +#include "parsec/utils/mca_param.h" +#include "parsec/interfaces/superscalar/insert_function_internal.h" + +#ifdef PARSEC_DTD_DIST_COLLECTIVES + +void +populate_remote_deps(int* data_ptr, parsec_remote_deps_t* remote_deps) +{ + struct remote_dep_output_param_s* output = &remote_deps->output[0]; + int _array_pos, _array_mask; + uint32_t dest_rank_idx; + /* TODO: don't assume the length of data_ptr */ + int num_dest_ranks = data_ptr[600]; + for(dest_rank_idx = 0; dest_rank_idx < (uint32_t)num_dest_ranks; ++dest_rank_idx) { + uint32_t dest_rank = data_ptr[600+dest_rank_idx+1]; + _array_pos = dest_rank / (8 * sizeof(uint32_t)); + _array_mask = 1 << (dest_rank % (8 * sizeof(uint32_t))); + + if( !(output->rank_bits[_array_pos] & _array_mask) ) { + output->rank_bits[_array_pos] |= _array_mask; + output->count_bits++; + } + } +} + +/* when the comm_coll_bcast is 1 we use the chain topology, get the successor's rank */ +int +get_chain_successor(parsec_execution_stream_t*es, parsec_task_t* task, parsec_remote_deps_t* remote_deps) +{ + (void)task; + int my_idx, idx, current_mask; + unsigned int array_index, count, bit_index; + uint32_t boffset; + uint32_t dep_fw_mask[es->virtual_process->parsec_context->remote_dep_fw_mask_sizeof]; + memset(dep_fw_mask, 0, es->virtual_process->parsec_context->remote_dep_fw_mask_sizeof); + memcpy(&dep_fw_mask, remote_deps->remote_dep_fw_mask, es->virtual_process->parsec_context->remote_dep_fw_mask_sizeof); + struct remote_dep_output_param_s* output = &remote_deps->output[0]; + boffset = remote_deps->root / (8 * sizeof(uint32_t)); + dep_fw_mask[boffset] |= ((uint32_t)1) << (remote_deps->root % (8 * sizeof(uint32_t))); + my_idx = (remote_deps->root == es->virtual_process->parsec_context->my_rank) ? 0 : -1; + idx = 0; + for(array_index = count = 0; count < remote_deps->output[0].count_bits; array_index++) { + current_mask = output->rank_bits[array_index]; + if( 0 == current_mask ) continue; + for( bit_index = 0; current_mask != 0; bit_index++ ) { + if( !(current_mask & (1 << bit_index)) ) continue; + int rank = (array_index * sizeof(uint32_t) * 8) + bit_index; + current_mask ^= (1 << bit_index); + count++; + + boffset = rank / (8 * sizeof(uint32_t)); + idx++; + if(my_idx == -1) { + if(rank == es->virtual_process->parsec_context->my_rank) { + my_idx = idx; + } + boffset = rank / (8 * sizeof(uint32_t)); + dep_fw_mask[boffset] |= ((uint32_t)1) << (rank % (8 * sizeof(uint32_t))); + continue; + } + if(my_idx != -1){ + if(idx == my_idx+1) + { + return rank; + } + } + } + } + return -1; +} + +void +parsec_dtd_bcast_key_iterate_successors(parsec_execution_stream_t *es, + const parsec_task_t *this_task, + uint32_t action_mask, + parsec_ontask_function_t *ontask, + void *ontask_arg) +{ + parsec_dtd_task_t *current_task = (parsec_dtd_task_t *)this_task; + int current_dep; + parsec_dtd_tile_t *tile; + + parsec_dep_t deps; + parsec_dep_data_description_t data; + int vpid_dst=0; + + /* finding for which flow we need to iterate successors of */ + int flow_mask = action_mask; + int my_rank = current_task->super.taskpool->context->my_rank; + int successor = -1; + + int rc; /* retrive the mca number for comm_coll_bcast */ + int comm_coll_bcast; /* retrive the value set for comm_coll_bcast */ + if (0 < (rc = parsec_mca_param_find("runtime", NULL, "comm_coll_bcast")) ) { + parsec_mca_param_lookup_int(rc, &comm_coll_bcast); + } + for( current_dep = 0; current_dep < current_task->super.task_class->nb_flows; current_dep++ ) { + if( (flow_mask & (1<root = my_rank; + deps->outgoing_mask |= (1 << 0); /* only 1 flow */ + deps->max_priority = 0; + + struct remote_dep_output_param_s* output = &deps->output[0]; + output->data.data = current_task->super.data[0].data_out;//NULL; + output->data.arena = parsec_dtd_arenas_datatypes[15].arena; + output->data.layout = parsec_dtd_arenas_datatypes[15].opaque_dtt; + output->data.count = 1; + output->data.displ = 0; + output->priority = 0; + int* data_ptr = (int*)parsec_data_copy_get_ptr(current_task->super.data[0].data_out); + //successor = get_chain_successor(es, (parsec_task_t*)current_task, deps); + //current_task->super.locals[0].value = current_task->ht_item.key = ((1<<29) |(my_rank << 20) | *(data_ptr+1+successor)); + tile = FLOW_OF(current_task, current_dep)->tile; + parsec_dtd_tile_retain(tile); + populate_remote_deps(data_ptr, deps); + parsec_remote_dep_activate( + es, (parsec_task_t *)current_task, + deps, + deps->outgoing_mask); + //current_task->deps_out = NULL; + /* decrease the count as in the data flush */ + parsec_dtd_release_local_task( current_task ); + } else if (action_mask & PARSEC_ACTION_RELEASE_LOCAL_DEPS) { + /* a node in the key array propagation */ + parsec_release_dep_fct_arg_t* arg = (parsec_release_dep_fct_arg_t*)ontask_arg; + parsec_remote_deps_t* deps = arg->remote_deps; + int root = deps->root; + int my_rank = current_task->super.taskpool->context->my_rank; + + int _array_pos, _array_mask; + struct remote_dep_output_param_s* output; + output = &deps->output[0]; + _array_pos = my_rank / (8 * sizeof(uint32_t)); + _array_mask = 1 << (my_rank % (8 * sizeof(uint32_t))); + + /* We are part of the broadcast, forward message */ + int* data_ptr = (int*)parsec_data_copy_get_ptr(current_task->super.data[0].data_out); + int* buffer = malloc(sizeof(int)*50*50); + memcpy(buffer, data_ptr, sizeof(int)*50*50); + populate_remote_deps(data_ptr, deps); + //successor = get_chain_successor(es, (parsec_task_t*)current_task, current_task->deps_out); + if(successor == -1) { + //current_task->deps_out->outgoing_mask = 0; + } + //current_task->super.locals[0].value = current_task->ht_item.key = ((1<<29) | (root << 20) | *(data_ptr+1+successor)); + assert(NULL != current_task->super.data[current_dep].data_out); + + //current_task->deps_out->output[0].data.data = current_task->super.data[0].data_out; + //parsec_dtd_retain_data_copy(current_task->super.data[current_dep].data_out); + parsec_remote_dep_activate( + es, (parsec_task_t *)current_task, + deps, + deps->outgoing_mask); + //current_task->deps_out = NULL; + /* update the BCAST DATA task or dep with the global ID that we know now */ + uint64_t key = ((uint64_t)(1<<28 | (root << 18 ) | data_ptr[es->virtual_process->parsec_context->my_rank+1])<<32) | (1U<<0); + uint64_t key2 = ((uint64_t)(data_ptr[0])<<32) | (1U<<0); + + parsec_dtd_task_t* dtd_task = NULL; + parsec_dtd_taskpool_t *tp = (parsec_dtd_taskpool_t *)current_task->super.taskpool; + parsec_hash_table_lock_bucket(tp->task_hash_table, (parsec_key_t)key); + dtd_task = parsec_dtd_find_task(tp, key); + + // store the meta data info into the key hash table + dtd_hash_table_pointer_item_t *item = (dtd_hash_table_pointer_item_t *)parsec_thread_mempool_allocate(tp->hash_table_bucket_mempool->thread_mempools); + parsec_hash_table_t *hash_table = tp->keys_hash_table; + item->ht_item.key = (parsec_key_t)key; + item->mempool_owner = tp->hash_table_bucket_mempool->thread_mempools; + item->value = (void *)buffer; + parsec_hash_table_insert( hash_table, &item->ht_item ); + + //parsec_dtd_tile_t* bcast_keys = (parsec_dtd_tile_t *) parsec_thread_mempool_allocate( parsec_bcast_keys_tile_mempool->thread_mempools ); + parsec_dtd_tile_t* bcast_keys = (parsec_dtd_tile_t *)malloc(sizeof(parsec_dtd_tile_t)); + bcast_keys->ht_item.key = (parsec_key_t)((uintptr_t)buffer[0]); + bcast_keys->data_copy = PARSEC_OBJ_NEW(parsec_data_copy_t); + bcast_keys->data_copy->device_private = (void *)buffer; + parsec_hash_table_insert( parsec_bcast_keys_hash, &bcast_keys->ht_item ); + parsec_mfence(); /* Write */ + //fprintf(stderr, "insert into parsec_bcast_keys_hash the item %p key %d with value pointer %p on rank %d\n", bcast_keys, buffer[0], buffer, es->virtual_process->parsec_context->my_rank); + + if(dtd_task != NULL) { + parsec_hash_table_lock_bucket(tp->task_hash_table, (parsec_key_t)key2); + parsec_remote_deps_t *dep = parsec_dtd_find_task(tp, key2); + + //populate_remote_deps(data_ptr, dtd_task->deps_out); + parsec_dtd_untrack_task(tp, key); + if(dep == NULL){ + dtd_task->super.locals[0].value = data_ptr[0]; + parsec_dtd_track_task(tp, key2, dtd_task); + }else{ + + dtd_task->super.locals[0].value = data_ptr[0]; + parsec_dtd_untrack_remote_dep(tp, key2); + parsec_dtd_track_task(tp, key2, dtd_task); + remote_dep_dequeue_delayed_dep_release(dep); + } + parsec_hash_table_unlock_bucket(tp->task_hash_table, (parsec_key_t)key2); + } + parsec_hash_table_unlock_bucket(tp->task_hash_table, (parsec_key_t)key); + tile = FLOW_OF(current_task, current_dep)->tile; + parsec_dtd_tile_retain(tile); + } else { + /* on the receiver side, get datatype to aquire datatype, arena etc info */ + data.data = current_task->super.data[current_dep].data_out; + data.arena = parsec_dtd_arenas_datatypes[FLOW_OF(current_task, current_dep)->arena_index].arena; + data.layout = parsec_dtd_arenas_datatypes[FLOW_OF(current_task, current_dep)->arena_index].opaque_dtt; + data.count = 1; + data.displ = 0; + deps.cond = NULL; + deps.ctl_gather_nb = NULL; + deps.flow = current_task->super.task_class->out[current_dep]; + deps.dep_index = 0; + deps.belongs_to = current_task->super.task_class->out[current_dep]; + deps.direct_data = NULL; + deps.dep_datatype_index = current_dep; + ontask( es, (parsec_task_t *)current_task, (parsec_task_t *)current_task, + &deps, &data, current_task->rank, my_rank, vpid_dst, ontask_arg ); + } + } + } +} + +/* **************************************************************************** */ +/** + * Body of bcast key task we insert that will propagate the key array + * empty body! + * + * @param context, this_task + * + * @ingroup DTD_INTERFACE_INTERNAL + */ +int +parsec_dtd_bcast_key_fn( parsec_execution_stream_t *es, parsec_task_t *this_task) +{ + (void)es; (void)this_task; + //fprintf(stderr, "executed the body of bcast key fn\n"); + return PARSEC_HOOK_RETURN_DONE; +} + +/* **************************************************************************** */ +/** + * Body of bcast task we insert that will propagate the data tile we are broadcasting + * empty body! + * + * @param context, this_task + * + * @ingroup DTD_INTERFACE_INTERNAL + */ +int +parsec_dtd_bcast_data_fn( parsec_execution_stream_t *es, parsec_task_t *this_task) +{ + (void)es; (void)this_task; + //fprintf(stderr, "executed the body of bcast data fn\n"); + return PARSEC_HOOK_RETURN_DONE; +} + +#endif diff --git a/parsec/interfaces/superscalar/parsec_dtd_data_flush.c b/parsec/interfaces/superscalar/parsec_dtd_data_flush.c index e93d21e76..5c6685f49 100644 --- a/parsec/interfaces/superscalar/parsec_dtd_data_flush.c +++ b/parsec/interfaces/superscalar/parsec_dtd_data_flush.c @@ -40,6 +40,7 @@ parsec_dtd_data_flush_sndrcv(parsec_execution_stream_t *es, assert(tile != NULL); + //fprintf(stderr, "completed data flush task on rank %d\n", current_task->rank); #if defined(DISTRIBUTED) if(tile->rank == current_task->rank) { /* this is a receive task*/ if( current_task->super.data[0].data_in != tile->data_copy ) { @@ -185,6 +186,27 @@ parsec_insert_dtd_flush_task(parsec_dtd_task_t *this_task, parsec_dtd_tile_t *ti this_task, flow_index, last_user.op_type, tile_op_type, last_user.alive); + if(last_writer.task->super.task_class->task_class_id != PARSEC_DTD_BCAST_KEY_TC_ID && + last_writer.task->super.task_class->task_class_id != PARSEC_DTD_BCAST_DATA_TC_ID) { + /* local parent and we are inserting a remote task, indicates it needs to send data */ + if(parsec_dtd_task_is_local(last_writer.task) && parsec_dtd_task_is_remote(this_task)) + { + int _array_pos, _array_mask; + _array_pos = this_task->rank / (8 * sizeof(int)); + _array_mask = 1 << (this_task->rank % (8 * sizeof(int))); + if(last_writer.task->rank_bits[_array_pos] & _array_mask) + { + FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank] = last_writer.task->send_id_storage[this_task->rank]; + } else + { + last_writer.task->rank_bits[_array_pos] |= _array_mask; + FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank] = dtd_tp->send_task_id[this_task->rank]++; + last_writer.task->send_id_storage[this_task->rank] = FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank]; + } + } + } else { + /* do nothing */ + } } else { parsec_dtd_set_parent(last_writer.task, last_writer.flow_index, this_task, flow_index, last_writer.op_type, @@ -193,6 +215,27 @@ parsec_insert_dtd_flush_task(parsec_dtd_task_t *this_task, parsec_dtd_tile_t *ti this_task, flow_index, (PARENT_OF(this_task, flow_index))->op_type, tile_op_type, last_user.alive); + if(last_writer.task->super.task_class->task_class_id != PARSEC_DTD_BCAST_KEY_TC_ID && + last_writer.task->super.task_class->task_class_id != PARSEC_DTD_BCAST_DATA_TC_ID) { + /* local parent and we are inserting a remote task, indicates it needs to send data */ + if(parsec_dtd_task_is_local(last_writer.task) && parsec_dtd_task_is_remote(this_task)) + { + int _array_pos, _array_mask; + _array_pos = this_task->rank / (8 * sizeof(int)); + _array_mask = 1 << (this_task->rank % (8 * sizeof(int))); + if(last_writer.task->rank_bits[_array_pos] & _array_mask) + { + FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank] = last_writer.task->send_id_storage[this_task->rank]; + } else + { + last_writer.task->rank_bits[_array_pos] |= _array_mask; + FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank] = dtd_tp->send_task_id[this_task->rank]++; + last_writer.task->send_id_storage[this_task->rank] = FLOW_OF(last_writer.task, last_writer.flow_index)->msg_keys[this_task->rank]; + } + } + } else { + /* do nothing */ + } parsec_dtd_task_t *parent_task = (PARENT_OF(this_task, flow_index))->task; if( parsec_dtd_task_is_local(parent_task) || parsec_dtd_task_is_local(this_task) ) { diff --git a/parsec/remote_dep.c b/parsec/remote_dep.c index ade071b03..a404e4a10 100644 --- a/parsec/remote_dep.c +++ b/parsec/remote_dep.c @@ -169,6 +169,8 @@ inline parsec_remote_deps_t* remote_deps_allocate( parsec_lifo_t* lifo ) } /* fw_mask immediatly follows outputs */ remote_deps->remote_dep_fw_mask = (uint32_t*) ptr; + remote_deps->bcast_flag = 0; /* default this dep is not for bcast */ + memset(remote_deps->bcast_keys, 0, sizeof(uint32_t)*16); assert( (int)(ptr - (char*)remote_deps) == (int)(parsec_remote_dep_context.elem_size - rank_bit_size)); } else { @@ -210,6 +212,9 @@ inline void remote_deps_free(parsec_remote_deps_t* deps) memset( &deps->msg, 0, sizeof(remote_dep_wire_activate_t) ); #endif deps->taskpool = NULL; + //memset( &deps->msg, 0, sizeof(remote_dep_wire_activate_t) ); + deps->bcast_flag = 0; /* default this dep is not for bcast */ + memset(deps->bcast_keys, 0, sizeof(uint32_t)*16); parsec_lifo_push(deps->origin, (parsec_list_item_t*)deps); PARSEC_VALGRIND_MEMPOOL_FREE(deps->origin, ((unsigned char *)deps)+sizeof(parsec_list_item_t)); } @@ -447,7 +452,7 @@ int parsec_remote_dep_activate(parsec_execution_stream_t* es, uint32_t propagation_mask) { const parsec_task_class_t* tc = task->task_class; - int i, my_idx, idx, current_mask, keeper = 0; + int i, my_idx, idx, current_mask, keeper = 0, child_count = 0; unsigned int array_index, count, bit_index; struct remote_dep_output_param_s* output; @@ -530,12 +535,37 @@ int parsec_remote_dep_activate(parsec_execution_stream_t* es, int remote_dep_bcast_child_permits = 0; /* Right now DTD only supports a star broadcast topology */ if( PARSEC_TASKPOOL_TYPE_DTD == task->taskpool->taskpool_type ) { - remote_dep_bcast_child_permits = remote_dep_bcast_star_child(my_idx, idx); + parsec_dtd_task_t *this_dtd_task = (parsec_dtd_task_t *) task; + if(this_dtd_task->super.task_class->task_class_id != PARSEC_DTD_BCAST_KEY_TC_ID && + this_dtd_task->super.task_class->task_class_id != PARSEC_DTD_BCAST_DATA_TC_ID) { + remote_deps->msg.locals[0].value = remote_deps->bcast_keys[i]; /* p2p, update the key for this message */ + remote_dep_bcast_child_permits = remote_dep_bcast_star_child(my_idx, idx); + } else { + /* TODO: when the parent is BCAST_DATA tc, but we need to do a p2p to remote write */ + if(this_dtd_task->super.task_class->task_class_id == PARSEC_DTD_BCAST_DATA_TC_ID && remote_deps->bcast_keys[15]) { + remote_deps->msg.locals[0].value = remote_deps->bcast_keys[i]; /* p2p, update the key for this message */ + remote_dep_bcast_child_permits = remote_dep_bcast_star_child(my_idx, idx); + } else { + remote_dep_bcast_child_permits = remote_dep_bcast_child(my_idx, idx); + } + } } else { remote_dep_bcast_child_permits = remote_dep_bcast_child(my_idx, idx); } if(remote_dep_bcast_child_permits) { + if( PARSEC_TASKPOOL_TYPE_DTD == task->taskpool->taskpool_type ) { + parsec_dtd_task_t *this_dtd_task = (parsec_dtd_task_t *) task; + if(this_dtd_task->super.task_class->task_class_id == PARSEC_DTD_BCAST_KEY_TC_ID) { + int* data_ptr = (int*)parsec_data_copy_get_ptr(this_dtd_task->super.data[0].data_out); + this_dtd_task->super.locals[4+child_count*2].value = rank; + this_dtd_task->super.locals[4+child_count*2+1].value = this_dtd_task->ht_item.key = ((1<<29) |(remote_deps->root << 18) | *(data_ptr+1+rank)); + remote_deps->msg.locals[4+child_count*2].value = this_dtd_task->super.locals[4+child_count*2].value; + remote_deps->msg.locals[4+child_count*2+1].value = this_dtd_task->super.locals[4+child_count*2+1].value; + child_count += 1; + //fprintf(stderr, "for remote_dep %p update key in activate to %d\n", remote_deps, this_dtd_task->super.locals[rank].value); + } + } PARSEC_DEBUG_VERBOSE(20, parsec_comm_output_stream, "[%d:%d] task %s my_idx %d idx %d rank %d -- send (%x)", remote_deps->root, i, tmp, my_idx, idx, rank, remote_deps->outgoing_mask); assert(remote_deps->outgoing_mask & (1U<virtual_process->parsec_context->my_rank, remote_deps->msg.locals[0].value, my_idx, idx); assert(output->parent->taskpool == task->taskpool); if( 0 == parsec_atomic_fetch_inc_int32(&remote_deps->pending_ack) ) { keeper = 1; @@ -560,6 +591,9 @@ int parsec_remote_dep_activate(parsec_execution_stream_t* es, */ (void)parsec_atomic_fetch_inc_int32(&remote_deps->pending_ack); } + //if(PARSEC_TASKPOOL_TYPE_DTD == task->taskpool->taskpool_type && task->task_class->task_class_id == 2) + //remote_dep_inc_flying_messages(task->taskpool); + //(void)parsec_atomic_fetch_inc_int32(&remote_deps->pending_ack); remote_dep_send(es, rank, remote_deps); } else { PARSEC_DEBUG_VERBOSE(20, parsec_comm_output_stream, "[%d:%d] task %s my_idx %d idx %d rank %d -- skip (not my direct descendant)", diff --git a/parsec/remote_dep.h b/parsec/remote_dep.h index 68398aba5..bd85e8772 100644 --- a/parsec/remote_dep.h +++ b/parsec/remote_dep.h @@ -91,6 +91,8 @@ struct parsec_remote_deps_s { int32_t priority; uint32_t *remote_dep_fw_mask; /**< list of peers already notified about * the control sequence (only used for control messages) */ + int32_t bcast_flag; + uint32_t bcast_keys[16]; struct data_repo_entry_s *repo_entry; struct remote_dep_output_param_s output[1]; }; diff --git a/parsec/remote_dep_mpi.c b/parsec/remote_dep_mpi.c index fc130e098..23a62d5b5 100644 --- a/parsec/remote_dep_mpi.c +++ b/parsec/remote_dep_mpi.c @@ -97,7 +97,7 @@ static size_t parsec_param_eager_limit = RDEP_MSG_EAGER_LIMIT; */ static size_t parsec_param_eager_limit = 0; #endif /* RDEP_MSG_EAGER_LIMIT != 0 */ -static int parsec_param_enable_aggregate = 1; +static int parsec_param_enable_aggregate = 0; #if defined(PARSEC_HAVE_MPI_OVERTAKE) static int parsec_param_enable_mpi_overtake = 1; #endif @@ -730,7 +730,6 @@ remote_dep_get_datatypes(parsec_execution_stream_t* es, parsec_dtd_task_t *dtd_task = NULL; dtd_tp = (parsec_dtd_taskpool_t *)origin->taskpool; - /* if( NULL == task.task_class ), this case will be taken care of automatically */ /* We need to convert from a dep_datatype_index mask into a dep_index @@ -742,7 +741,19 @@ remote_dep_get_datatypes(parsec_execution_stream_t* es, for(k = 0; origin->msg.output_mask>>k; k++) { if(!(origin->msg.output_mask & (1U<msg.locals[0].value<<32 | (1U<msg.task_class_id == PARSEC_DTD_BCAST_KEY_TC_ID) { + for(int idx = 4; idx < 15; idx += 2) { + if(origin->msg.locals[idx].value == es->virtual_process->parsec_context->my_rank) { + key = (uint64_t)origin->msg.locals[idx+1].value<<32 | (1U<msg.locals[es->virtual_process->parsec_context->my_rank].value, k, key); + origin->msg.locals[0].value = origin->msg.locals[idx+1].value; + break; + } + } + } else { + key = (uint64_t)origin->msg.locals[0].value<<32 | (1U<msg.locals[0].value, k, key); /* AM buffers are reused by the comm engine once the activation * has been conveyed to upper layer. In case of DTD we might receive msg to @@ -896,9 +908,24 @@ remote_dep_release_incoming(parsec_execution_stream_t* es, } PARSEC_DEBUG_VERBOSE(20, parsec_comm_output_stream, "MPI:\tTranslate mask from 0x%lx to 0x%x (remote_dep_release_incoming)", complete_mask, action_mask); - (void)task.task_class->release_deps(es, &task, - action_mask | PARSEC_ACTION_RELEASE_LOCAL_DEPS, - NULL); + if( PARSEC_TASKPOOL_TYPE_DTD == origin->taskpool->taskpool_type ) { + if(task.task_class->task_class_id == PARSEC_DTD_BCAST_KEY_TC_ID || + task.task_class->task_class_id == PARSEC_DTD_BCAST_DATA_TC_ID) { + remote_dep_inc_flying_messages(origin->taskpool); + (void)parsec_atomic_fetch_inc_int32(&origin->pending_ack); + (void)task.task_class->release_deps(es, &task, + action_mask | PARSEC_ACTION_RELEASE_LOCAL_DEPS, + origin); + } else { + (void)task.task_class->release_deps(es, &task, + action_mask | PARSEC_ACTION_RELEASE_LOCAL_DEPS, + NULL); + } + } else { + (void)task.task_class->release_deps(es, &task, + action_mask | PARSEC_ACTION_RELEASE_LOCAL_DEPS, + NULL); + } assert(0 == (origin->incoming_mask & complete_mask)); if(0 != origin->incoming_mask) /* not done receiving */ @@ -912,11 +939,14 @@ remote_dep_release_incoming(parsec_execution_stream_t* es, * references on the allocated data and on the dependency. */ uint32_t mask = origin->outgoing_mask; - origin->outgoing_mask = 0; + //origin->outgoing_mask = 0; #if defined(PARSEC_DIST_COLLECTIVES) if( PARSEC_TASKPOOL_TYPE_PTG == origin->taskpool->taskpool_type ) /* indicates it is a PTG taskpool */ + { + origin->outgoing_mask = 0; parsec_remote_dep_propagate(es, &task, origin); + } #endif /* PARSEC_DIST_COLLECTIVES */ /** * Release the dependency owned by the communication engine for all data @@ -932,7 +962,13 @@ remote_dep_release_incoming(parsec_execution_stream_t* es, if(PARSEC_TASKPOOL_TYPE_PTG == origin->taskpool->taskpool_type) { remote_dep_complete_and_cleanup(&origin, 1); } else { - remote_deps_free(origin); + /* if it is bcast keys or bcast data */ + if(origin->msg.task_class_id == 1 || origin->msg.task_class_id == 2) { + remote_dep_complete_and_cleanup(&origin, 1); + } else { + origin->outgoing_mask = 0; + remote_deps_free(origin); + } } #else remote_deps_free(origin); @@ -1859,6 +1895,10 @@ remote_dep_mpi_save_put_cb(parsec_execution_stream_t* es, assert(0 != deps->pending_ack); assert(0 != deps->outgoing_mask); item->priority = deps->max_priority; + + PARSEC_DEBUG_VERBOSE(20, parsec_comm_output_stream, "MPI: SAVE PUT CB for %s from %d tag %u which 0x%x (deps %p)", + remote_dep_cmd_to_string(&deps->msg, tmp, MAX_TASK_STRLEN), item->cmd.activate.peer, + task->tag, task->output_mask, (void*)deps); /* Get the highest priority PUT operation */ parsec_list_nolock_push_sorted(&dep_put_fifo, (parsec_list_item_t*)item, dep_cmd_prio); @@ -1955,6 +1995,8 @@ remote_dep_mpi_put_end_cb(parsec_execution_stream_t* es, DEBUG_MARK_DTA_MSG_END_SEND(status->MPI_TAG); TAKE_TIME(MPIsnd_prof, MPI_Data_plds_ek, cb->idx); remote_dep_complete_and_cleanup(&deps, 1); + //if(deps != NULL) + // remote_dep_complete_and_cleanup(&deps, 1); parsec_comm_puts--; (void)es; return 0; @@ -2118,6 +2160,10 @@ remote_dep_mpi_save_activate_cb(parsec_execution_stream_t* es, &deps->msg, dep_count, dep_dtt, dep_comm); deps->from = status->MPI_SOURCE; + //if(es->virtual_process->parsec_context->my_rank == 1){ + // fprintf(stderr, "save activate cb with value %d\n", deps->msg.locals[0].value); + //} + /* Retrieve the data arenas and update the msg.incoming_mask to reflect * the data we should be receiving from the predecessor. */ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9254ea8d3..3447c4236 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -71,6 +71,8 @@ target_ptg_sources(complex_deps PRIVATE "complex_deps.jdf") if( MPI_C_FOUND ) parsec_addtest(C multichain) target_ptg_sources(multichain PRIVATE "multichain.jdf") + parsec_addtest(C ptg_bcast) + target_ptg_sources(ptg_bcast PRIVATE "ptg_bcast.jdf") endif( MPI_C_FOUND ) parsec_addtest(C compose "compose.c") diff --git a/tests/interfaces/superscalar/CMakeLists.txt b/tests/interfaces/superscalar/CMakeLists.txt index ef91ab984..78d117659 100644 --- a/tests/interfaces/superscalar/CMakeLists.txt +++ b/tests/interfaces/superscalar/CMakeLists.txt @@ -20,6 +20,8 @@ parsec_addtest(C dtd_test_data_flush "dtd_test_data_flush.c;${COMMON_DATA}") parsec_addtest(C dtd_test_global_id_for_dc_assumed "dtd_test_global_id_for_dc_assumed.c;${COMMON_DATA}") parsec_addtest(C dtd_test_explicit_task_creation "dtd_test_explicit_task_creation.c;${COMMON_DATA}") parsec_addtest(C dtd_test_tp_enqueue_dequeue "dtd_test_tp_enqueue_dequeue.c") +parsec_addtest(C dtd_test_broadcast_collective "dtd_test_broadcast_collective.c") +parsec_addtest(C dtd_test_broadcast_p2p "dtd_test_broadcast_p2p.c") # # Shared Memory Testings diff --git a/tests/interfaces/superscalar/dtd_test_broadcast_collective.c b/tests/interfaces/superscalar/dtd_test_broadcast_collective.c new file mode 100644 index 000000000..e8f585194 --- /dev/null +++ b/tests/interfaces/superscalar/dtd_test_broadcast_collective.c @@ -0,0 +1,309 @@ +#include "mpi.h" + +#include "parsec.h" +#include "parsec/arena.h" +#include "parsec/data_dist/matrix/matrix.h" +#include "parsec/data_dist/matrix/two_dim_rectangle_cyclic.h" +#include "parsec/remote_dep.h" +#include "parsec/data_internal.h" +#include "parsec/interfaces/superscalar/insert_function_internal.h" +#include "parsec/interfaces/superscalar/insert_function.h" + +#include +#include + +enum regions + { + TILE_FULL, + TILE_BCAST + }; + +parsec_tiled_matrix_dc_t *create_and_distribute_data(int rank, int world, int mb, int mt) +{ + two_dim_block_cyclic_t *m = (two_dim_block_cyclic_t*)malloc(sizeof(two_dim_block_cyclic_t)); + two_dim_block_cyclic_init(m, matrix_ComplexDouble, matrix_Tile, + rank, + mb, 1, + mt*mb, 1, + 0, 0, + mt*mb, 1, + world, 1, + 1, 1, + 0, 0); + + m->mat = parsec_data_allocate((size_t)m->super.nb_local_tiles * + (size_t)m->super.bsiz * + (size_t)parsec_datadist_getsizeoftype(m->super.mtype)); + + return (parsec_tiled_matrix_dc_t*)m; +} + +void free_data(parsec_tiled_matrix_dc_t *d) +{ + parsec_matrix_destroy_data(d); + parsec_data_collection_destroy(&d->super); + free(d); +} + +// Read data +int read_task_fn( + parsec_execution_stream_t *es, + parsec_task_t *this_task ) { + (void)es; + + // INPUT data + int *val_in; + // Task rank + int dest_rank; + + parsec_dtd_unpack_args(this_task, &val_in, &dest_rank); + + int myrank; + MPI_Comm_rank(MPI_COMM_WORLD, &myrank); + + printf("[read_task] rank = %d, val_in = %d\n", myrank, *val_in); + + return PARSEC_HOOK_RETURN_DONE; +} + +// Write data +int write_task_fn( + parsec_execution_stream_t *es, + parsec_task_t *this_task) { + (void)es; + + // INOUT data + double *val_in; + // Value to set the data to + double data_value; + // Task rank + int dest_rank; + + parsec_dtd_unpack_args(this_task, &val_in, &dest_rank, &data_value); + + int myrank; + MPI_Comm_rank(MPI_COMM_WORLD, &myrank); + + //sleep(1); + //printf("[write_task] rank = %d, data_value = %f\n", myrank, data_value); + + *val_in = data_value; + + return PARSEC_HOOK_RETURN_DONE; +} + +// Retrieve value associated with input data_copy for verification. +int retrieve_task_fn( + parsec_execution_stream_t *es, + parsec_task_t *this_task ) { + (void)es; + + int myrank = -1; + // INPUT data + double *val_in; + // Task rank + int dest_rank; + + double *val_out; + + parsec_dtd_unpack_args(this_task, &val_in, &dest_rank, &val_out); + + /* int myrank; */ + MPI_Comm_rank(MPI_COMM_WORLD, &myrank); + + //printf("[read_task] rank = %d, val_in = %f\n", myrank, *val_in); + + //*val_out = *val_in; + + return PARSEC_HOOK_RETURN_DONE; +} + + +int dummy_task_fn( + parsec_execution_stream_t *es, + parsec_task_t *this_task) { + (void)es;(void)this_task; + + return PARSEC_HOOK_RETURN_DONE; +} + +int test_broadcast_mixed( + int world, int myrank, parsec_context_t* parsec_context, int root, int num_elem) { + + // Test return value: + // - 0: success + // - Failure otherwise + int ret = 0; + + // Error code return by parsec routines + int perr; + + // Tile size + int nb = 1; + int nb_bcast = 30; + // Total number of tiles + int nt = 1; + int data_value = 0; + + //sleep(40); + //number of elements per tile + nb = num_elem; + // few tiles per node + nt = world; + double starttime, endtime; + + parsec_taskpool_t *dtd_tp = parsec_dtd_taskpool_new(); + + parsec_matrix_add2arena_rect( + &parsec_dtd_arenas_datatypes[TILE_FULL], + parsec_datatype_double_t, + nb, 1, nb); + + // Initial value on the root node. All node should have this value + // at the end of the operation. + double data_root = 55; + + + + parsec_tiled_matrix_dc_t *dcA; + dcA = create_and_distribute_data(myrank, world, nb, nt); + parsec_data_collection_set_key((parsec_data_collection_t *)dcA, "A"); + + parsec_data_collection_t *A = (parsec_data_collection_t *)dcA; + two_dim_block_cyclic_t *__dcA = dcA; + parsec_dtd_data_collection_init(A); + + parsec_data_copy_t *parsec_data_copy; + parsec_data_t *parsec_data; + // Pointer to local tile data + double *data_ptr; + // Local tile key + int key; + + key = A->data_key(A, myrank, 0); + parsec_data = A->data_of_key(A, key); + parsec_data_copy = parsec_data_get_copy(parsec_data, 0); + data_ptr = (double*)parsec_data_copy_get_ptr(parsec_data_copy); + + // Registering the dtd_handle with PARSEC context + perr = parsec_context_add_taskpool( parsec_context, dtd_tp ); + PARSEC_CHECK_ERROR(perr, "parsec_context_add_taskpool"); + + perr = parsec_context_start(parsec_context); + PARSEC_CHECK_ERROR(perr, "parsec_context_start"); + + MPI_Barrier(MPI_COMM_WORLD); + starttime = MPI_Wtime(); + // Initialize tiles + if( root == myrank ) { + parsec_task_t *root_task = parsec_dtd_taskpool_create_task( + dtd_tp, write_task_fn, 0, "root_task", + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, myrank, 0), PARSEC_INOUT | TILE_FULL, + sizeof(int), &myrank, PARSEC_VALUE | PARSEC_AFFINITY, + sizeof(double*), &data_root, PARSEC_VALUE, + PARSEC_DTD_ARG_END); + parsec_dtd_task_t *dtd_root_task = (parsec_dtd_task_t *)root_task; + parsec_insert_dtd_task(dtd_root_task); + } + + // Key of tile associated with root node + int key_root; + parsec_dtd_tile_t* dtd_tile_root; + + key_root = key = A->data_key(A, root, 0); + dtd_tile_root = PARSEC_DTD_TILE_OF_KEY(A, key_root); + + // Create array of destination ranks + int num_dest_ranks = 0; + int *dest_ranks = (int*)malloc(world*sizeof(int)); + + // Destination rank index + int dest_rank_idx = 0 ; + + // VALID ONLY ON THE ROOT NODE + for (int rank = 0; rank < world; ++rank) { + if (rank == root) continue; + dest_ranks[dest_rank_idx] = rank; + ++dest_rank_idx; + } + num_dest_ranks = dest_rank_idx; + + // + // Perform Broadcast + // + //fprintf(stderr, "perform bcast from rank %d\n", myrank); + parsec_dtd_broadcast( + dtd_tp, root, + dtd_tile_root, TILE_FULL, + dest_ranks, num_dest_ranks); + + // + // Retrieve value of broadcasted data + // + double* data_value_out = -1; + if(1) { + parsec_task_t *retrieve_task = parsec_dtd_taskpool_create_task( + dtd_tp, retrieve_task_fn, 0, "retrieve_task", + PASSED_BY_REF, dtd_tile_root, PARSEC_INPUT | TILE_FULL, + sizeof(int), &myrank, PARSEC_VALUE | PARSEC_AFFINITY, + sizeof(double*), &data_value_out, PARSEC_VALUE, + PARSEC_DTD_ARG_END); + parsec_dtd_task_t *dtd_retrieve_task = (parsec_dtd_task_t *)retrieve_task; + parsec_insert_dtd_task(retrieve_task); + } + parsec_dtd_data_flush_all( dtd_tp, A ); + + // Wait for task completion + perr = parsec_dtd_taskpool_wait( dtd_tp ); + PARSEC_CHECK_ERROR(perr, "parsec_dtd_taskpool_wait"); + + perr = parsec_context_wait(parsec_context); + PARSEC_CHECK_ERROR(perr, "parsec_context_wait"); + MPI_Barrier(MPI_COMM_WORLD); + endtime = MPI_Wtime(); + if(myrank==0)printf("That took %f seconds\n",endtime-starttime); + + // Cleanup data and parsec data structures + parsec_type_free(&parsec_dtd_arenas_datatypes[TILE_FULL].opaque_dtt); + PARSEC_OBJ_RELEASE(parsec_dtd_arenas_datatypes[TILE_FULL].arena); + parsec_dtd_data_collection_fini( A ); + free_data(dcA); + parsec_taskpool_free( dtd_tp ); + + return ret; +} + +int main(int argc, char **argv) { + + int ret; + parsec_context_t* parsec_context = NULL; + int rank, world; + + char *p; + int nt = strtol(argv[1], &p, 10); + nt = nt*nt; + { + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &provided); + } + MPI_Comm_size(MPI_COMM_WORLD, &world); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + + /* int ncores = 1; */ + int ncores = 2; + parsec_context = parsec_init(ncores, &argc, &argv); + + // Testing trimming with a mixed destinations of receivers for broadcast + //MPI_Barrier(MPI_COMM_WORLD); + //starttime = MPI_Wtime(); + ret = test_broadcast_mixed(world, rank, parsec_context, 0, nt); + //MPI_Barrier(MPI_COMM_WORLD); + //endtime = MPI_Wtime(); + //if(rank==0)printf("That took %f seconds\n",endtime-starttime); + + parsec_fini(&parsec_context); + + MPI_Finalize(); + (void)ret; + return 0; +} diff --git a/tests/interfaces/superscalar/dtd_test_broadcast_p2p.c b/tests/interfaces/superscalar/dtd_test_broadcast_p2p.c new file mode 100644 index 000000000..8010d3632 --- /dev/null +++ b/tests/interfaces/superscalar/dtd_test_broadcast_p2p.c @@ -0,0 +1,286 @@ +#include "mpi.h" + +#include "parsec.h" +#include "parsec/arena.h" +#include "parsec/data_dist/matrix/matrix.h" +#include "parsec/data_dist/matrix/two_dim_rectangle_cyclic.h" +#include "parsec/remote_dep.h" +#include "parsec/data_internal.h" +#include "parsec/interfaces/superscalar/insert_function_internal.h" +#include "parsec/interfaces/superscalar/insert_function.h" + +#include +#include + +enum regions + { + TILE_FULL, + TILE_BCAST + }; + +parsec_tiled_matrix_dc_t *create_and_distribute_data(int rank, int world, int mb, int mt) +{ + two_dim_block_cyclic_t *m = (two_dim_block_cyclic_t*)malloc(sizeof(two_dim_block_cyclic_t)); + two_dim_block_cyclic_init(m, matrix_ComplexDouble, matrix_Tile, + rank, + mb, 1, + mt*mb, 1, + 0, 0, + mt*mb, 1, + world, 1, + 1, 1, + 0, 0); + + m->mat = parsec_data_allocate((size_t)m->super.nb_local_tiles * + (size_t)m->super.bsiz * + (size_t)parsec_datadist_getsizeoftype(m->super.mtype)); + + return (parsec_tiled_matrix_dc_t*)m; +} + +void free_data(parsec_tiled_matrix_dc_t *d) +{ + parsec_matrix_destroy_data(d); + parsec_data_collection_destroy(&d->super); + free(d); +} + +// Read data +int read_task_fn( + parsec_execution_stream_t *es, + parsec_task_t *this_task ) { + (void)es; + + // INPUT data + int *val_in; + // Task rank + int dest_rank; + + parsec_dtd_unpack_args(this_task, &val_in, &dest_rank); + + int myrank; + MPI_Comm_rank(MPI_COMM_WORLD, &myrank); + + //printf("[read_task] rank = %d, val_in = %d\n", myrank, *val_in); + + return PARSEC_HOOK_RETURN_DONE; +} + +// Write data +int write_task_fn( + parsec_execution_stream_t *es, + parsec_task_t *this_task) { + (void)es; + + // INOUT data + double *val_in; + // Value to set the data to + double data_value; + // Task rank + int dest_rank; + + parsec_dtd_unpack_args(this_task, &val_in, &dest_rank, &data_value); + + //sleep(1); + int myrank; + MPI_Comm_rank(MPI_COMM_WORLD, &myrank); + + //printf("[write_task] rank = %d, data_value = %f\n", myrank, data_value); + + *val_in = data_value; + + return PARSEC_HOOK_RETURN_DONE; +} + +// Retrieve value associated with input data_copy for verification. +int retrieve_task_fn( + parsec_execution_stream_t *es, + parsec_task_t *this_task ) { + (void)es; + + int myrank = -1; + // INPUT data + double *val_in; + // Task rank + int dest_rank; + + double *val_out; + + parsec_dtd_unpack_args(this_task, &val_in, &dest_rank, &val_out); + + /* int myrank; */ + MPI_Comm_rank(MPI_COMM_WORLD, &myrank); + + //printf("[read_task] rank = %d, val_in = %f\n", myrank, *val_in); + + //*val_out = *val_in; + + return PARSEC_HOOK_RETURN_DONE; +} + + +int dummy_task_fn( + parsec_execution_stream_t *es, + parsec_task_t *this_task) { + (void)es;(void)this_task; + + return PARSEC_HOOK_RETURN_DONE; +} + +int test_broadcast_mixed( + int world, int myrank, parsec_context_t* parsec_context, int root, int num_elem) { + + // Test return value: + // - 0: success + // - Failure otherwise + int ret = 0; + + double starttime, endtime; + // Error code return by parsec routines + int perr; + + // Tile size + int nb = 1; + int nb_bcast = 30; + // Total number of tiles + int nt = 1; + int data_value = 0; + + //sleep(40); + //number of elements per tile + nb = num_elem; + // few tiles per node + nt = world; + + parsec_taskpool_t *dtd_tp = parsec_dtd_taskpool_new(); + + parsec_matrix_add2arena_rect( + &parsec_dtd_arenas_datatypes[TILE_FULL], + parsec_datatype_double_t, + nb, 1, nb); + + // Initial value on the root node. All node should have this value + // at the end of the operation. + double data_root = 55; + + + + parsec_tiled_matrix_dc_t *dcA; + dcA = create_and_distribute_data(myrank, world, nb, nt); + parsec_data_collection_set_key((parsec_data_collection_t *)dcA, "A"); + + parsec_data_collection_t *A = (parsec_data_collection_t *)dcA; + two_dim_block_cyclic_t *__dcA = dcA; + parsec_dtd_data_collection_init(A); + + parsec_data_copy_t *parsec_data_copy; + parsec_data_t *parsec_data; + // Pointer to local tile data + double *data_ptr; + // Local tile key + int key; + + key = A->data_key(A, myrank, 0); + parsec_data = A->data_of_key(A, key); + parsec_data_copy = parsec_data_get_copy(parsec_data, 0); + data_ptr = (double*)parsec_data_copy_get_ptr(parsec_data_copy); + + // Registering the dtd_handle with PARSEC context + perr = parsec_context_add_taskpool( parsec_context, dtd_tp ); + PARSEC_CHECK_ERROR(perr, "parsec_context_add_taskpool"); + + perr = parsec_context_start(parsec_context); + PARSEC_CHECK_ERROR(perr, "parsec_context_start"); + MPI_Barrier(MPI_COMM_WORLD); + starttime = MPI_Wtime(); + // Initialize tiles + if( 1 ) { + parsec_task_t *root_task = parsec_dtd_taskpool_create_task( + dtd_tp, write_task_fn, 0, "root_task", + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, root, 0), PARSEC_INOUT | TILE_FULL, + sizeof(int), &root, PARSEC_VALUE | PARSEC_AFFINITY, + sizeof(double*), &data_root, PARSEC_VALUE, + PARSEC_DTD_ARG_END); + parsec_dtd_task_t *dtd_root_task = (parsec_dtd_task_t *)root_task; + parsec_insert_dtd_task(dtd_root_task); + } + + // Key of tile associated with root node + int key_root; + parsec_dtd_tile_t* dtd_tile_root; + + key_root = key = A->data_key(A, root, 0); + dtd_tile_root = PARSEC_DTD_TILE_OF_KEY(A, key_root); + + // + // Retrieve value of broadcasted data + // + double* data_value_out = -1; + for(int irank = 0; irank < world; irank++) { + //parsec_task_t *retrieve_task = parsec_dtd_taskpool_create_task( + parsec_dtd_taskpool_insert_task( + dtd_tp, retrieve_task_fn, 0, "retrieve_task", + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, root, 0), PARSEC_INPUT | TILE_FULL, + sizeof(int), &irank, PARSEC_VALUE | PARSEC_AFFINITY, + sizeof(double*), &data_value_out, PARSEC_VALUE, + PARSEC_DTD_ARG_END); + //parsec_dtd_task_t *dtd_retrieve_task = (parsec_dtd_task_t *)retrieve_task; + //parsec_insert_dtd_task(retrieve_task); + } + parsec_dtd_data_flush_all( dtd_tp, A ); + + // Wait for task completion + perr = parsec_dtd_taskpool_wait( dtd_tp ); + PARSEC_CHECK_ERROR(perr, "parsec_dtd_taskpool_wait"); + MPI_Barrier(MPI_COMM_WORLD); + endtime = MPI_Wtime(); + if(myrank==0)printf("That took %f seconds\n",endtime-starttime); + + perr = parsec_context_wait(parsec_context); + PARSEC_CHECK_ERROR(perr, "parsec_context_wait"); + + // Cleanup data and parsec data structures + parsec_type_free(&parsec_dtd_arenas_datatypes[TILE_FULL].opaque_dtt); + PARSEC_OBJ_RELEASE(parsec_dtd_arenas_datatypes[TILE_FULL].arena); + parsec_dtd_data_collection_fini( A ); + free_data(dcA); + parsec_taskpool_free( dtd_tp ); + + return ret; +} + +int main(int argc, char **argv) { + + int ret; + parsec_context_t* parsec_context = NULL; + double starttime, endtime; + int rank, world; + + char *p; + int nt = strtol(argv[1], &p, 10); + nt = nt*nt; + { + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &provided); + } + MPI_Comm_size(MPI_COMM_WORLD, &world); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + + /* int ncores = 1; */ + int ncores = 2; + parsec_context = parsec_init(ncores, &argc, &argv); + + // Testing trimming with a mixed destinations of receivers for broadcast + //MPI_Barrier(MPI_COMM_WORLD); + //starttime = MPI_Wtime(); + ret = test_broadcast_mixed(world, rank, parsec_context, 0, nt); + //MPI_Barrier(MPI_COMM_WORLD); + //endtime = MPI_Wtime(); + //if(rank==0)printf("That took %f seconds\n",endtime-starttime); + + parsec_fini(&parsec_context); + + MPI_Finalize(); + (void)ret; + return 0; +} diff --git a/tests/interfaces/superscalar/testing_zpotrf_dtd.c b/tests/interfaces/superscalar/testing_zpotrf_dtd.c new file mode 100644 index 000000000..62430752f --- /dev/null +++ b/tests/interfaces/superscalar/testing_zpotrf_dtd.c @@ -0,0 +1,521 @@ +/* + * Copyright (c) 2013-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * + * @precisions normal z -> s d c + * + */ + +#include "common.h" +#include "flops.h" +#include "dplasma/types.h" +#include "parsec/data_dist/matrix/sym_two_dim_rectangle_cyclic.h" +#include "parsec/data_dist/matrix/two_dim_rectangle_cyclic.h" +#include "parsec/interfaces/superscalar/insert_function.h" +#include "parsec/interfaces/superscalar/insert_function_internal.h" + +enum regions { + TILE_FULL, + TILE_BCAST + }; + +int +parsec_core_potrf(parsec_execution_stream_t *es, parsec_task_t *this_task) +{ + (void)es; + int uplo; + int m, lda, *info; + dplasma_complex64_t *A; + + parsec_dtd_unpack_args(this_task, &uplo, &m, &A, &lda, &info); + + int rank = this_task->taskpool->context->my_rank; + fprintf(stderr, "core_potrf executed on rank %d\n", rank); + CORE_zpotrf(uplo, m, A, lda, info); + + return PARSEC_HOOK_RETURN_DONE; +} + +int +parsec_core_trsm(parsec_execution_stream_t *es, parsec_task_t *this_task) +{ + (void)es; + int side, uplo, trans, diag; + int m, n, lda, ldc; + dplasma_complex64_t alpha; + dplasma_complex64_t *A, *C; + + parsec_dtd_unpack_args(this_task, &side, &uplo, &trans, &diag, &m, &n, + &alpha, &A, &lda, &C, &ldc); + int rank = this_task->taskpool->context->my_rank; + //fprintf(stderr, "core_trsm executed on rank %d \n", rank); + + CORE_ztrsm(side, uplo, trans, diag, + m, n, alpha, + A, lda, + C, ldc); + + return PARSEC_HOOK_RETURN_DONE; +} + +int +parsec_core_herk(parsec_execution_stream_t *es, parsec_task_t *this_task) +{ + (void)es; + int uplo, trans; + int m, n, lda, ldc; + dplasma_complex64_t alpha; + dplasma_complex64_t beta; + dplasma_complex64_t *A; + dplasma_complex64_t *C; + + parsec_dtd_unpack_args(this_task, &uplo, &trans, &m, &n, &alpha, &A, + &lda, &beta, &C, &ldc); + int rank = this_task->taskpool->context->my_rank; + //fprintf(stderr, "core_herk executed on rank %d\n", rank); + + CORE_zherk(uplo, trans, m, n, + alpha, A, lda, + beta, C, ldc); + + return PARSEC_HOOK_RETURN_DONE; +} + +int +parsec_core_gemm(parsec_execution_stream_t *es, parsec_task_t *this_task) +{ + (void)es; + int transA, transB; + int m, n, k, lda, ldb, ldc; + dplasma_complex64_t alpha, beta; + dplasma_complex64_t *A; + dplasma_complex64_t *B; + dplasma_complex64_t *C; + + parsec_dtd_unpack_args(this_task, &transA, &transB, &m, &n, &k, &alpha, + &A, &lda, &B, &ldb, &beta, &C, &ldc); + int rank = this_task->taskpool->context->my_rank; + //fprintf(stderr, "core_gemm executed on rank %d\n", rank); + + CORE_zgemm(transA, transB, + m, n, k, + alpha, A, lda, + B, ldb, + beta, C, ldc); + + return PARSEC_HOOK_RETURN_DONE; +} + +int main(int argc, char **argv) +{ + parsec_context_t* parsec; + int iparam[IPARAM_SIZEOF]; + int uplo = dplasmaUpper; + int info = 0; + int ret = 0; + + + int m, n, k, total; /* loop counter */ + /* Parameters passed on to Insert_task() */ + int tempkm, tempmm, ldak, ldam, side, transA_p, transA_g, diag, trans, transB, ldan; + dplasma_complex64_t alpha_trsm, alpha_herk, beta; + + /* Set defaults for non argv iparams */ + iparam_default_facto(iparam); + iparam_default_ibnbmb(iparam, 0, 180, 180); + iparam[IPARAM_NGPUS] = DPLASMA_ERR_NOT_SUPPORTED; + + /* Initialize PaRSEC */ + parsec = setup_parsec(argc, argv, iparam); + PASTE_CODE_IPARAM_LOCALS(iparam); + PASTE_CODE_FLOPS(FLOPS_ZPOTRF, ((DagDouble_t)N)); + + /* initializing matrix structure */ + LDA = dplasma_imax( LDA, N ); + LDB = dplasma_imax( LDB, N ); + KP = 1; + KQ = 1; + //sleep(30); + + PASTE_CODE_ALLOCATE_MATRIX(dcA, 1, + sym_two_dim_block_cyclic, (&dcA, matrix_ComplexDouble, + rank, MB, NB, LDA, N, 0, 0, + N, N, P, nodes/P, uplo)); + //int bsize = 30; + //PASTE_CODE_ALLOCATE_MATRIX(dcB, 1, + // sym_two_dim_block_cyclic, (&dcB, matrix_Integer, + // rank, bsize, bsize, bsize*N/NB, bsize*N/NB, 0, 0, + // bsize*N/NB, bsize*N/NB, P, nodes/P, uplo)); + + /* Initializing dc for dtd */ + sym_two_dim_block_cyclic_t *__dcA = &dcA; + parsec_dtd_data_collection_init((parsec_data_collection_t *)&dcA); + + //sym_two_dim_block_cyclic_t *__dcB = &dcB; + //parsec_dtd_data_collection_init((parsec_data_collection_t *)&dcB); + + /* matrix generation */ + if(loud > 3) printf("+++ Generate matrices ... "); + dplasma_zplghe( parsec, (double)(N), uplo, + (parsec_tiled_matrix_dc_t *)&dcA, random_seed); + if(loud > 3) printf("Done\n"); + + /* Getting new parsec handle of dtd type */ + parsec_taskpool_t *dtd_tp = parsec_dtd_taskpool_new(); + + /* Allocating data arrays to be used by comm engine */ + dplasma_add2arena_tile( &parsec_dtd_arenas_datatypes[TILE_FULL], + dcA.super.mb*dcA.super.nb*sizeof(dplasma_complex64_t), + PARSEC_ARENA_ALIGNMENT_SSE, + parsec_datatype_double_complex_t, dcA.super.mb ); + + //dplasma_add2arena_tile( &parsec_dtd_arenas_datatypes[TILE_BCAST], + // dcB.super.mb*dcB.super.nb*sizeof(int), + // PARSEC_ARENA_ALIGNMENT_SSE, + // parsec_datatype_int32_t, dcB.super.mb ); + + /* Registering the handle with parsec context */ + parsec_context_add_taskpool( parsec, dtd_tp ); + + //sleep(40); + SYNC_TIME_START(); + + /* #### parsec context Starting #### */ + + /* start parsec context */ + parsec_context_start( parsec ); + + if( dplasmaLower == uplo ) { + + side = dplasmaRight; + transA_p = dplasmaConjTrans; + diag = dplasmaNonUnit; + alpha_trsm = 1.0; + trans = dplasmaNoTrans; + alpha_herk = -1.0; + beta = 1.0; + transB = dplasmaConjTrans; + transA_g = dplasmaNoTrans; + + total = dcA.super.mt; + /* Testing Insert Function */ + for( k = 0; k < total; k++ ) { + tempkm = (k == (dcA.super.mt - 1)) ? dcA.super.m - k * dcA.super.mb : dcA.super.mb; + ldak = BLKLDD(&dcA.super, k); + + parsec_dtd_taskpool_insert_task( dtd_tp, parsec_core_potrf, + (total - k) * (total-k) * (total - k)/*priority*/, "Potrf", + sizeof(int), &uplo, PARSEC_VALUE, + sizeof(int), &tempkm, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, k, k), PARSEC_INOUT | TILE_FULL | PARSEC_AFFINITY, + sizeof(int), &ldak, PARSEC_VALUE, + sizeof(int *), &info, PARSEC_SCRATCH, + PARSEC_DTD_ARG_END ); + + for( m = k+1; m < total; m++ ) { + tempmm = m == dcA.super.mt - 1 ? dcA.super.m - m * dcA.super.mb : dcA.super.mb; + ldam = BLKLDD(&dcA.super, m); + parsec_dtd_taskpool_insert_task( dtd_tp, parsec_core_trsm, + (total - m) * (total-m) * (total - m) + 3 * ((2 * total) - k - m - 1) * (m - k)/*priority*/, "Trsm", + sizeof(int), &side, PARSEC_VALUE, + sizeof(int), &uplo, PARSEC_VALUE, + sizeof(int), &transA_p, PARSEC_VALUE, + sizeof(int), &diag, PARSEC_VALUE, + sizeof(int), &tempmm, PARSEC_VALUE, + sizeof(int), &dcA.super.nb, PARSEC_VALUE, + sizeof(dplasma_complex64_t), &alpha_trsm, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, k, k), PARSEC_INPUT | TILE_FULL, + sizeof(int), &ldak, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, m, k), PARSEC_INOUT | TILE_FULL | PARSEC_AFFINITY, + sizeof(int), &ldam, PARSEC_VALUE, + PARSEC_DTD_ARG_END ); + } + parsec_dtd_data_flush( dtd_tp, PARSEC_DTD_TILE_OF(A, k, k) ); + + for( m = k+1; m < dcA.super.nt; m++ ) { + tempmm = m == dcA.super.mt - 1 ? dcA.super.m - m * dcA.super.mb : dcA.super.mb; + ldam = BLKLDD(&dcA.super, m); + parsec_dtd_taskpool_insert_task( dtd_tp, parsec_core_herk, + (total - m) * (total - m) * (total - m) + 3 * (m - k)/*priority*/, "Herk", + sizeof(int), &uplo, PARSEC_VALUE, + sizeof(int), &trans, PARSEC_VALUE, + sizeof(int), &tempmm, PARSEC_VALUE, + sizeof(int), &dcA.super.mb, PARSEC_VALUE, + sizeof(dplasma_complex64_t), &alpha_herk, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, m, k), PARSEC_INPUT | TILE_FULL, + sizeof(int), &ldam, PARSEC_VALUE, + sizeof(dplasma_complex64_t), &beta, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, m, m), PARSEC_INOUT | TILE_FULL | PARSEC_AFFINITY, + sizeof(int), &ldam, PARSEC_VALUE, + PARSEC_DTD_ARG_END ); + + for( n = m+1; n < total; n++ ) { + ldan = BLKLDD(&dcA.super, n); + parsec_dtd_taskpool_insert_task( dtd_tp, parsec_core_gemm, + (total - m) * (total - m) * (total - m) + 3 * ((2 * total) - m - n - 3) * (m - n) + 6 * (m - k) /*priority*/, "Gemm", + sizeof(int), &transA_g, PARSEC_VALUE, + sizeof(int), &transB, PARSEC_VALUE, + sizeof(int), &tempmm, PARSEC_VALUE, + sizeof(int), &dcA.super.mb, PARSEC_VALUE, + sizeof(int), &dcA.super.mb, PARSEC_VALUE, + sizeof(dplasma_complex64_t), &alpha_herk, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, n, k), PARSEC_INPUT | TILE_FULL, + sizeof(int), &ldan, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, m, k), PARSEC_INPUT | TILE_FULL, + sizeof(int), &ldam, PARSEC_VALUE, + sizeof(dplasma_complex64_t), &beta, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, n, m), PARSEC_INOUT | TILE_FULL | PARSEC_AFFINITY, + sizeof(int), &ldan, PARSEC_VALUE, + PARSEC_DTD_ARG_END ); + } + parsec_dtd_data_flush( dtd_tp, PARSEC_DTD_TILE_OF(A, m, k) ); + } + } + } else { + side = dplasmaLeft; + transA_p = dplasmaConjTrans; + diag = dplasmaNonUnit; + alpha_trsm = 1.0; + trans = dplasmaConjTrans; + alpha_herk = -1.0; + beta = 1.0; + transB = dplasmaNoTrans; + transA_g = dplasmaConjTrans; + + total = dcA.super.nt; + + /* Variables used for collective */ + int root, num_dest_ranks, dest_rank_idx, flag; + int *dest_ranks = (int*)malloc((P+Q)*sizeof(int));; + for( k = 0; k < total; k++ ) { + tempkm = k == dcA.super.nt-1 ? dcA.super.n-k*dcA.super.nb : dcA.super.nb; + ldak = BLKLDD(&dcA.super, k); + if(parsec_dtd_rank_of_data(&dcA.super.super, k, k) == rank) { + //fprintf(stderr, "Inserting and executing potrf[%d %d] in rank: %d\n", k, k, rank); + parsec_dtd_taskpool_insert_task( dtd_tp, parsec_core_potrf, + (total - k) * (total-k) * (total - k)/*priority*/, "Potrf", + sizeof(int), &uplo, PARSEC_VALUE, + sizeof(int), &tempkm, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, k, k), PARSEC_INOUT | TILE_FULL | PARSEC_AFFINITY, + sizeof(int), &ldak, PARSEC_VALUE, + sizeof(int *), &info, PARSEC_SCRATCH, + PARSEC_DTD_ARG_END ); + } + + /* + * Broadcast the diagonal tile to the current panel + */ + root = parsec_dtd_rank_of_data(&dcA.super.super, k, k); + num_dest_ranks = Q -1; + //int *dest_ranks = (int*)malloc(num_dest_ranks*sizeof(int)); + dest_rank_idx = 0; + flag = 0; + /* Should only be done in root, others will pass NULL since they know nothing */ + for(int m = k+1; m < total; m++) { + int tile_rank = parsec_dtd_rank_of_data(&dcA.super.super, k, m); + if(tile_rank == root) {flag = 1; continue;} + dest_ranks[dest_rank_idx] = tile_rank; + if(tile_rank == rank) flag = 1; + ++dest_rank_idx; + if(dest_rank_idx == Q-1) break; /* this is to populate the destination ranks */ + } + + if( ( flag || (rank == root) ) && ( dest_rank_idx >= 1) ) { + parsec_dtd_tile_t* dtd_tile_root = PARSEC_DTD_TILE_OF(A, k, k); + //parsec_dtd_tile_t* dtd_key_root = PARSEC_DTD_TILE_OF(B, k, k); + //fprintf(stderr, "Broadcasting PO tile to TRSM. k %d, rank %d, root %d\n", k, rank, root); + parsec_dtd_broadcast( + dtd_tp, root, + dtd_tile_root, TILE_FULL, + // dtd_key_root, TILE_BCAST, + dest_ranks, dest_rank_idx); + } + + for( m = k+1; m < total; m++ ) { + tempmm = m == dcA.super.nt-1 ? dcA.super.n-m*dcA.super.nb : dcA.super.nb; + //if( (parsec_dtd_rank_of_data(&dcA.super.super, k, m) == rank || parsec_dtd_rank_of_data(&dcA.super.super, k, k) == rank )) { + if( (parsec_dtd_rank_of_data(&dcA.super.super, k, m) == rank )) { + //fprintf(stderr, "Inserting trsm[%d %d][%d %d] in rank: %d owned: %d\n", k, k, k, m, rank, parsec_dtd_rank_of_data(&dcA.super.super, k, m)); + parsec_dtd_taskpool_insert_task( dtd_tp, parsec_core_trsm, + (total - m) * (total-m) * (total - m) + 3 * ((2 * total) - k - m - 1) * (m - k)/*priority*/, "Trsm", + sizeof(int), &side, PARSEC_VALUE, + sizeof(int), &uplo, PARSEC_VALUE, + sizeof(int), &transA_p, PARSEC_VALUE, + sizeof(int), &diag, PARSEC_VALUE, + sizeof(int), &dcA.super.nb, PARSEC_VALUE, + sizeof(int), &tempmm, PARSEC_VALUE, + sizeof(dplasma_complex64_t), &alpha_trsm, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, k, k), PARSEC_INPUT | TILE_FULL, + sizeof(int), &ldak, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, k, m), PARSEC_INOUT | TILE_FULL | PARSEC_AFFINITY, + sizeof(int), &ldak, PARSEC_VALUE, + PARSEC_DTD_ARG_END ); + } + + /* + * Broadcast the TRSM tile to the descendant SYRK/GEMM tasks + */ + root = parsec_dtd_rank_of_data(&dcA.super.super, k, m); + num_dest_ranks = P+Q -1; + //int *dest_ranks = (int*)malloc(num_dest_ranks*sizeof(int)); + dest_rank_idx = 0; + flag = 0; + /* Loop over P and Q processes to gather the broadcast destinations */ + for(int i = k+1; i <= m; i++) { + int tile_rank = parsec_dtd_rank_of_data(&dcA.super.super, i, m); + if(tile_rank == root) {break;} /* we have loop over all the ranks in the column */ + dest_ranks[dest_rank_idx] = tile_rank; + if(tile_rank == rank) flag = 1; /* flip the flag for the owner of the tile */ + ++dest_rank_idx; + } + int diag_rank = parsec_dtd_rank_of_data(&dcA.super.super, m, m); + for(int j = m+1; j < total; j++) { + int tile_rank = parsec_dtd_rank_of_data(&dcA.super.super, m, j); + if(tile_rank == diag_rank) {break;} /* we have loop over all the ranks in the column */ + dest_ranks[dest_rank_idx] = tile_rank; + if(tile_rank == rank) flag = 1; /* flip the flag for the owner of the tile */ + ++dest_rank_idx; + } + + if( ( flag || (rank == root) ) && ( dest_rank_idx >= 1) ) { + parsec_dtd_tile_t* dtd_tile_root = PARSEC_DTD_TILE_OF(A, k, m); + //parsec_dtd_tile_t* dtd_key_root = PARSEC_DTD_TILE_OF(B, k, m); + //fprintf(stderr, "Broadcasting TRSM tile to SYRK and GEMM. k %d, m %d, rank %d, root %d\n", k, m, rank, root); + parsec_dtd_broadcast( + dtd_tp, root, + dtd_tile_root, TILE_FULL, + //dtd_key_root, TILE_BCAST, + dest_ranks, dest_rank_idx); + } + } + parsec_dtd_data_flush( dtd_tp, PARSEC_DTD_TILE_OF(A, k, k) ); + + for( m = k+1; m < dcA.super.mt; m++ ) { + tempmm = m == dcA.super.nt-1 ? dcA.super.n-m*dcA.super.nb : dcA.super.nb; + ldam = BLKLDD(&dcA.super, m); + //if(parsec_dtd_rank_of_data(&dcA.super.super, m, m) == rank || parsec_dtd_rank_of_data(&dcA.super.super, k, m) == rank ) { + if(parsec_dtd_rank_of_data(&dcA.super.super, m, m) == rank ) { + //fprintf(stderr, "Inserting syrk[%d %d][%d %d] in rank: %d owned: %d\n", m, m, k, m, rank, parsec_dtd_rank_of_data(&dcA.super.super, m, m)); + parsec_dtd_taskpool_insert_task( dtd_tp, parsec_core_herk, + (total - m) * (total - m) * (total - m) + 3 * (m - k)/*priority*/, "Herk", + sizeof(int), &uplo, PARSEC_VALUE, + sizeof(int), &trans, PARSEC_VALUE, + sizeof(int), &tempmm, PARSEC_VALUE, + sizeof(int), &dcA.super.mb, PARSEC_VALUE, + sizeof(dplasma_complex64_t), &alpha_herk, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, k, m), PARSEC_INPUT | TILE_FULL, + sizeof(int), &ldak, PARSEC_VALUE, + sizeof(dplasma_complex64_t), &beta, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, m, m), PARSEC_INOUT | TILE_FULL | PARSEC_AFFINITY, + sizeof(int), &ldam, PARSEC_VALUE, + PARSEC_DTD_ARG_END ); + } + for( n = m+1; n < total; n++ ) { + ldan = BLKLDD(&dcA.super, n); + //if(parsec_dtd_rank_of_data(&dcA.super.super, m, n) == rank || parsec_dtd_rank_of_data(&dcA.super.super, k, m) == rank || parsec_dtd_rank_of_data(&dcA.super.super, k, n) == rank) { + if(parsec_dtd_rank_of_data(&dcA.super.super, m, n) == rank ) { + //fprintf(stderr, "Inserting GEMM[%d %d][%d %d] in rank: %d owned: %d\n", k, n, k, m, rank, parsec_dtd_rank_of_data(&dcA.super.super, m, n)); + parsec_dtd_taskpool_insert_task( dtd_tp, parsec_core_gemm, + (total - m) * (total - m) * (total - m) + 3 * ((2 * total) - m - n - 3) * (m - n) + 6 * (m - k) /*priority*/, "Gemm", + sizeof(int), &transA_g, PARSEC_VALUE, + sizeof(int), &transB, PARSEC_VALUE, + sizeof(int), &dcA.super.mb, PARSEC_VALUE, + sizeof(int), &tempmm, PARSEC_VALUE, + sizeof(int), &dcA.super.mb, PARSEC_VALUE, + sizeof(dplasma_complex64_t), &alpha_herk, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, k, m), PARSEC_INPUT | TILE_FULL, + sizeof(int), &ldak, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, k, n), PARSEC_INPUT | TILE_FULL, + sizeof(int), &ldak, PARSEC_VALUE, + sizeof(dplasma_complex64_t), &beta, PARSEC_VALUE, + PASSED_BY_REF, PARSEC_DTD_TILE_OF(A, m, n), PARSEC_INOUT | TILE_FULL | PARSEC_AFFINITY, + sizeof(int), &ldan, PARSEC_VALUE, + PARSEC_DTD_ARG_END ); + } + } + parsec_dtd_data_flush( dtd_tp, PARSEC_DTD_TILE_OF(A, k, m) ); + } + } + } + + parsec_dtd_data_flush_all( dtd_tp, (parsec_data_collection_t *)&dcA ); + + /* finishing all the tasks inserted, but not finishing the handle */ + parsec_dtd_taskpool_wait( dtd_tp ); + + /* Waiting on all handle and turning everything off for this context */ + parsec_context_wait( parsec ); + + /* #### PaRSEC context is done #### */ + + SYNC_TIME_PRINT(rank, ("\tPxQ= %3d %-3d NB= %4d N= %7d : %14f gflops\n", + P, Q, NB, N, + gflops=(flops/1e9)/sync_time_elapsed)); + + /* Cleaning up the parsec handle */ + parsec_taskpool_free( dtd_tp ); + + if( 0 == rank && info != 0 ) { + printf("-- Factorization is suspicious (info = %d) ! \n", info); + ret |= 1; + } + if( !info && check ) { + /* Check the factorization */ + PASTE_CODE_ALLOCATE_MATRIX(dcA0, check, + sym_two_dim_block_cyclic, (&dcA0, matrix_ComplexDouble, + rank, MB, NB, LDA, N, 0, 0, + N, N, P, nodes/P, uplo)); + dplasma_zplghe( parsec, (double)(N), uplo, + (parsec_tiled_matrix_dc_t *)&dcA0, random_seed); + + ret |= check_zpotrf( parsec, (rank == 0) ? loud : 0, uplo, + (parsec_tiled_matrix_dc_t *)&dcA, + (parsec_tiled_matrix_dc_t *)&dcA0); + + /* Check the solution */ + PASTE_CODE_ALLOCATE_MATRIX(dcB, check, + two_dim_block_cyclic, (&dcB, matrix_ComplexDouble, matrix_Tile, + rank, MB, NB, LDB, NRHS, 0, 0, + N, NRHS, P, nodes/P, KP, KQ, IP, JQ)); + dplasma_zplrnt( parsec, 0, (parsec_tiled_matrix_dc_t *)&dcB, random_seed+1); + + PASTE_CODE_ALLOCATE_MATRIX(dcX, check, + two_dim_block_cyclic, (&dcX, matrix_ComplexDouble, matrix_Tile, + rank, MB, NB, LDB, NRHS, 0, 0, + N, NRHS, P, nodes/P, KP, KQ, IP, JQ)); + dplasma_zlacpy( parsec, dplasmaUpperLower, + (parsec_tiled_matrix_dc_t *)&dcB, (parsec_tiled_matrix_dc_t *)&dcX ); + + dplasma_zpotrs(parsec, uplo, + (parsec_tiled_matrix_dc_t *)&dcA, + (parsec_tiled_matrix_dc_t *)&dcX ); + + ret |= check_zaxmb( parsec, (rank == 0) ? loud : 0, uplo, + (parsec_tiled_matrix_dc_t *)&dcA0, + (parsec_tiled_matrix_dc_t *)&dcB, + (parsec_tiled_matrix_dc_t *)&dcX); + + /* Cleanup */ + parsec_data_free(dcA0.mat); dcA0.mat = NULL; + parsec_tiled_matrix_dc_destroy( (parsec_tiled_matrix_dc_t*)&dcA0 ); + parsec_data_free(dcB.mat); dcB.mat = NULL; + parsec_tiled_matrix_dc_destroy( (parsec_tiled_matrix_dc_t*)&dcB ); + parsec_data_free(dcX.mat); dcX.mat = NULL; + parsec_tiled_matrix_dc_destroy( (parsec_tiled_matrix_dc_t*)&dcX ); + } + + /* Cleaning data arrays we allocated for communication */ + dplasma_matrix_del2arena( &parsec_dtd_arenas_datatypes[TILE_FULL] ); + //dplasma_matrix_del2arena( &parsec_dtd_arenas_datatypes[TILE_BCAST] ); + parsec_dtd_data_collection_fini( (parsec_data_collection_t *)&dcA ); + //parsec_dtd_data_collection_fini( (parsec_data_collection_t *)&dcB ); + + parsec_data_free(dcA.mat); dcA.mat = NULL; + //parsec_data_free(dcB.mat); dcB.mat = NULL; + parsec_tiled_matrix_dc_destroy( (parsec_tiled_matrix_dc_t*)&dcA); + //parsec_tiled_matrix_dc_destroy( (parsec_tiled_matrix_dc_t*)&dcB); + + cleanup_parsec(parsec, iparam); + return ret; +} diff --git a/tests/ptg_bcast.jdf b/tests/ptg_bcast.jdf new file mode 100644 index 000000000..c6003ff43 --- /dev/null +++ b/tests/ptg_bcast.jdf @@ -0,0 +1,168 @@ +extern "C" %{ +/** + * PTG_BCAST BENCHMARK + **/ + +#include +#include +#include +#include +#include "parsec/data_dist/matrix/two_dim_rectangle_cyclic.h" + +#include "ptg_bcast.h" +static parsec_ptg_bcast_taskpool_t* tp; +static int verbose = 0; + +%} + +descA [type = "two_dim_block_cyclic_t*"] +descB [type = "two_dim_block_cyclic_t*" aligned=descA] + + + + +WRITER(i) + + i = 0..0 + +: descA(0, 0) + +RW A <- descA(i, 0) + -> A READERS(0..descA->super.mt-1) +BODY + if(verbose) + printf("Executed WRITE TASK ON ROOT\n"); +END + + +READERS(i) +i = 0 .. descA->super.mt-1 + +: descA(i, 0) + +READ A <- A WRITER(0) +RW B <- descB(i, 0) + -> descB(i, 0) +BODY + if(verbose) + printf("Executed READ OF A and UPDATE of B\n"); +END + +extern "C" %{ +parsec_taskpool_t* +parsec_ptg_bcast_New(parsec_tiled_matrix_dc_t *dcA, parsec_tiled_matrix_dc_t *dcB) +{ + parsec_taskpool_t* ptg_bcast_taskpool; + parsec_ptg_bcast_taskpool_t* taskpool = NULL; + + taskpool = parsec_ptg_bcast_new(dcA, dcB); + ptg_bcast_taskpool = (parsec_taskpool_t*)taskpool; + + parsec_matrix_add2arena( &taskpool->arenas_datatypes[PARSEC_ptg_bcast_DEFAULT_ARENA], + parsec_datatype_double_t, matrix_UpperLower, + 1, dcA->mb, dcA->nb, dcA->mb, + PARSEC_ARENA_ALIGNMENT_SSE, -1 ); + + return ptg_bcast_taskpool; +} + +void parsec_ptg_bcast_Destruct(parsec_taskpool_t *taskpool) +{ + parsec_ptg_bcast_taskpool_t *ptg_bcast_taskpool = (parsec_ptg_bcast_taskpool_t *)taskpool; + parsec_matrix_del2arena(&ptg_bcast_taskpool->arenas_datatypes[PARSEC_ptg_bcast_DEFAULT_ARENA]); + parsec_taskpool_free(taskpool); +} + +int parsec_ptg_bcast(parsec_context_t *parsec, + parsec_tiled_matrix_dc_t *A, + parsec_tiled_matrix_dc_t *B) +{ + parsec_taskpool_t *parsec_ptg_bcast = NULL; + + parsec_ptg_bcast = parsec_ptg_bcast_New(A, B); + + double starttime, endtime; + +MPI_Barrier (MPI_COMM_WORLD); + starttime = MPI_Wtime(); + if( parsec_ptg_bcast != NULL ){ + parsec_enqueue(parsec, parsec_ptg_bcast); + parsec_context_start(parsec); + parsec_context_wait(parsec); +MPI_Barrier(MPI_COMM_WORLD); +endtime = MPI_Wtime (); + if(parsec->my_rank==0)printf("That took %f seconds\n",endtime-starttime); + parsec_ptg_bcast_Destruct(parsec_ptg_bcast); + } + + return 0; +} + +int main(int argc, char* argv[]) +{ + two_dim_block_cyclic_t descA, descB; + parsec_arena_datatype_t adt; + parsec_context_t *parsec; + int rank = 0, size = 1, mat_size; + long time_elapsed; + int nt; + nt = strtol(argv[1], NULL, 10); +#ifdef DISTRIBUTED + { + int provided; + MPI_Init_thread(NULL, NULL, MPI_THREAD_SERIALIZED, &provided); + } + int world; + MPI_Comm_size(MPI_COMM_WORLD, &world); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); +#endif /* DISTRIBUTED */ + parsec = parsec_init(2, &argc, &argv); + assert( NULL != parsec ); + + //MPI_Barrier(MPI_COMM_WORLD); + two_dim_block_cyclic_init( &descA, matrix_RealDouble, matrix_Tile, + rank /*rank*/, + nt*nt, 1, + world*nt*nt, 1, + 0, 0, + world*nt*nt, 1, + world, 1, + 1, 1, + 0, 0); + descA.mat = parsec_data_allocate( descA.super.nb_local_tiles * + descA.super.bsiz * + parsec_datadist_getsizeoftype(descA.super.mtype) ); + two_dim_block_cyclic_init( &descB, matrix_RealDouble, matrix_Tile, + rank /*rank*/, + nt*nt, 1, + world*nt*nt, 1, + 0, 0, + world*nt*nt, 1, + world, 1, + 1, 1, + 0, 0); + descB.mat = parsec_data_allocate( descB.super.nb_local_tiles * + descB.super.bsiz * + parsec_datadist_getsizeoftype(descA.super.mtype) ); + + //SYNC_TIME_START(); + double starttime, endtime; + MPI_Barrier(MPI_COMM_WORLD); + //starttime = MPI_Wtime(); + parsec_ptg_bcast(parsec, (parsec_tiled_matrix_dc_t *)&descA, (parsec_tiled_matrix_dc_t *)&descB); + //MPI_Barrier(MPI_COMM_WORLD); + //endtime = MPI_Wtime(); + //if(rank==0)printf("That took %f seconds\n",endtime-starttime); + parsec_data_free(descA.mat); + parsec_data_free(descB.mat); + parsec_tiled_matrix_dc_destroy((parsec_tiled_matrix_dc_t*)&descA); + parsec_tiled_matrix_dc_destroy((parsec_tiled_matrix_dc_t*)&descB); + + parsec_fini(&parsec); +#ifdef DISTRIBUTED + MPI_Finalize(); +#endif /* DISTRIBUTED */ + return 0; +} + +%}