From 65f97cd38b6af838583bb4bb19dbdbf716b4088b Mon Sep 17 00:00:00 2001 From: ralomairy Date: Sun, 10 May 2026 03:24:09 +0000 Subject: [PATCH 1/2] Add installation instructions --- INSTALL_STEPS.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 INSTALL_STEPS.md diff --git a/INSTALL_STEPS.md b/INSTALL_STEPS.md new file mode 100644 index 000000000..331068593 --- /dev/null +++ b/INSTALL_STEPS.md @@ -0,0 +1,26 @@ +# PaRSEC Installation Instructions + +## Prerequisites +Ensure the following dependencies are installed: +```bash +sudo apt-get update +sudo apt-get install -y libopenmpi-dev openmpi-bin cmake bison libhwloc-dev +``` + +## Building PaRSEC +1. Create a build directory and enter it: + ```bash + mkdir builddir && cd builddir + ``` + +2. Run the configure script. Note that we disable the MPI+HWLOC compatibility check to avoid potential conflicts with system-installed libraries: + ```bash + ../configure --with-mpi --without-hwloc --disable-debug --prefix=$PWD/install -DMPI_HWLOC_COMPAT_CHECK=OFF + ``` + +3. Build and install: + ```bash + make install + ``` + +The binaries and libraries will be available in `builddir/install`. From a94c339c1aa13cb61e384633c10b6a8302561bf8 Mon Sep 17 00:00:00 2001 From: ralomairy Date: Sun, 10 May 2026 06:55:35 +0000 Subject: [PATCH 2/2] Add PaRSEC-Agent research framework steps 1-7 --- parsec-agent-project/README.md | 34 +++++ parsec-agent-project/step1/CMakeLists.txt | 13 ++ parsec-agent-project/step1/README.md | 15 ++ parsec-agent-project/step1/main.c | 25 ++++ parsec-agent-project/step2/CMakeLists.txt | 12 ++ parsec-agent-project/step2/README.md | 17 +++ parsec-agent-project/step2/main.c | 59 ++++++++ parsec-agent-project/step3/CMakeLists.txt | 12 ++ parsec-agent-project/step3/README.md | 15 ++ parsec-agent-project/step3/main.c | 71 ++++++++++ parsec-agent-project/step4/CMakeLists.txt | 12 ++ parsec-agent-project/step4/README.md | 16 +++ parsec-agent-project/step4/main.c | 89 ++++++++++++ parsec-agent-project/step5/CMakeLists.txt | 13 ++ parsec-agent-project/step5/README.md | 16 +++ parsec-agent-project/step5/main.c | 150 ++++++++++++++++++++ parsec-agent-project/step6/CMakeLists.txt | 14 ++ parsec-agent-project/step6/README.md | 15 ++ parsec-agent-project/step6/main.c | 149 ++++++++++++++++++++ parsec-agent-project/step7/CMakeLists.txt | 14 ++ parsec-agent-project/step7/README.md | 15 ++ parsec-agent-project/step7/main.c | 161 ++++++++++++++++++++++ 22 files changed, 937 insertions(+) create mode 100644 parsec-agent-project/README.md create mode 100644 parsec-agent-project/step1/CMakeLists.txt create mode 100644 parsec-agent-project/step1/README.md create mode 100644 parsec-agent-project/step1/main.c create mode 100644 parsec-agent-project/step2/CMakeLists.txt create mode 100644 parsec-agent-project/step2/README.md create mode 100644 parsec-agent-project/step2/main.c create mode 100644 parsec-agent-project/step3/CMakeLists.txt create mode 100644 parsec-agent-project/step3/README.md create mode 100644 parsec-agent-project/step3/main.c create mode 100644 parsec-agent-project/step4/CMakeLists.txt create mode 100644 parsec-agent-project/step4/README.md create mode 100644 parsec-agent-project/step4/main.c create mode 100644 parsec-agent-project/step5/CMakeLists.txt create mode 100644 parsec-agent-project/step5/README.md create mode 100644 parsec-agent-project/step5/main.c create mode 100644 parsec-agent-project/step6/CMakeLists.txt create mode 100644 parsec-agent-project/step6/README.md create mode 100644 parsec-agent-project/step6/main.c create mode 100644 parsec-agent-project/step7/CMakeLists.txt create mode 100644 parsec-agent-project/step7/README.md create mode 100644 parsec-agent-project/step7/main.c diff --git a/parsec-agent-project/README.md b/parsec-agent-project/README.md new file mode 100644 index 000000000..006db4583 --- /dev/null +++ b/parsec-agent-project/README.md @@ -0,0 +1,34 @@ +# PaRSEC-Agent Research Framework + +This project implements a research framework for mapping AI agents onto the PaRSEC (Parallel Robust Scalable Applications) runtime. It follows a 7-step implementation journey to build a distributed, asynchronous, and parallel agentic system. + +## Project Structure +- **step1/**: Build verification and PaRSEC initialization. +- **step2/**: Introduction to Dynamic Task Discovery (DTD). +- **step3/**: Implementation of recursive task insertion for agentic loops. +- **step4/**: Design of the ReAct state machine using tasks. +- **step5/**: Integration with Ollama for real LLM reasoning. +- **step6/**: Implementation of the Asynchronous I/O bridge to prevent core stalling. +- **step7/**: Execution of N parallel agents across multiple CPU cores. + +## Prerequisites +- PaRSEC (installed at `$HOME/parsec/builddir/install`) +- OpenMPI +- libcurl +- Ollama (running with the `tinyllama` model) + +## Global Build Instructions +Each step is a standalone CMake project. To build any step: +```bash +cd stepX +mkdir build && cd build +cmake .. +make +``` + +## Running +Ensure your `LD_LIBRARY_PATH` includes the PaRSEC library path: +```bash +export LD_LIBRARY_PATH=$HOME/parsec/builddir/install/lib:$LD_LIBRARY_PATH +``` +Then run the specific executable for each step using `mpirun`. diff --git a/parsec-agent-project/step1/CMakeLists.txt b/parsec-agent-project/step1/CMakeLists.txt new file mode 100644 index 000000000..60169d48a --- /dev/null +++ b/parsec-agent-project/step1/CMakeLists.txt @@ -0,0 +1,13 @@ +cmake_minimum_required(VERSION 3.21) +project(parsec-agent-step1 C) + +# Set the path to your PaRSEC installation +set(PaRSEC_ROOT "$ENV{HOME}/parsec/builddir/install") +list(APPEND CMAKE_PREFIX_PATH "${PaRSEC_ROOT}/share/cmake/parsec") + +find_package(PaRSEC REQUIRED) +find_package(MPI REQUIRED) + +add_executable(parsec_ok main.c) +target_include_directories(parsec_ok PRIVATE "${PaRSEC_ROOT}/include" ${MPI_C_INCLUDE_DIRS}) +target_link_libraries(parsec_ok "${PaRSEC_ROOT}/lib/libparsec.so" ${MPI_C_LIBRARIES}) diff --git a/parsec-agent-project/step1/README.md b/parsec-agent-project/step1/README.md new file mode 100644 index 000000000..1f24e87f2 --- /dev/null +++ b/parsec-agent-project/step1/README.md @@ -0,0 +1,15 @@ +# PaRSEC-Agent: Step 1 - Build Verification + +## Purpose +The goal of this step is to verify that the PaRSEC installation is correct and that the build system (CMake) can correctly link against PaRSEC and MPI. + +## Key Concepts +- **parsec_init**: Initializes the PaRSEC runtime environment. +- **parsec_fini**: Safely shuts down the PaRSEC context. +- **MPI_Init_thread**: Required because PaRSEC is a multi-threaded runtime and needs MPI to support the `MPI_THREAD_MULTIPLE` level. + +## How to Build and Run +1. Enter the build directory: `mkdir build && cd build` +2. Configure: `cmake ..` +3. Build: `make` +4. Run: `LD_LIBRARY_PATH=$HOME/parsec/builddir/install/lib:$LD_LIBRARY_PATH mpirun -n 1 ./parsec_ok` diff --git a/parsec-agent-project/step1/main.c b/parsec-agent-project/step1/main.c new file mode 100644 index 000000000..d6d3a096b --- /dev/null +++ b/parsec-agent-project/step1/main.c @@ -0,0 +1,25 @@ +#include +#include +#include + +int main(int argc, char **argv) { + parsec_context_t* parsec_context; + int parsec_argc = argc; + char** parsec_argv = argv; + int provided; + + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + + parsec_context = parsec_init(-1, &parsec_argc, &parsec_argv); + if(NULL == parsec_context) { + fprintf(stderr, "parsec_init failed\n"); + MPI_Finalize(); + return -1; + } + + printf("PaRSEC OK\n"); + + parsec_fini(&parsec_context); + MPI_Finalize(); + return 0; +} diff --git a/parsec-agent-project/step2/CMakeLists.txt b/parsec-agent-project/step2/CMakeLists.txt new file mode 100644 index 000000000..3572fcd95 --- /dev/null +++ b/parsec-agent-project/step2/CMakeLists.txt @@ -0,0 +1,12 @@ +cmake_minimum_required(VERSION 3.21) +project(parsec-agent-step2 C) + +set(PaRSEC_ROOT "$ENV{HOME}/parsec/builddir/install") +list(APPEND CMAKE_PREFIX_PATH "${PaRSEC_ROOT}/share/cmake/parsec") + +find_package(PaRSEC REQUIRED) +find_package(MPI REQUIRED) + +add_executable(dtd_hello main.c) +target_include_directories(dtd_hello PRIVATE "${PaRSEC_ROOT}/include" ${MPI_C_INCLUDE_DIRS}) +target_link_libraries(dtd_hello "${PaRSEC_ROOT}/lib/libparsec.so" ${MPI_C_LIBRARIES}) diff --git a/parsec-agent-project/step2/README.md b/parsec-agent-project/step2/README.md new file mode 100644 index 000000000..a5f3a6ba6 --- /dev/null +++ b/parsec-agent-project/step2/README.md @@ -0,0 +1,17 @@ +# PaRSEC-Agent: Step 2 - DTD Basics + +## Purpose +This step introduces Dynamic Task Discovery (DTD) in PaRSEC. It demonstrates how to define a task class and insert independent tasks into a taskpool. + +## Key Concepts +- **parsec_dtd_taskpool_new**: Creates a new taskpool dedicated to DTD. +- **parsec_dtd_create_task_class**: Defines the signature of a task (number of arguments and their types). +- **PARSEC_VALUE**: Indicates that an argument should be passed by value (copied into the task). +- **parsec_dtd_unpack_args**: Extracts the arguments from the task structure inside the task body. +- **PARSEC_DTD_ARG_END**: Sentinel value used to terminate variadic argument lists. + +## How to Build and Run +1. Enter the build directory: `mkdir build && cd build` +2. Configure: `cmake ..` +3. Build: `make` +4. Run: `LD_LIBRARY_PATH=$HOME/parsec/builddir/install/lib:$LD_LIBRARY_PATH mpirun -n 1 ./dtd_hello` diff --git a/parsec-agent-project/step2/main.c b/parsec-agent-project/step2/main.c new file mode 100644 index 000000000..197d6623a --- /dev/null +++ b/parsec-agent-project/step2/main.c @@ -0,0 +1,59 @@ +#include +#include +#include +#include +#include + +static int dtd_hello_body(parsec_execution_stream_t *es, parsec_task_t *task) +{ + int index; + int value; + + parsec_dtd_unpack_args(task, &index, &value); + + printf("Task index %d received value %d\n", index, value); + + return PARSEC_HOOK_RETURN_DONE; +} + +int main(int argc, char **argv) +{ + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + + parsec_context_t *parsec_context = parsec_init(-1, NULL, NULL); + if(NULL == parsec_context) { + MPI_Finalize(); + return -1; + } + + parsec_taskpool_t *tp = parsec_dtd_taskpool_new(); + + parsec_dtd_create_task_class(tp, "hello_task", + sizeof(int), PARSEC_VALUE, + sizeof(int), PARSEC_VALUE, + PARSEC_DTD_ARG_END, + dtd_hello_body, + 0); + + parsec_context_add_taskpool(parsec_context, tp); + parsec_context_start(parsec_context); + + for(int i = 0; i < 4; i++) { + int val = i * 10; + // DTD insert task takes function pointer, priority, and device_type + parsec_dtd_insert_task(tp, dtd_hello_body, 0, + PARSEC_DEV_CPU, + "hello_task", + sizeof(int), &i, PARSEC_VALUE, + sizeof(int), &val, PARSEC_VALUE, + PARSEC_DTD_ARG_END); + } + + parsec_taskpool_wait(tp); + parsec_taskpool_free(tp); + + parsec_fini(&parsec_context); + MPI_Finalize(); + return 0; +} diff --git a/parsec-agent-project/step3/CMakeLists.txt b/parsec-agent-project/step3/CMakeLists.txt new file mode 100644 index 000000000..a60586d4d --- /dev/null +++ b/parsec-agent-project/step3/CMakeLists.txt @@ -0,0 +1,12 @@ +cmake_minimum_required(VERSION 3.21) +project(parsec-agent-step3 C) + +set(PaRSEC_ROOT "$ENV{HOME}/parsec/builddir/install") +list(APPEND CMAKE_PREFIX_PATH "${PaRSEC_ROOT}/share/cmake/parsec") + +find_package(PaRSEC REQUIRED) +find_package(MPI REQUIRED) + +add_executable(dtd_loop main.c) +target_include_directories(dtd_loop PRIVATE "${PaRSEC_ROOT}/include" ${MPI_C_INCLUDE_DIRS}) +target_link_libraries(dtd_loop "${PaRSEC_ROOT}/lib/libparsec.so" ${MPI_C_LIBRARIES}) diff --git a/parsec-agent-project/step3/README.md b/parsec-agent-project/step3/README.md new file mode 100644 index 000000000..8e704a361 --- /dev/null +++ b/parsec-agent-project/step3/README.md @@ -0,0 +1,15 @@ +# PaRSEC-Agent: Step 3 - Recursive Task Insertion + +## Purpose +This step demonstrates the core primitive for an agentic loop: the ability for a task to insert its successor while it is executing. + +## Key Concepts +- **parsec_dtd_get_taskpool**: Used inside a task body to retrieve the handle to the taskpool so new tasks can be inserted. +- **Recursive insertion**: Task N calls `parsec_dtd_insert_task` for Task N+1 before completing. +- **Thread Safety**: PaRSEC ensures that inserting tasks from within other tasks is thread-safe across all worker cores. + +## How to Build and Run +1. Enter the build directory: `mkdir build && cd build` +2. Configure: `cmake ..` +3. Build: `make` +4. Run: `LD_LIBRARY_PATH=$HOME/parsec/builddir/install/lib:$LD_LIBRARY_PATH mpirun -n 1 ./dtd_loop` diff --git a/parsec-agent-project/step3/main.c b/parsec-agent-project/step3/main.c new file mode 100644 index 000000000..70db4e7f2 --- /dev/null +++ b/parsec-agent-project/step3/main.c @@ -0,0 +1,71 @@ +#include +#include +#include +#include +#include + +#define MAX_TASKS 5 + +// Forward declaration of the task body +static int dtd_loop_body(parsec_execution_stream_t *es, parsec_task_t *task); + +int main(int argc, char **argv) +{ + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + + parsec_context_t *parsec_context = parsec_init(-1, NULL, NULL); + if(NULL == parsec_context) { + MPI_Finalize(); + return -1; + } + + parsec_taskpool_t *tp = parsec_dtd_taskpool_new(); + + // Define task class + parsec_dtd_create_task_class(tp, "loop_task", + sizeof(int), PARSEC_VALUE, + PARSEC_DTD_ARG_END, + dtd_loop_body, + 0); + + parsec_context_add_taskpool(parsec_context, tp); + parsec_context_start(parsec_context); + + // Start the loop with task 0 + int first_task = 0; + parsec_dtd_insert_task(tp, dtd_loop_body, 0, + PARSEC_DEV_CPU, + "loop_task", + sizeof(int), &first_task, PARSEC_VALUE, + PARSEC_DTD_ARG_END); + + parsec_taskpool_wait(tp); + parsec_taskpool_free(tp); + + parsec_fini(&parsec_context); + MPI_Finalize(); + return 0; +} + +static int dtd_loop_body(parsec_execution_stream_t *es, parsec_task_t *task) +{ + int current_idx; + parsec_dtd_unpack_args(task, ¤t_idx); + + printf("Task N=%d executing\n", current_idx); + + if (current_idx < MAX_TASKS - 1) { + int next_idx = current_idx + 1; + printf("Task N=%d inserting Task N=%d\n", current_idx, next_idx); + + // Use parsec_dtd_get_taskpool(task) to get the taskpool + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), dtd_loop_body, 0, + PARSEC_DEV_CPU, + "loop_task", + sizeof(int), &next_idx, PARSEC_VALUE, + PARSEC_DTD_ARG_END); + } + + return PARSEC_HOOK_RETURN_DONE; +} diff --git a/parsec-agent-project/step4/CMakeLists.txt b/parsec-agent-project/step4/CMakeLists.txt new file mode 100644 index 000000000..2615bfe3f --- /dev/null +++ b/parsec-agent-project/step4/CMakeLists.txt @@ -0,0 +1,12 @@ +cmake_minimum_required(VERSION 3.21) +project(parsec-agent-step4 C) + +set(PaRSEC_ROOT "$ENV{HOME}/parsec/builddir/install") +list(APPEND CMAKE_PREFIX_PATH "${PaRSEC_ROOT}/share/cmake/parsec") + +find_package(PaRSEC REQUIRED) +find_package(MPI REQUIRED) + +add_executable(dtd_agent main.c) +target_include_directories(dtd_agent PRIVATE "${PaRSEC_ROOT}/include" ${MPI_C_INCLUDE_DIRS}) +target_link_libraries(dtd_agent "${PaRSEC_ROOT}/lib/libparsec.so" ${MPI_C_LIBRARIES}) diff --git a/parsec-agent-project/step4/README.md b/parsec-agent-project/step4/README.md new file mode 100644 index 000000000..b7f17d815 --- /dev/null +++ b/parsec-agent-project/step4/README.md @@ -0,0 +1,16 @@ +# PaRSEC-Agent: Step 4 - Single Synchronous Agent Loop + +## Purpose +This step implements the ReAct (Reason + Act) loop logic. It uses a state object (`agent_state_t`) that is passed between tasks to maintain progress. + +## Key Concepts +- **agent_state_t**: A heap-allocated structure containing the agent's memory (thoughts, actions, and step count). +- **think_task**: Analyzes current state and decides on a tool or a final answer. +- **tool_task**: Executes a tool (stubbed) and returns control to the thinking phase. +- **finish_task**: Finalizes the agent run and frees resources. + +## How to Build and Run +1. Enter the build directory: `mkdir build && cd build` +2. Configure: `cmake ..` +3. Build: `make` +4. Run: `LD_LIBRARY_PATH=$HOME/parsec/builddir/install/lib:$LD_LIBRARY_PATH mpirun -n 1 ./dtd_agent` diff --git a/parsec-agent-project/step4/main.c b/parsec-agent-project/step4/main.c new file mode 100644 index 000000000..5c6ff79a7 --- /dev/null +++ b/parsec-agent-project/step4/main.c @@ -0,0 +1,89 @@ +#include +#include +#include +#include +#include +#include +#include + +typedef struct { + char thought[256]; + char action[64]; + char action_input[64]; + int step_count; + int finished; +} agent_state_t; + +// Stub LLM function +void stub_llm(agent_state_t* state) { + if (state->step_count < 2) { + strcpy(state->thought, "I need to check the temperature."); + strcpy(state->action, "TemperatureTool"); + strcpy(state->action_input, "London"); + } else { + strcpy(state->thought, "The temperature is 20C."); + strcpy(state->action, "Final Answer"); + strcpy(state->action_input, "20C"); + state->finished = 1; + } +} + +// Forward declarations +static int think_task_body(parsec_execution_stream_t *es, parsec_task_t *task); +static int tool_task_body(parsec_execution_stream_t *es, parsec_task_t *task); +static int finish_task_body(parsec_execution_stream_t *es, parsec_task_t *task); + +static int think_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + + printf("[Think] Step %d\n", state->step_count); + stub_llm(state); + + if (strcmp(state->action, "Final Answer") == 0) { + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), finish_task_body, 0, PARSEC_DEV_CPU, "finish_task", sizeof(agent_state_t*), &state, PARSEC_VALUE, PARSEC_DTD_ARG_END); + } else { + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), tool_task_body, 0, PARSEC_DEV_CPU, "tool_task", sizeof(agent_state_t*), &state, PARSEC_VALUE, PARSEC_DTD_ARG_END); + } + return PARSEC_HOOK_RETURN_DONE; +} + +static int tool_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + printf("[Tool] Action: %s, Input: %s\n", state->action, state->action_input); + state->step_count++; + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), think_task_body, 0, PARSEC_DEV_CPU, "think_task", sizeof(agent_state_t*), &state, PARSEC_VALUE, PARSEC_DTD_ARG_END); + return PARSEC_HOOK_RETURN_DONE; +} + +static int finish_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + printf("[Finish] Final Answer: %s (Steps: %d)\n", state->action_input, state->step_count); + free(state); + return PARSEC_HOOK_RETURN_DONE; +} + +int main(int argc, char **argv) { + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + parsec_context_t *ctx = parsec_init(-1, NULL, NULL); + parsec_taskpool_t *tp = parsec_dtd_taskpool_new(); + + parsec_dtd_create_task_class(tp, "think_task", sizeof(agent_state_t*), PARSEC_VALUE, PARSEC_DTD_ARG_END, think_task_body, 0); + parsec_dtd_create_task_class(tp, "tool_task", sizeof(agent_state_t*), PARSEC_VALUE, PARSEC_DTD_ARG_END, tool_task_body, 0); + parsec_dtd_create_task_class(tp, "finish_task", sizeof(agent_state_t*), PARSEC_VALUE, PARSEC_DTD_ARG_END, finish_task_body, 0); + + parsec_context_add_taskpool(ctx, tp); + parsec_context_start(ctx); + + agent_state_t* initial_state = calloc(1, sizeof(agent_state_t)); + parsec_dtd_insert_task(tp, think_task_body, 0, PARSEC_DEV_CPU, "think_task", sizeof(agent_state_t*), &initial_state, PARSEC_VALUE, PARSEC_DTD_ARG_END); + + parsec_taskpool_wait(tp); + parsec_taskpool_free(tp); + parsec_fini(&ctx); + MPI_Finalize(); + return 0; +} diff --git a/parsec-agent-project/step5/CMakeLists.txt b/parsec-agent-project/step5/CMakeLists.txt new file mode 100644 index 000000000..e38b7ebd8 --- /dev/null +++ b/parsec-agent-project/step5/CMakeLists.txt @@ -0,0 +1,13 @@ +cmake_minimum_required(VERSION 3.21) +project(parsec-agent-step5 C) + +set(PaRSEC_ROOT "$ENV{HOME}/parsec/builddir/install") +list(APPEND CMAKE_PREFIX_PATH "${PaRSEC_ROOT}/share/cmake/parsec") + +find_package(PaRSEC REQUIRED) +find_package(MPI REQUIRED) +find_package(CURL REQUIRED) + +add_executable(dtd_agent_ollama main.c) +target_include_directories(dtd_agent_ollama PRIVATE "${PaRSEC_ROOT}/include" ${MPI_C_INCLUDE_DIRS} ${CURL_INCLUDE_DIRS}) +target_link_libraries(dtd_agent_ollama "${PaRSEC_ROOT}/lib/libparsec.so" ${MPI_C_LIBRARIES} ${CURL_LIBRARIES}) diff --git a/parsec-agent-project/step5/README.md b/parsec-agent-project/step5/README.md new file mode 100644 index 000000000..be42721e6 --- /dev/null +++ b/parsec-agent-project/step5/README.md @@ -0,0 +1,16 @@ +# PaRSEC-Agent: Step 5 - Ollama Integration (Synchronous) + +## Purpose +This step replaces the stubbed LLM logic with real calls to a local Ollama server using libcurl. + +## Key Concepts +- **libcurl**: Used to perform HTTP POST requests to `http://127.0.0.1:11434/api/generate`. +- **JSON Parsing**: Demonstrates how to parse LLM responses manually using standard C functions (`strstr`). +- **Synchronous Blocking**: In this step, the compute core is blocked while waiting for the LLM response, which is inefficient for HPC. + +## How to Build and Run +1. Ensure Ollama is running: `ollama serve` and `ollama pull tinyllama`. +2. Enter the build directory: `mkdir build && cd build` +3. Configure: `cmake ..` +4. Build: `make` +5. Run: `LD_LIBRARY_PATH=$HOME/parsec/builddir/install/lib:$LD_LIBRARY_PATH mpirun -n 1 ./dtd_agent_ollama` diff --git a/parsec-agent-project/step5/main.c b/parsec-agent-project/step5/main.c new file mode 100644 index 000000000..f941220f2 --- /dev/null +++ b/parsec-agent-project/step5/main.c @@ -0,0 +1,150 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAX_STEPS 5 + +typedef struct { + char thought[256]; + char action[64]; + char action_input[64]; + int step_count; + int finished; +} agent_state_t; + +size_t write_callback(char *ptr, size_t size, size_t nmemb, void *userdata) { + size_t realsize = size * nmemb; + char *buffer = (char *)userdata; + strncat(buffer, ptr, 4095 - strlen(buffer)); + return realsize; +} + +void call_ollama(agent_state_t* state) { + if (state->step_count >= MAX_STEPS) { + printf("Safety limit reached!\n"); + state->finished = 1; + strcpy(state->action, "Final Answer"); + strcpy(state->action_input, "Limit reached"); + return; + } + + CURL *curl = curl_easy_init(); + if (!curl) return; + + char response_buffer[4096] = {0}; + char prompt[512]; + snprintf(prompt, sizeof(prompt), + "Answer with either 'Action:ToolName' or 'Final Answer:Value'. Current thought: %s.", + state->thought); + + char post_data[2048]; + snprintf(post_data, sizeof(post_data), + "{\"model\":\"tinyllama\",\"prompt\":\"%s\",\"stream\":false}", prompt); + + curl_easy_setopt(curl, CURLOPT_URL, "http://127.0.0.1:11434/api/generate"); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, post_data); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_callback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, response_buffer); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 300L); // Increase timeout to 5 minutes + + CURLcode res = curl_easy_perform(curl); + if (res != CURLE_OK) { + printf("Curl failed: %s. (Did 5 minute timeout expire?)\n", curl_easy_strerror(res)); + strcpy(state->action, "Final Answer"); + state->finished = 1; + } else { + printf("DEBUG: Raw Response received (length: %zu)\n", strlen(response_buffer)); + // Simple manual parsing for the 'response' field + char *ptr = strstr(response_buffer, "\"response\":\""); + if (ptr) { + ptr += 12; // Skip "response":" + char *end = strchr(ptr, '\"'); + if (end) { + int len = (int)(end - ptr); + if (len > 255) len = 255; + strncpy(state->thought, ptr, len); + state->thought[len] = '\0'; + printf("LLM said: %s\n", state->thought); + } + } + + // Very basic parsing for ReAct actions + if (strstr(state->thought, "Final Answer")) { + strcpy(state->action, "Final Answer"); + strcpy(state->action_input, "Success"); + state->finished = 1; + } else { + // Default to next step + strcpy(state->action, "Tool"); + strcpy(state->action_input, "Input"); + } + } + curl_easy_cleanup(curl); +} + +// Forward declarations +static int think_task_body(parsec_execution_stream_t *es, parsec_task_t *task); +static int tool_task_body(parsec_execution_stream_t *es, parsec_task_t *task); +static int finish_task_body(parsec_execution_stream_t *es, parsec_task_t *task); + +static int think_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + + printf("[Think] Step %d\n", state->step_count); + call_ollama(state); + + if (state->finished) { + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), finish_task_body, 0, PARSEC_DEV_CPU, "finish_task", sizeof(agent_state_t*), &state, PARSEC_VALUE, PARSEC_DTD_ARG_END); + } else { + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), tool_task_body, 0, PARSEC_DEV_CPU, "tool_task", sizeof(agent_state_t*), &state, PARSEC_VALUE, PARSEC_DTD_ARG_END); + } + return PARSEC_HOOK_RETURN_DONE; +} + +static int tool_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + printf("[Tool] Action: %s, Input: %s\n", state->action, state->action_input); + state->step_count++; + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), think_task_body, 0, PARSEC_DEV_CPU, "think_task", sizeof(agent_state_t*), &state, PARSEC_VALUE, PARSEC_DTD_ARG_END); + return PARSEC_HOOK_RETURN_DONE; +} + +static int finish_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + printf("[Finish] Final Answer: %s\n", state->action_input); + free(state); + return PARSEC_HOOK_RETURN_DONE; +} + +int main(int argc, char **argv) { + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + curl_global_init(CURL_GLOBAL_ALL); + parsec_context_t *ctx = parsec_init(-1, NULL, NULL); + parsec_taskpool_t *tp = parsec_dtd_taskpool_new(); + + parsec_dtd_create_task_class(tp, "think_task", sizeof(agent_state_t*), PARSEC_VALUE, PARSEC_DTD_ARG_END, think_task_body, 0); + parsec_dtd_create_task_class(tp, "tool_task", sizeof(agent_state_t*), PARSEC_VALUE, PARSEC_DTD_ARG_END, tool_task_body, 0); + parsec_dtd_create_task_class(tp, "finish_task", sizeof(agent_state_t*), PARSEC_VALUE, PARSEC_DTD_ARG_END, finish_task_body, 0); + + parsec_context_add_taskpool(ctx, tp); + parsec_context_start(ctx); + + agent_state_t* initial_state = calloc(1, sizeof(agent_state_t)); + parsec_dtd_insert_task(tp, think_task_body, 0, PARSEC_DEV_CPU, "think_task", sizeof(agent_state_t*), &initial_state, PARSEC_VALUE, PARSEC_DTD_ARG_END); + + parsec_taskpool_wait(tp); + parsec_taskpool_free(tp); + parsec_fini(&ctx); + curl_global_cleanup(); + MPI_Finalize(); + return 0; +} diff --git a/parsec-agent-project/step6/CMakeLists.txt b/parsec-agent-project/step6/CMakeLists.txt new file mode 100644 index 000000000..50cd98ece --- /dev/null +++ b/parsec-agent-project/step6/CMakeLists.txt @@ -0,0 +1,14 @@ +cmake_minimum_required(VERSION 3.21) +project(parsec-agent-step6 C) + +set(PaRSEC_ROOT "$ENV{HOME}/parsec/builddir/install") +list(APPEND CMAKE_PREFIX_PATH "${PaRSEC_ROOT}/share/cmake/parsec") + +find_package(PaRSEC REQUIRED) +find_package(MPI REQUIRED) +find_package(CURL REQUIRED) +find_package(Threads REQUIRED) + +add_executable(dtd_agent_async main.c) +target_include_directories(dtd_agent_async PRIVATE "${PaRSEC_ROOT}/include" ${MPI_C_INCLUDE_DIRS} ${CURL_INCLUDE_DIRS}) +target_link_libraries(dtd_agent_async "${PaRSEC_ROOT}/lib/libparsec.so" ${MPI_C_LIBRARIES} ${CURL_LIBRARIES} Threads::Threads) diff --git a/parsec-agent-project/step6/README.md b/parsec-agent-project/step6/README.md new file mode 100644 index 000000000..a63b4b34b --- /dev/null +++ b/parsec-agent-project/step6/README.md @@ -0,0 +1,15 @@ +# PaRSEC-Agent: Step 6 - Asynchronous I/O Bridge + +## Purpose +This step solves the "blocking I/O problem" by moving the LLM call to a background pthread. It uses a "Wait and Poll" pattern to safely re-insert tasks into PaRSEC without crashing the runtime. + +## Key Concepts +- **Background I/O Thread**: A dedicated pthread handles all libcurl calls. +- **Polling Task**: Instead of inserting a task directly from an external thread (which can cause segfaults), we insert a "poll" task that checks a status flag in memory. +- **Core Efficiency**: PaRSEC threads are freed immediately after offloading the request, allowing them to perform other computations while the LLM generates a response. + +## How to Build and Run +1. Enter the build directory: `mkdir build && cd build` +2. Configure: `cmake ..` +3. Build: `make` +4. Run: `LD_LIBRARY_PATH=$HOME/parsec/builddir/install/lib:$LD_LIBRARY_PATH mpirun -n 1 ./dtd_agent_async` diff --git a/parsec-agent-project/step6/main.c b/parsec-agent-project/step6/main.c new file mode 100644 index 000000000..9c41b75da --- /dev/null +++ b/parsec-agent-project/step6/main.c @@ -0,0 +1,149 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAX_STEPS 2 + +typedef enum { AGENT_IDLE, AGENT_THINKING, AGENT_READY } agent_status_t; + +typedef struct { + char thought[512]; + int step_count; + int finished; + agent_status_t status; +} agent_state_t; + +// Background I/O +pthread_t io_thread; +pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; +agent_state_t *request_queue[64]; +int queue_size = 0; +int io_thread_running = 1; + +// Task Prototypes +static int think_task_body(parsec_execution_stream_t *es, parsec_task_t *task); +static int poll_task_body(parsec_execution_stream_t *es, parsec_task_t *task); +static int tool_task_body(parsec_execution_stream_t *es, parsec_task_t *task); +static int finish_task_body(parsec_execution_stream_t *es, parsec_task_t *task); + +void* io_thread_loop(void* arg) { + while(1) { + pthread_mutex_lock(&queue_mutex); + while(queue_size == 0 && io_thread_running) pthread_cond_wait(&queue_cond, &queue_mutex); + if (!io_thread_running && queue_size == 0) { pthread_mutex_unlock(&queue_mutex); break; } + agent_state_t *state = request_queue[--queue_size]; + pthread_mutex_unlock(&queue_mutex); + + // Blocking I/O + CURL *curl = curl_easy_init(); + char post_data[2048]; + snprintf(post_data, sizeof(post_data), "{\"model\":\"tinyllama\",\"prompt\":\"Done\",\"stream\":false}"); + curl_easy_setopt(curl, CURLOPT_URL, "http://127.0.0.1:11434/api/generate"); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, post_data); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 60L); + curl_easy_perform(curl); + curl_easy_cleanup(curl); + + // Signal completion + state->status = AGENT_READY; + } + return NULL; +} + +static int think_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + printf("[Think] Offloading...\n"); + + state->status = AGENT_THINKING; + pthread_mutex_lock(&queue_mutex); + request_queue[queue_size++] = state; + pthread_cond_signal(&queue_cond); + pthread_mutex_unlock(&queue_mutex); + + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), poll_task_body, 0, PARSEC_DEV_CPU, "poll_task", + sizeof(agent_state_t*), state, PARSEC_REF, PARSEC_DTD_ARG_END); + return PARSEC_HOOK_RETURN_DONE; +} + +static int poll_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + + if (state->status == AGENT_READY) { + printf("[Poll] Result Ready!\n"); + if (state->step_count >= MAX_STEPS) { + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), finish_task_body, 0, PARSEC_DEV_CPU, "finish_task", + sizeof(agent_state_t*), state, PARSEC_REF, PARSEC_DTD_ARG_END); + } else { + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), tool_task_body, 0, PARSEC_DEV_CPU, "tool_task", + sizeof(agent_state_t*), state, PARSEC_REF, PARSEC_DTD_ARG_END); + } + } else { + // Yield and poll again + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), poll_task_body, 0, PARSEC_DEV_CPU, "poll_task", + sizeof(agent_state_t*), state, PARSEC_REF, PARSEC_DTD_ARG_END); + } + return PARSEC_HOOK_RETURN_DONE; +} + +static int tool_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + printf("[Tool] Step %d\n", state->step_count++); + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), think_task_body, 0, PARSEC_DEV_CPU, "think_task", + sizeof(agent_state_t*), state, PARSEC_REF, PARSEC_DTD_ARG_END); + return PARSEC_HOOK_RETURN_DONE; +} + +static int finish_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + printf("[Finish] Done.\n"); + state->finished = 1; + return PARSEC_HOOK_RETURN_DONE; +} + +int main(int argc, char **argv) { + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + curl_global_init(CURL_GLOBAL_ALL); + pthread_create(&io_thread, NULL, io_thread_loop, NULL); + + parsec_context_t *ctx = parsec_init(-1, NULL, NULL); + parsec_taskpool_t *tp = parsec_dtd_taskpool_new(); + + parsec_dtd_create_task_class(tp, "think_task", sizeof(agent_state_t*), PARSEC_REF, PARSEC_DTD_ARG_END, think_task_body, 0); + parsec_dtd_create_task_class(tp, "poll_task", sizeof(agent_state_t*), PARSEC_REF, PARSEC_DTD_ARG_END, poll_task_body, 0); + parsec_dtd_create_task_class(tp, "tool_task", sizeof(agent_state_t*), PARSEC_REF, PARSEC_DTD_ARG_END, tool_task_body, 0); + parsec_dtd_create_task_class(tp, "finish_task", sizeof(agent_state_t*), PARSEC_REF, PARSEC_DTD_ARG_END, finish_task_body, 0); + + parsec_context_add_taskpool(ctx, tp); + parsec_context_start(ctx); + + agent_state_t* state = calloc(1, sizeof(agent_state_t)); + parsec_dtd_insert_task(tp, think_task_body, 0, PARSEC_DEV_CPU, "think_task", sizeof(agent_state_t*), state, PARSEC_REF, PARSEC_DTD_ARG_END); + + parsec_taskpool_wait(tp); + + pthread_mutex_lock(&queue_mutex); + io_thread_running = 0; + pthread_cond_signal(&queue_cond); + pthread_mutex_unlock(&queue_mutex); + pthread_join(io_thread, NULL); + + parsec_taskpool_free(tp); + parsec_fini(&ctx); + curl_global_cleanup(); + MPI_Finalize(); + free(state); + printf("Step 6 Successful.\n"); + return 0; +} diff --git a/parsec-agent-project/step7/CMakeLists.txt b/parsec-agent-project/step7/CMakeLists.txt new file mode 100644 index 000000000..491dab197 --- /dev/null +++ b/parsec-agent-project/step7/CMakeLists.txt @@ -0,0 +1,14 @@ +cmake_minimum_required(VERSION 3.21) +project(parsec-agent-step7 C) + +set(PaRSEC_ROOT "$ENV{HOME}/parsec/builddir/install") +list(APPEND CMAKE_PREFIX_PATH "${PaRSEC_ROOT}/share/cmake/parsec") + +find_package(PaRSEC REQUIRED) +find_package(MPI REQUIRED) +find_package(CURL REQUIRED) +find_package(Threads REQUIRED) + +add_executable(dtd_multi_agent main.c) +target_include_directories(dtd_multi_agent PRIVATE "${PaRSEC_ROOT}/include" ${MPI_C_INCLUDE_DIRS} ${CURL_INCLUDE_DIRS}) +target_link_libraries(dtd_multi_agent "${PaRSEC_ROOT}/lib/libparsec.so" ${MPI_C_LIBRARIES} ${CURL_LIBRARIES} Threads::Threads) diff --git a/parsec-agent-project/step7/README.md b/parsec-agent-project/step7/README.md new file mode 100644 index 000000000..b0c9fa452 --- /dev/null +++ b/parsec-agent-project/step7/README.md @@ -0,0 +1,15 @@ +# PaRSEC-Agent: Step 7 - Multi-Agent Parallelism + +## Purpose +The final step demonstrates the power of PaRSEC by running N independent AI agents in parallel across all available CPU cores. + +## Key Concepts +- **Scalability**: All agents share a single DTD taskpool and a single I/O thread. +- **Dynamic Scheduling**: PaRSEC automatically balances the agents' `think`, `poll`, and `tool` tasks across all CPU cores. +- **Core Affinity**: The logs show which core ID is executing each task, proving that the agents are running concurrently. + +## How to Build and Run +1. Enter the build directory: `mkdir build && cd build` +2. Configure: `cmake ..` +3. Build: `make` +4. Run for 4 agents: `LD_LIBRARY_PATH=$HOME/parsec/builddir/install/lib:$LD_LIBRARY_PATH mpirun -n 1 ./dtd_multi_agent 4` diff --git a/parsec-agent-project/step7/main.c b/parsec-agent-project/step7/main.c new file mode 100644 index 000000000..43470b0de --- /dev/null +++ b/parsec-agent-project/step7/main.c @@ -0,0 +1,161 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAX_STEPS 2 + +typedef enum { AGENT_IDLE, AGENT_THINKING, AGENT_READY } agent_status_t; + +typedef struct { + int id; + int step_count; + int finished; + agent_status_t status; +} agent_state_t; + +// Shared I/O structures +pthread_t io_thread; +pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; +agent_state_t *request_queue[256]; +int queue_size = 0; +int io_thread_running = 1; + +// Task Prototypes +static int think_task_body(parsec_execution_stream_t *es, parsec_task_t *task); +static int poll_task_body(parsec_execution_stream_t *es, parsec_task_t *task); +static int tool_task_body(parsec_execution_stream_t *es, parsec_task_t *task); +static int finish_task_body(parsec_execution_stream_t *es, parsec_task_t *task); + +void* io_thread_loop(void* arg) { + while(1) { + pthread_mutex_lock(&queue_mutex); + while(queue_size == 0 && io_thread_running) pthread_cond_wait(&queue_cond, &queue_mutex); + if (!io_thread_running && queue_size == 0) { pthread_mutex_unlock(&queue_mutex); break; } + agent_state_t *state = request_queue[--queue_size]; + pthread_mutex_unlock(&queue_mutex); + + // Blocking I/O (shared among all agents) + CURL *curl = curl_easy_init(); + char post_data[512]; + snprintf(post_data, sizeof(post_data), "{\"model\":\"tinyllama\",\"prompt\":\"Agent %d ready\",\"stream\":false}", state->id); + curl_easy_setopt(curl, CURLOPT_URL, "http://127.0.0.1:11434/api/generate"); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, post_data); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 60L); + curl_easy_perform(curl); + curl_easy_cleanup(curl); + + state->status = AGENT_READY; + } + return NULL; +} + +static int think_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + + // es->virtual_process->core_id gives us the core index executing this task + printf("[Core %d] Agent %d: Offloading Think Step %d\n", es->core_id, state->id, state->step_count); + + state->status = AGENT_THINKING; + pthread_mutex_lock(&queue_mutex); + request_queue[queue_size++] = state; + pthread_cond_signal(&queue_cond); + pthread_mutex_unlock(&queue_mutex); + + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), poll_task_body, 0, PARSEC_DEV_CPU, "poll_task", + sizeof(agent_state_t*), state, PARSEC_REF, PARSEC_DTD_ARG_END); + return PARSEC_HOOK_RETURN_DONE; +} + +static int poll_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + + if (state->status == AGENT_READY) { + if (state->step_count >= MAX_STEPS) { + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), finish_task_body, 0, PARSEC_DEV_CPU, "finish_task", + sizeof(agent_state_t*), state, PARSEC_REF, PARSEC_DTD_ARG_END); + } else { + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), tool_task_body, 0, PARSEC_DEV_CPU, "tool_task", + sizeof(agent_state_t*), state, PARSEC_REF, PARSEC_DTD_ARG_END); + } + } else { + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), poll_task_body, 0, PARSEC_DEV_CPU, "poll_task", + sizeof(agent_state_t*), state, PARSEC_REF, PARSEC_DTD_ARG_END); + } + return PARSEC_HOOK_RETURN_DONE; +} + +static int tool_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + printf("[Core %d] Agent %d: Executing Tool Step %d\n", es->core_id, state->id, state->step_count); + state->step_count++; + parsec_dtd_insert_task(parsec_dtd_get_taskpool(task), think_task_body, 0, PARSEC_DEV_CPU, "think_task", + sizeof(agent_state_t*), state, PARSEC_REF, PARSEC_DTD_ARG_END); + return PARSEC_HOOK_RETURN_DONE; +} + +static int finish_task_body(parsec_execution_stream_t *es, parsec_task_t *task) { + agent_state_t* state; + parsec_dtd_unpack_args(task, &state); + printf("[Core %d] Agent %d: Finished!\n", es->core_id, state->id); + state->finished = 1; + return PARSEC_HOOK_RETURN_DONE; +} + +int main(int argc, char **argv) { + int N = 1; + if (argc > 1) N = atoi(argv[1]); + + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + curl_global_init(CURL_GLOBAL_ALL); + pthread_create(&io_thread, NULL, io_thread_loop, NULL); + + parsec_context_t *ctx = parsec_init(-1, NULL, NULL); + parsec_taskpool_t *tp = parsec_dtd_taskpool_new(); + + parsec_dtd_create_task_class(tp, "think_task", sizeof(agent_state_t*), PARSEC_REF, PARSEC_DTD_ARG_END, think_task_body, 0); + parsec_dtd_create_task_class(tp, "poll_task", sizeof(agent_state_t*), PARSEC_REF, PARSEC_DTD_ARG_END, poll_task_body, 0); + parsec_dtd_create_task_class(tp, "tool_task", sizeof(agent_state_t*), PARSEC_REF, PARSEC_DTD_ARG_END, tool_task_body, 0); + parsec_dtd_create_task_class(tp, "finish_task", sizeof(agent_state_t*), PARSEC_REF, PARSEC_DTD_ARG_END, finish_task_body, 0); + + parsec_context_add_taskpool(ctx, tp); + + // Initial agents insertion before start + agent_state_t **agents = malloc(N * sizeof(agent_state_t*)); + for (int i = 0; i < N; i++) { + agents[i] = calloc(1, sizeof(agent_state_t)); + agents[i]->id = i; + parsec_dtd_insert_task(tp, think_task_body, 0, PARSEC_DEV_CPU, "think_task", sizeof(agent_state_t*), agents[i], PARSEC_REF, PARSEC_DTD_ARG_END); + } + + printf("Starting %d agents...\n", N); + parsec_context_start(ctx); + + parsec_taskpool_wait(tp); + + pthread_mutex_lock(&queue_mutex); + io_thread_running = 0; + pthread_cond_signal(&queue_cond); + pthread_mutex_unlock(&queue_mutex); + pthread_join(io_thread, NULL); + + parsec_taskpool_free(tp); + parsec_fini(&ctx); + curl_global_cleanup(); + MPI_Finalize(); + for(int i=0; i