diff --git a/.github/requirements.txt b/.github/requirements.txt index f490edbc..9d4c19ae 100644 --- a/.github/requirements.txt +++ b/.github/requirements.txt @@ -1,24 +1,26 @@ -blinker==1.9.0 -cffi==1.17.1 -click==8.1.8 +cffi==2.0.0 colorlog==6.9.0 -coverage==7.8.0 -cryptography==44.0.2 -Flask==3.1.1 +coverage==7.10.7 +cryptography==46.0.2 gcovr==8.4 +grpcio==1.75.1 +grpcio-tools==1.75.1 +imageio==2.31.1 imutils==0.5.4 -itsdangerous==2.2.0 Jinja2==3.1.6 -lxml==5.3.2 -MarkupSafe==3.0.2 +lxml==6.0.2 +MarkupSafe==3.0.3 numpy==1.26.4 +opencv-contrib-python==4.6.0.66 +opencv-python==4.10.0.82 opencv-python-headless==4.11.0.86 -pillow==11.1.0 -protobuf==4.25.8 -pycparser==2.22 -Pygments==2.19.1 -pyzmq==26.4.0 -scipy==1.15.2 +pillow==11.3.0 +protobuf==6.31.1 +pycparser==2.23 +Pygments==2.19.2 +pyzmq==27.1.0 +scipy==1.16.2 +setuptools==80.9.0 sk-video==1.1.10 -Werkzeug==3.1.3 +typing_extensions==4.15.0 zmq==0.0.0 diff --git a/.github/scripts/Dockerfile.checkin b/.github/scripts/Dockerfile.checkin index dc41e9d5..d16d16ce 100644 --- a/.github/scripts/Dockerfile.checkin +++ b/.github/scripts/Dockerfile.checkin @@ -21,7 +21,7 @@ ENV DEBIAN_FRONTEND=noninteractive ENV DEBCONF_NOWARNINGS="yes" ENV PYTHON_BASE="3.12" ENV PYTHON_VERSION="${PYTHON_BASE}.3" -ENV PROTOBUF_VERSION="25.8" +ENV PROTOBUF_VERSION="6.31.1" ENV NUMPY_MIN_VERSION="1.26.0" ENV VIRTUAL_ENV=/opt/venv @@ -73,13 +73,17 @@ ENV PATH="$VIRTUAL_ENV/bin:$PATH" # Pull and Install Dependencies WORKDIR /dependencies -ENV AUTOCONF_VERSION="2.71" \ +ENV ABSEIL_VERSION="20250512.1" \ + AUTOCONF_VERSION="2.71" \ AWS_SDK_VERSION="1.11.336" \ CMAKE_VERSION="v3.28.5" \ FAISS_VERSION="v1.9.0" \ + GRPC_VERSION="v1.75.1" \ + GTEST_VERSION="52eb8108c5bdec04579160ae17225d66034bd723" \ LIBEDIT_VERSION="20230828-3.1" \ OPENCV_VERSION="4.9.0" \ PEG_VERSION="0.1.19" \ + PROTOBUF_VERSION_COMMIT="74211c0dfc2777318ab53c2cd2c317a2ef9012de" \ TILEDB_VERSION="2.14.1" \ VALIJSON_VERSION="v0.6" @@ -89,27 +93,49 @@ RUN git clone --branch 
${CMAKE_VERSION} https://github.com/Kitware/CMake.git /de cd /dependencies/CMake && ./bootstrap && make ${BUILD_THREADS} && \ make install DESTDIR=/opt/dist && make install -# PROTOBUF & ITS DEPENDENCIES +# PROTOBUF & ITS DEPENDENCIES (GOOGLETEST, ABSEIL-CPP) # hadolint ignore=DL3003,SC2086 -RUN git clone -b "v${PROTOBUF_VERSION}" --recurse-submodules https://github.com/protocolbuffers/protobuf.git /dependencies/protobuf && \ - cd /dependencies/protobuf/third_party/googletest && mkdir build && cd build/ && \ - cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release \ - -DBUILD_SHARED_LIBS=ON -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local \ - -DBUILD_GMOCK=ON -DCMAKE_CXX_STANDARD=17 .. && \ - make ${BUILD_THREADS} && make install && \ - cd /dependencies/protobuf/third_party/abseil-cpp && mkdir build && cd build && \ - cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DBUILD_SHARED_LIBS=ON \ - -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local -DABSL_BUILD_TESTING=ON \ - -DABSL_USE_EXTERNAL_GOOGLETEST=ON \ - -DABSL_FIND_GOOGLETEST=ON -DCMAKE_CXX_STANDARD=17 .. && \ - make ${BUILD_THREADS} && make install && ldconfig /opt/dist/usr/local/lib && \ - cd /dependencies/protobuf && \ - cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local \ - -DCMAKE_CXX_STANDARD=17 -Dprotobuf_BUILD_SHARED_LIBS=ON \ - -Dprotobuf_ABSL_PROVIDER=package \ - -Dprotobuf_BUILD_TESTS=ON \ - -Dabsl_DIR=/opt/dist/usr/local/lib/cmake/absl . && \ - make ${BUILD_THREADS} && make install +RUN git clone https://github.com/google/googletest.git /dependencies/googletest && \ + cd /dependencies/googletest && git checkout ${GTEST_VERSION} && \ + mkdir build && cd build && \ + cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release \ + -DBUILD_SHARED_LIBS=ON -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local \ + -DBUILD_GMOCK=ON -DCMAKE_CXX_STANDARD=17 .. 
&& \ + make ${BUILD_THREADS} && make install && \ + git clone -b ${ABSEIL_VERSION} https://github.com/abseil/abseil-cpp.git /dependencies/abseil && \ + cd /dependencies/abseil && mkdir build && cd build && \ + cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DBUILD_SHARED_LIBS=ON \ + -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local -DABSL_BUILD_TESTING=ON \ + -DABSL_USE_EXTERNAL_GOOGLETEST=ON \ + -DABSL_FIND_GOOGLETEST=ON -DCMAKE_CXX_STANDARD=17 .. && \ + make ${BUILD_THREADS} && make install && ldconfig /opt/dist/usr/local/lib && \ + git clone --recurse-submodules https://github.com/protocolbuffers/protobuf.git /dependencies/protobuf && \ + cd /dependencies/protobuf && git checkout ${PROTOBUF_VERSION_COMMIT} && \ + cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local \ + -DCMAKE_CXX_STANDARD=17 -Dprotobuf_BUILD_SHARED_LIBS=ON \ + -Dprotobuf_ABSL_PROVIDER=package \ + -Dprotobuf_GTEST_PROVIDER=package \ + -Dprotobuf_BUILD_TESTS=ON \ + -Dabsl_DIR=/opt/dist/usr/local/lib/cmake/absl . && \ + make ${BUILD_THREADS} && make install + +# AUTOCONF VERSION FOR NEO4J +# hadolint ignore=DL3003,SC2086 +RUN curl -L -O https://ftpmirror.gnu.org/autoconf/autoconf-${AUTOCONF_VERSION}.tar.gz && \ + tar -xzf autoconf-${AUTOCONF_VERSION}.tar.gz && cd autoconf-${AUTOCONF_VERSION} && \ + ./configure && make ${BUILD_THREADS} && make install DESTDIR=/opt/dist && make install + +# gRPC +RUN ldconfig && git clone -b ${GRPC_VERSION} --depth 1 --recursive https://github.com/grpc/grpc /dependencies/grpc && \ + cd /dependencies/grpc && \ + mkdir -p cmake/build && cd cmake/build && \ + cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DBUILD_SHARED_LIBS=ON \ + -DCMAKE_CXX_STANDARD=17 -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF \ + -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local \ + -DgRPC_ABSL_PROVIDER=package \ + -DgRPC_PROTOBUF_PROVIDER=package \ + ../.. && \ + cmake --build . -- ${BUILD_THREADS} && cmake --install . 
# OPENCV # hadolint ignore=DL3003,SC2086 @@ -148,12 +174,6 @@ RUN curl -L -O https://github.com/TileDB-Inc/TileDB/archive/refs/tags/${TILEDB_V -DBUILD_ONLY="s3" -DCUSTOM_MEMORY_MANAGEMENT=OFF -DENABLE_TESTING=OFF && \ make ${BUILD_THREADS} && make install DESTDIR=/opt/dist && make install -# AUTOCONF VERSION FOR NEO4J -# hadolint ignore=DL3003,SC2086 -RUN curl -O https://ftp.gnu.org/gnu/autoconf/autoconf-${AUTOCONF_VERSION}.tar.xz && \ - tar -xf autoconf-${AUTOCONF_VERSION}.tar.xz && cd autoconf-${AUTOCONF_VERSION} && \ - ./configure && make ${BUILD_THREADS} && make install DESTDIR=/opt/dist && make install - # LIB-OMNI FOR NEO4J QUERY HANDLER # hadolint ignore=DL3003,SC2086 RUN curl -L -O https://github.com/gpakosz/peg/releases/download/${PEG_VERSION}/peg-${PEG_VERSION}.tar.gz && \ @@ -239,6 +259,9 @@ COPY --from=build /usr/include/libwebsocket[s] /usr/include/libwebsockets COPY --from=build /usr/local/lib/libkubernetes.s[o] /usr/local/lib/libkubernetes.so COPY --from=build /usr/local/lib/libyaml.s[o] /usr/local/lib/libyaml.so COPY --from=build /usr/lib/x86_64-linux-gnu/libwebsockets.s[o] /usr/lib/x86_64-linux-gnu/libwebsockets.so +COPY --from=build /usr/local/include/ /usr/local/include/ +COPY --from=build /usr/local/bin /usr/local/bin +COPY --from=build /usr/local/lib/ /usr/local/lib/ ENV PATH="$VIRTUAL_ENV/bin:$PATH" # hadolint ignore=DL3008,SC2086 @@ -253,11 +276,12 @@ RUN apt-get update -y && apt-get upgrade -y && \ apt-get --purge remove -y python3.11 && apt-get autoremove -y && \ apt-get clean && rm -rf /var/lib/apt/lists/* && \ echo "/usr/local/lib" >> /etc/ld.so.conf.d/all-libs.conf && ldconfig && \ - python3 -m pip install --no-cache-dir "numpy>=${NUMPY_MIN_VERSION},<2.0.0" "protobuf==4.${PROTOBUF_VERSION}" \ + python3 -m pip install --no-cache-dir "numpy>=${NUMPY_MIN_VERSION},<2.0.0" "protobuf==${PROTOBUF_VERSION}" \ "coverage>=7.3.1" "cryptography>=44.0.1" -# COVERAGE TESTING WORKDIR /vdms + +# COVERAGE TESTING # hadolint ignore=DL3008,SC2086 RUN if [ 
"${BUILD_COVERAGE}" = "ON" ]; then \ apt-get update -y ; \ diff --git a/.github/scripts/setup_vdms.sh b/.github/scripts/setup_vdms.sh index a63e5a1d..c38fc2b8 100755 --- a/.github/scripts/setup_vdms.sh +++ b/.github/scripts/setup_vdms.sh @@ -190,15 +190,19 @@ fi ####################################################################################################################### # INSTALL DEPENDENCIES ####################################################################################################################### +ABSEIL_VERSION="20250512.1" AUTOCONF_VERSION="2.71" AWS_SDK_VERSION="1.11.336" CMAKE_VERSION="v3.28.5" FAISS_VERSION="v1.9.0" +GRPC_VERSION="v1.75.1" +GTEST_VERSION="52eb8108c5bdec04579160ae17225d66034bd723" LIBEDIT_VERSION="20230828-3.1" NUMPY_MIN_VERSION="1.26.0" OPENCV_VERSION="4.9.0" PEG_VERSION="0.1.19" -PROTOBUF_VERSION="25.8" +PROTOBUF_VERSION="6.31.1" +PROTOBUF_VERSION_COMMIT="74211c0dfc2777318ab53c2cd2c317a2ef9012de" TILEDB_VERSION="2.14.1" VALIJSON_VERSION="v0.6" @@ -213,9 +217,9 @@ make ${BUILD_THREADS} make install -# INSTALL PROTOBUF & ITS DEPENDENCIES -git clone -b "v${PROTOBUF_VERSION}" --recurse-submodules https://github.com/protocolbuffers/protobuf.git $VDMS_DEP_DIR/protobuf -cd $VDMS_DEP_DIR/protobuf/third_party/googletest +# INSTALL PROTOBUF & ITS DEPENDENCIES (GOOGLETEST, ABSEIL-CPP) +git clone https://github.com/google/googletest.git $VDMS_DEP_DIR/googletest +cd $VDMS_DEP_DIR/googletest && git checkout ${GTEST_VERSION} mkdir build && cd build/ cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release \ -DBUILD_SHARED_LIBS=ON -DCMAKE_INSTALL_PREFIX=/usr/local \ @@ -223,7 +227,8 @@ cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release \ make ${BUILD_THREADS} make install -cd $VDMS_DEP_DIR/protobuf/third_party/abseil-cpp +git clone -b ${ABSEIL_VERSION} https://github.com/abseil/abseil-cpp.git $VDMS_DEP_DIR/abseil-cpp +cd $VDMS_DEP_DIR/abseil-cpp mkdir build && cd build cmake 
-DCMAKE_POSITION_INDEPENDENT_CODE=ON -DBUILD_SHARED_LIBS=ON \ -DCMAKE_INSTALL_PREFIX=/usr/local -DABSL_BUILD_TESTING=ON \ @@ -233,16 +238,43 @@ make ${BUILD_THREADS} make install ldconfig /usr/local/lib -cd $VDMS_DEP_DIR/protobuf +git clone --recurse-submodules https://github.com/protocolbuffers/protobuf.git $VDMS_DEP_DIR/protobuf +cd $VDMS_DEP_DIR/protobuf && git checkout ${PROTOBUF_VERSION_COMMIT} cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_INSTALL_PREFIX=/usr/local \ -DCMAKE_CXX_STANDARD=17 -Dprotobuf_BUILD_SHARED_LIBS=ON \ -Dprotobuf_ABSL_PROVIDER=package \ + -Dprotobuf_GTEST_PROVIDER=package \ -Dprotobuf_BUILD_TESTS=ON \ -Dabsl_DIR=/usr/local/lib/cmake/absl . make ${BUILD_THREADS} make install +# INSTALL AUTOCONF +curl -L -o $VDMS_DEP_DIR/autoconf-${AUTOCONF_VERSION}.tar.gz https://ftpmirror.gnu.org/autoconf/autoconf-${AUTOCONF_VERSION}.tar.gz +cd $VDMS_DEP_DIR +tar -xzf autoconf-${AUTOCONF_VERSION}.tar.gz +cd autoconf-${AUTOCONF_VERSION} +./configure +make ${BUILD_THREADS} +make install + + +# INSTALL gRPC +ldconfig +git clone -b ${GRPC_VERSION} --depth 1 --recursive https://github.com/grpc/grpc $VDMS_DEP_DIR/grpc +cd $VDMS_DEP_DIR/grpc +mkdir -p cmake/build && cd cmake/build +cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DBUILD_SHARED_LIBS=ON \ + -DCMAKE_CXX_STANDARD=17 -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF \ + -DCMAKE_INSTALL_PREFIX=/usr/local \ + -DgRPC_ABSL_PROVIDER=package \ + -DgRPC_PROTOBUF_PROVIDER=package \ + ../.. +cmake --build . -- ${BUILD_THREADS} +cmake --install . 
+ + # INSTALL OPENCV git clone https://github.com/opencv/opencv.git $VDMS_DEP_DIR/opencv cd $VDMS_DEP_DIR/opencv @@ -255,7 +287,7 @@ make install # INSTALL PYTHON PACKAGES python -m pip install --no-cache-dir "numpy>=${NUMPY_MIN_VERSION},<2.0.0" "coverage>=7.3.1" \ - "protobuf==4.${PROTOBUF_VERSION}" "cryptography>=44.0.1" + "protobuf==${PROTOBUF_VERSION}" "cryptography>=44.0.1" # INSTALL VALIJSON @@ -302,16 +334,6 @@ make ${BUILD_THREADS} make install -# INSTALL AUTOCONF -curl -L -o $VDMS_DEP_DIR/autoconf-${AUTOCONF_VERSION}.tar.xz https://ftp.gnu.org/gnu/autoconf/autoconf-${AUTOCONF_VERSION}.tar.xz -cd $VDMS_DEP_DIR -tar -xf autoconf-${AUTOCONF_VERSION}.tar.xz -cd autoconf-${AUTOCONF_VERSION} -./configure -make ${BUILD_THREADS} -make install - - # INSTALL NEO4J CLIENTS curl -L -o $VDMS_DEP_DIR/peg-${PEG_VERSION}.tar.gz https://github.com/gpakosz/peg/releases/download/${PEG_VERSION}/peg-${PEG_VERSION}.tar.gz cd $VDMS_DEP_DIR/ diff --git a/.gitignore b/.gitignore index 243e1890..99f34827 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,6 @@ tmp # Jetbrains Specific .idea + +# Remote UDF Temps +remote_function/entity_pb2* \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index bb7d5d31..c3adb02e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,6 +22,12 @@ find_package( OpenCV REQUIRED ) find_package(Protobuf CONFIG REQUIRED) find_package( CURL REQUIRED ) find_package(AWSSDK REQUIRED COMPONENTS core s3) +find_package(gRPC CONFIG REQUIRED) +find_package(absl CONFIG REQUIRED) + +set(_PROTOBUF_PROTOC $) +set(_GRPC_GRPCPP gRPC::grpc++) +set(_GRPC_CPP_PLUGIN_EXECUTABLE $) include_directories(${Protobuf_INCLUDE_DIRS}) include_directories(${CMAKE_CURRENT_BINARY_DIR}) @@ -35,8 +41,9 @@ add_library(vdms_protobuf OBJECT ${CMAKE_CURRENT_SOURCE_DIR}/utils/src/protobuf/partitionerMessages.proto ${CMAKE_CURRENT_SOURCE_DIR}/utils/src/protobuf/pmgdMessages.proto ${CMAKE_CURRENT_SOURCE_DIR}/utils/src/protobuf/queryMessage.proto + 
${CMAKE_CURRENT_SOURCE_DIR}/utils/src/protobuf/entity.proto ) -target_link_libraries(vdms_protobuf PUBLIC protobuf::libprotobuf) +target_link_libraries(vdms_protobuf PUBLIC protobuf::libprotobuf gRPC::grpc++) set(PROTO_BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}) target_include_directories(vdms_protobuf PUBLIC "$") protobuf_generate( @@ -46,6 +53,14 @@ protobuf_generate( PROTOC_OUT_DIR "${PROTO_BINARY_DIR}" ) +protobuf_generate( + TARGET vdms_protobuf + LANGUAGE grpc + GENERATE_EXTENSIONS .grpc.pb.h .grpc.pb.cc + PLUGIN "protoc-gen-grpc=\$" + IMPORT_DIRS "${CMAKE_CURRENT_SOURCE_DIR}/utils/src/protobuf" + PROTOC_OUT_DIR "${PROTO_BINARY_DIR}") + if (CLIENT) add_definitions("-D CLIENT") @@ -103,11 +118,41 @@ else() src/VideoLoop.cc ) - target_link_libraries(dms vcl pmgd pmgd-util protobuf tbb tiledb vdms-utils pthread -lcurl -lzmq -lzip ${AWSSDK_LINK_LIBRARIES} neo4j-client) + target_link_libraries(dms + vcl + pmgd + pmgd-util + protobuf + tbb + tiledb + vdms-utils + pthread + -lcurl + -lzmq + -lzip + ${AWSSDK_LINK_LIBRARIES} + neo4j-client + gRPC::grpc++ + ) add_executable(vdms src/vdms.cc) - target_link_libraries(vdms dms vdms_protobuf vcl tiledb faiss flinng jsoncpp ${OpenCV_LIBS} ${AWSSDK_LINK_LIBRARIES}) + target_link_libraries(vdms + dms + vdms_protobuf + vcl + tiledb + faiss + flinng + jsoncpp + ${OpenCV_LIBS} + ${AWSSDK_LINK_LIBRARIES} + ${Protobuf_LIBRARIES} + ${_REFLECTION} + ${_GRPC_GRPCPP} + ${_PROTOBUF_LIBPROTOBUF} + ) + endif () message("Coverage:" ${CODE_COVERAGE}) diff --git a/client/python/setup.py b/client/python/setup.py index c5c90072..10bb6e2d 100644 --- a/client/python/setup.py +++ b/client/python/setup.py @@ -5,11 +5,11 @@ setuptools.setup( name="vdms", - version="0.0.21", + version="0.0.22", author="Chaunté W. 
Lacewell", author_email="chaunte.w.lacewell@intel.com", description="VDMS Client Module", - install_requires=["protobuf==4.25.8"], + install_requires=["protobuf==6.31.1"], long_description=long_description, long_description_content_type="text/markdown", url="https://github.com/IntelLabs/vdms", diff --git a/client/python/vdms/queryMessage_pb2.py b/client/python/vdms/queryMessage_pb2.py index 4bd962f5..ec59160d 100644 --- a/client/python/vdms/queryMessage_pb2.py +++ b/client/python/vdms/queryMessage_pb2.py @@ -1,11 +1,22 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE # source: queryMessage.proto +# Protobuf Python Version: 6.31.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'queryMessage.proto' +) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -18,8 +29,8 @@ _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'queryMessage_pb2', _globals) -if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None _globals['_QUERYMESSAGE']._serialized_start=38 _globals['_QUERYMESSAGE']._serialized_end=81 # @@protoc_insertion_point(module_scope) diff --git a/docker/base/Dockerfile b/docker/base/Dockerfile index 34bcce00..bf7e4391 100644 --- a/docker/base/Dockerfile +++ b/docker/base/Dockerfile @@ -21,7 +21,7 @@ ENV DEBIAN_FRONTEND=noninteractive ENV DEBCONF_NOWARNINGS="yes" ENV PYTHON_BASE="3.12" ENV 
PYTHON_VERSION="${PYTHON_BASE}.3" -ENV PROTOBUF_VERSION="25.8" +ENV PROTOBUF_VERSION="6.31.1" ENV NUMPY_MIN_VERSION="1.26.0" ENV VIRTUAL_ENV=/opt/venv @@ -73,13 +73,17 @@ ENV PATH="$VIRTUAL_ENV/bin:$PATH" # Pull and Install Dependencies WORKDIR /dependencies -ENV AUTOCONF_VERSION="2.71" \ +ENV ABSEIL_VERSION="20250512.1" \ + AUTOCONF_VERSION="2.71" \ AWS_SDK_VERSION="1.11.336" \ CMAKE_VERSION="v3.28.5" \ FAISS_VERSION="v1.9.0" \ + GRPC_VERSION="v1.75.1" \ + GTEST_VERSION="52eb8108c5bdec04579160ae17225d66034bd723" \ LIBEDIT_VERSION="20230828-3.1" \ OPENCV_VERSION="4.9.0" \ PEG_VERSION="0.1.19" \ + PROTOBUF_VERSION_COMMIT="74211c0dfc2777318ab53c2cd2c317a2ef9012de" \ TILEDB_VERSION="2.14.1" \ VALIJSON_VERSION="v0.6" @@ -89,27 +93,49 @@ RUN git clone --branch ${CMAKE_VERSION} https://github.com/Kitware/CMake.git /de cd /dependencies/CMake && ./bootstrap && make ${BUILD_THREADS} && \ make install DESTDIR=/opt/dist && make install -# PROTOBUF & ITS DEPENDENCIES +# PROTOBUF & ITS DEPENDENCIES (GOOGLETEST, ABSEIL-CPP) # hadolint ignore=DL3003,SC2086 -RUN git clone -b "v${PROTOBUF_VERSION}" --recurse-submodules https://github.com/protocolbuffers/protobuf.git /dependencies/protobuf && \ - cd /dependencies/protobuf/third_party/googletest && mkdir build && cd build/ && \ - cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release \ - -DBUILD_SHARED_LIBS=ON -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local \ - -DBUILD_GMOCK=ON -DCMAKE_CXX_STANDARD=17 .. && \ - make ${BUILD_THREADS} && make install && \ - cd /dependencies/protobuf/third_party/abseil-cpp && mkdir build && cd build && \ - cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DBUILD_SHARED_LIBS=ON \ - -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local -DABSL_BUILD_TESTING=ON \ - -DABSL_USE_EXTERNAL_GOOGLETEST=ON \ - -DABSL_FIND_GOOGLETEST=ON -DCMAKE_CXX_STANDARD=17 .. 
&& \ - make ${BUILD_THREADS} && make install && ldconfig /opt/dist/usr/local/lib && \ - cd /dependencies/protobuf && \ - cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local \ - -DCMAKE_CXX_STANDARD=17 -Dprotobuf_BUILD_SHARED_LIBS=ON \ - -Dprotobuf_ABSL_PROVIDER=package \ - -Dprotobuf_BUILD_TESTS=ON \ - -Dabsl_DIR=/opt/dist/usr/local/lib/cmake/absl . && \ - make ${BUILD_THREADS} && make install +RUN git clone https://github.com/google/googletest.git /dependencies/googletest && \ + cd /dependencies/googletest && git checkout ${GTEST_VERSION} && \ + mkdir build && cd build && \ + cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release \ + -DBUILD_SHARED_LIBS=ON -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local \ + -DBUILD_GMOCK=ON -DCMAKE_CXX_STANDARD=17 .. && \ + make ${BUILD_THREADS} && make install && \ + git clone -b ${ABSEIL_VERSION} https://github.com/abseil/abseil-cpp.git /dependencies/abseil && \ + cd /dependencies/abseil && mkdir build && cd build && \ + cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DBUILD_SHARED_LIBS=ON \ + -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local -DABSL_BUILD_TESTING=ON \ + -DABSL_USE_EXTERNAL_GOOGLETEST=ON \ + -DABSL_FIND_GOOGLETEST=ON -DCMAKE_CXX_STANDARD=17 .. && \ + make ${BUILD_THREADS} && make install && ldconfig /opt/dist/usr/local/lib && \ + git clone --recurse-submodules https://github.com/protocolbuffers/protobuf.git /dependencies/protobuf && \ + cd /dependencies/protobuf && git checkout ${PROTOBUF_VERSION_COMMIT} && \ + cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local \ + -DCMAKE_CXX_STANDARD=17 -Dprotobuf_BUILD_SHARED_LIBS=ON \ + -Dprotobuf_ABSL_PROVIDER=package \ + -Dprotobuf_GTEST_PROVIDER=package \ + -Dprotobuf_BUILD_TESTS=ON \ + -Dabsl_DIR=/opt/dist/usr/local/lib/cmake/absl . 
&& \ + make ${BUILD_THREADS} && make install + +# AUTOCONF VERSION FOR NEO4J +# hadolint ignore=DL3003,SC2086 +RUN curl -L -O https://ftpmirror.gnu.org/autoconf/autoconf-${AUTOCONF_VERSION}.tar.gz && \ + tar -xzf autoconf-${AUTOCONF_VERSION}.tar.gz && cd autoconf-${AUTOCONF_VERSION} && \ + ./configure && make ${BUILD_THREADS} && make install DESTDIR=/opt/dist && make install + +# gRPC +RUN ldconfig && git clone -b ${GRPC_VERSION} --depth 1 --recursive https://github.com/grpc/grpc /dependencies/grpc && \ + cd /dependencies/grpc && \ + mkdir -p cmake/build && cd cmake/build && \ + cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DBUILD_SHARED_LIBS=ON \ + -DCMAKE_CXX_STANDARD=17 -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF \ + -DCMAKE_INSTALL_PREFIX=/opt/dist/usr/local \ + -DgRPC_ABSL_PROVIDER=package \ + -DgRPC_PROTOBUF_PROVIDER=package \ + ../.. && \ + cmake --build . -- ${BUILD_THREADS} && cmake --install . # OPENCV # hadolint ignore=DL3003,SC2086 @@ -148,12 +174,6 @@ RUN curl -L -O https://github.com/TileDB-Inc/TileDB/archive/refs/tags/${TILEDB_V -DBUILD_ONLY="s3" -DCUSTOM_MEMORY_MANAGEMENT=OFF -DENABLE_TESTING=OFF && \ make ${BUILD_THREADS} && make install DESTDIR=/opt/dist && make install -# AUTOCONF VERSION FOR NEO4J -# hadolint ignore=DL3003,SC2086 -RUN curl -O https://ftp.gnu.org/gnu/autoconf/autoconf-${AUTOCONF_VERSION}.tar.xz && \ - tar -xf autoconf-${AUTOCONF_VERSION}.tar.xz && cd autoconf-${AUTOCONF_VERSION} && \ - ./configure && make ${BUILD_THREADS} && make install DESTDIR=/opt/dist && make install - # LIB-OMNI FOR NEO4J QUERY HANDLER # hadolint ignore=DL3003,SC2086 RUN curl -L -O https://github.com/gpakosz/peg/releases/download/${PEG_VERSION}/peg-${PEG_VERSION}.tar.gz && \ @@ -204,6 +224,8 @@ RUN rm -rf /dependencies /usr/local/share/doc /usr/local/share/man && \ ############################################################ # FINAL IMAGE FROM base + +# COPY FILES COPY --from=build /opt/dist / COPY --from=build /usr/local/bin/python${PYTHON_BASE} 
/usr/local/bin/python${PYTHON_BASE} COPY --from=build /usr/local/lib/python${PYTHON_BASE} /usr/local/lib/python${PYTHON_BASE} @@ -213,6 +235,9 @@ COPY --from=build /usr/include/libwebsocket[s] /usr/include/libwebsockets COPY --from=build /usr/local/lib/libkubernetes.s[o] /usr/local/lib/libkubernetes.so COPY --from=build /usr/local/lib/libyaml.s[o] /usr/local/lib/libyaml.so COPY --from=build /usr/lib/x86_64-linux-gnu/libwebsockets.s[o] /usr/lib/x86_64-linux-gnu/libwebsockets.so +COPY --from=build /usr/local/include/ /usr/local/include/ +COPY --from=build /usr/local/bin /usr/local/bin +COPY --from=build /usr/local/lib/ /usr/local/lib/ ENV PATH="$VIRTUAL_ENV/bin:$PATH" # hadolint ignore=DL3008,SC2086 @@ -227,11 +252,12 @@ RUN apt-get update -y && apt-get upgrade -y && \ apt-get --purge remove -y python3.11 && apt-get autoremove -y && \ apt-get clean && rm -rf /var/lib/apt/lists/* && \ echo "/usr/local/lib" >> /etc/ld.so.conf.d/all-libs.conf && ldconfig && \ - python3 -m pip install --no-cache-dir "numpy>=${NUMPY_MIN_VERSION},<2.0.0" "protobuf==4.${PROTOBUF_VERSION}" \ + python3 -m pip install --no-cache-dir "numpy>=${NUMPY_MIN_VERSION},<2.0.0" "protobuf==${PROTOBUF_VERSION}" \ "coverage>=7.3.1" "cryptography>=44.0.1" -# VDMS WORKDIR /vdms + +# VDMS # hadolint ignore=DL3003,SC2086 RUN git clone -b master --recurse-submodules https://github.com/IntelLabs/vdms.git /vdms && \ sed -i "s|java-11-openjdk|java-17-openjdk|g" /vdms/src/pmgd/java/CMakeLists.txt && \ diff --git a/docs/Install.md b/docs/Install.md index 995d24c8..02d3d81e 100644 --- a/docs/Install.md +++ b/docs/Install.md @@ -105,15 +105,17 @@ sudo make install ```
-#### **Protobuf v25.8 (4.25.8)** +#### **Protobuf v31.1 (6.31.1)** Install Protobuf (C++ and Python) which requires GoogleTest and Abseil C++ as dependencies. ```bash -PROTOBUF_VERSION="25.8" -python3 -m pip install --no-cache-dir "protobuf==4.${PROTOBUF_VERSION}" - -git clone -b v${PROTOBUF_VERSION} --recurse-submodules https://github.com/protocolbuffers/protobuf.git $VDMS_DEP_DIR/protobuf - -cd $VDMS_DEP_DIR/protobuf/third_party/googletest +ABSEIL_VERSION="20250512.1" +GTEST_VERSION="52eb8108c5bdec04579160ae17225d66034bd723" +PROTOBUF_VERSION="6.31.1" +PROTOBUF_VERSION_COMMIT="74211c0dfc2777318ab53c2cd2c317a2ef9012de" +python3 -m pip install --no-cache-dir "protobuf==${PROTOBUF_VERSION}" + +git clone https://github.com/google/googletest.git $VDMS_DEP_DIR/googletest +cd $VDMS_DEP_DIR/googletest && git checkout ${GTEST_VERSION} mkdir build && cd build cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release \ -DBUILD_SHARED_LIBS=ON -DCMAKE_INSTALL_PREFIX=/usr/local \ @@ -121,7 +123,8 @@ cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release \ make ${BUILD_THREADS} sudo make install -cd $VDMS_DEP_DIR/protobuf/third_party/abseil-cpp +git clone -b ${ABSEIL_VERSION} https://github.com/abseil/abseil-cpp.git $VDMS_DEP_DIR/abseil +cd $VDMS_DEP_DIR/abseil mkdir build && cd build cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DBUILD_SHARED_LIBS=ON \ -DCMAKE_INSTALL_PREFIX=/usr/local -DABSL_BUILD_TESTING=ON \ @@ -131,10 +134,12 @@ make ${BUILD_THREADS} sudo make install sudo ldconfig /usr/local/lib +git clone --recurse-submodules https://github.com/protocolbuffers/protobuf.git $VDMS_DEP_DIR/protobuf && git -C $VDMS_DEP_DIR/protobuf checkout ${PROTOBUF_VERSION_COMMIT} cd $VDMS_DEP_DIR/protobuf cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_INSTALL_PREFIX=/usr/local \ -DCMAKE_CXX_STANDARD=17 -Dprotobuf_BUILD_SHARED_LIBS=ON \ -Dprotobuf_ABSL_PROVIDER=package \ + -Dprotobuf_GTEST_PROVIDER=package \ -Dprotobuf_BUILD_TESTS=ON \ -Dabsl_DIR=/usr/local/lib/cmake/absl . 
make ${BUILD_THREADS} @@ -142,6 +147,39 @@ sudo make install ```
+#### **Autoconf v2.71** +```bash +AUTOCONF_VERSION="2.71" +curl -L -o $VDMS_DEP_DIR/autoconf-${AUTOCONF_VERSION}.tar.gz https://ftpmirror.gnu.org/autoconf/autoconf-${AUTOCONF_VERSION}.tar.gz +cd $VDMS_DEP_DIR +tar -xzf autoconf-${AUTOCONF_VERSION}.tar.gz +cd autoconf-${AUTOCONF_VERSION} +./configure +make ${BUILD_THREADS} +sudo make install +``` +
+ +#### **gRPC v1.75.1** +Install gRPC +```bash +sudo ldconfig +GRPC_VERSION="v1.75.1" +git clone -b ${GRPC_VERSION} --depth 1 --recursive https://github.com/grpc/grpc $VDMS_DEP_DIR/grpc +cd $VDMS_DEP_DIR/grpc +mkdir -p cmake/build +cd cmake/build +cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DBUILD_SHARED_LIBS=ON \ + -DCMAKE_CXX_STANDARD=17 -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF \ + -DCMAKE_INSTALL_PREFIX=/usr/local \ + -DgRPC_ABSL_PROVIDER=package \ + -DgRPC_PROTOBUF_PROVIDER=package \ + ../.. +cmake --build . -- ${BUILD_THREADS} +sudo cmake --install . +``` +
+ #### **[OpenCV](https://opencv.org/) 4.9.0** Below are instructions for installing ***OpenCV v4.9.0***. ```bash @@ -234,19 +272,6 @@ sudo make install ```
-#### **Autoconf v2.71** -```bash -AUTOCONF_VERSION="2.71" -curl -L -o $VDMS_DEP_DIR/autoconf-${AUTOCONF_VERSION}.tar.xz https://ftp.gnu.org/gnu/autoconf/autoconf-${AUTOCONF_VERSION}.tar.xz -cd $VDMS_DEP_DIR -tar -xf autoconf-${AUTOCONF_VERSION}.tar.xz -cd autoconf-${AUTOCONF_VERSION} -./configure -make ${BUILD_THREADS} -sudo make install -``` -
- #### **Neo4j Client** Below are instructions for installing ***libneo4j-omni*** which requires Peg, libcypher-parser and libedit as dependencies. ```bash diff --git a/include/vcl/GRPCEntityClient.h b/include/vcl/GRPCEntityClient.h new file mode 100644 index 00000000..2bf7badf --- /dev/null +++ b/include/vcl/GRPCEntityClient.h @@ -0,0 +1,54 @@ +#ifndef GRPC_ENTITY_CLIENT_H +#define GRPC_ENTITY_CLIENT_H + +#include +#include + +#include +#include +#include +#include +#include + +#include "Exception.h" +#include "entity.grpc.pb.h" + +class GRPCEntityClient { + public: + explicit GRPCEntityClient(std::string url); + + void ProcessEntities(const std::map& input_paths, + const std::map& output_paths, + const std::map& input_metadata, + std::map& output_metadata, + bool& success); + + private: + struct AsyncCall; + + void InitStub(); + void SendRequest(const std::string& entity_id, const std::string& input_path); + bool ReadFile(const std::string& path, std::string& out); + bool WriteFile(const std::string& path, const std::string& data); + void WaitForSlot(); + void HandleRpcs(); + + std::unique_ptr stub_; + std::string url_; + grpc::CompletionQueue cq_; + std::thread worker_; + bool worker_started_ = false; + + const std::map* output_paths_; + const std::map* input_metadata_; + std::map* output_metadata_; + bool* success_; + + std::mutex mutex_; + std::condition_variable cond_; + std::condition_variable all_done_; + int in_flight_ = 0; + int max_concurrent_tasks_; +}; + +#endif // GRPC_ENTITY_CLIENT_H \ No newline at end of file diff --git a/include/vcl/Image.h b/include/vcl/Image.h index 5652be38..4c205fe5 100644 --- a/include/vcl/Image.h +++ b/include/vcl/Image.h @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -48,6 +49,7 @@ #include #include "Exception.h" +#include "GRPCEntityClient.h" #include "RemoteConnection.h" #include "TDBImage.h" #include "VDMSConfigHelper.h" diff --git a/include/vcl/Video.h b/include/vcl/Video.h index 
d475a9cd..a0d640dc 100644 --- a/include/vcl/Video.h +++ b/include/vcl/Video.h @@ -32,6 +32,8 @@ #pragma once #include +#include +#include #include #include @@ -49,7 +51,6 @@ #include "timers/TimerMap.h" #include "utils.h" #include "vcl/Image.h" -#include "zip.h" namespace VCL { diff --git a/remote_function/entity.proto b/remote_function/entity.proto new file mode 100644 index 00000000..1e1bc970 --- /dev/null +++ b/remote_function/entity.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package entity; + +service Operator { + rpc Operate (Entity) returns (Entity); +} + +message Entity { + bytes entity = 1; + bytes options = 2; +} \ No newline at end of file diff --git a/remote_function/functions/caption.py b/remote_function/functions/caption.py index e9535fc8..7e4255ba 100644 --- a/remote_function/functions/caption.py +++ b/remote_function/functions/caption.py @@ -1,38 +1,32 @@ +import imageio.v3 as iio +import skvideo.io import cv2 -import uuid import os +import uuid -def run(ipfilename, format, options, tmp_dir_path=""): - opfilename = os.path.join( - tmp_dir_path, "tmpfile" + uuid.uuid1().hex + "." 
+ str(format) - ) - - vc = cv2.VideoCapture(ipfilename) - frame_width = int(vc.get(cv2.CAP_PROP_FRAME_WIDTH)) - frame_height = int(vc.get(cv2.CAP_PROP_FRAME_HEIGHT)) - video_fps = vc.get(cv2.CAP_PROP_FPS) - - video = cv2.VideoWriter( - opfilename, - cv2.VideoWriter_fourcc(*"mp4v"), - video_fps, - (frame_width, frame_height), - ) - - while True: - (grabbed, frame) = vc.read() - if not grabbed: - print("[INFO] no frame read from stream - exiting") - break +def run(entity, options, tmp_dir_path=""): + fname = os.path.join(tmp_dir_path, "tmpfile" + uuid.uuid1().hex + ".mp4") - label = options["text"] + label = options["text"] + video = skvideo.io.FFmpegWriter(fname) + for frame in iio.imiter(entity, format_hint=".mp4"): cv2.putText( frame, label, (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2 ) + video.writeFrame(frame) + + video.close() + + ebytes = "" + with open(fname, "rb") as f: + ebytes = f.read() + + # with open('bytefile.mp4', "wb") as out_file: + # out_file.write(ebytes) + + os.remove(fname) - video.write(frame) - vc.release() - video.release() + rdict = {"metadata": "None"} - return opfilename, None + return ebytes, rdict diff --git a/remote_function/functions/facedetect.py b/remote_function/functions/facedetect.py index a11ad89d..4af99ec4 100644 --- a/remote_function/functions/facedetect.py +++ b/remote_function/functions/facedetect.py @@ -1,5 +1,6 @@ import cv2 import os +import numpy as np # Get the real directory where this Python file is currentDir = os.path.realpath(os.path.dirname(__file__)) @@ -20,19 +21,23 @@ ) -def run(ipfilename, format, options, tmp_dir_path=""): +def run(entity, options): global face_cascade - if not os.path.exists(ipfilename): - raise Exception( - f"Facedetect error: File ipfilename: {ipfilename} does not exist" - ) + image_array = np.frombuffer(entity, dtype=np.uint8) - img = cv2.imread(ipfilename) + img = cv2.imdecode(image_array, cv2.IMREAD_COLOR) gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) faces = 
face_cascade.detectMultiScale(gray, 1.1, 4) for x, y, w, h in faces: cv2.rectangle(img, (x, y), (x + w, y + h), (255, 0, 0), 2) - return img, None + success, encoded_img = cv2.imencode(".jpg", img) + if not success: + raise ValueError("Failed to encode image.") + ebytes = encoded_img.tobytes() + + rdict = {"metadata": "None"} + + return ebytes, rdict diff --git a/remote_function/functions/metadata.py b/remote_function/functions/metadata.py index ce2800f7..ae3e4699 100644 --- a/remote_function/functions/metadata.py +++ b/remote_function/functions/metadata.py @@ -1,7 +1,7 @@ import cv2 -import uuid -import json import os +import imageio.v3 as iio +import numpy as np # Get the real directory where this Python file is currentDir = os.path.realpath(os.path.dirname(__file__)) @@ -29,18 +29,11 @@ def facedetectbbox(frame): return faces -def run(ipfilename, format, options, tmp_dir_path=""): - # Extract metadata for video files +def run(entity, options): if options["media_type"] == "video": - vs = cv2.VideoCapture(ipfilename) frameNum = 1 metadata = {} - while True: - (grabbed, frame) = vs.read() - if not grabbed: - print("[INFO] no frame read from stream - exiting") - break - + for frame in iio.imiter(entity, format_hint=".mp4"): if options["otype"] == "face": faces = facedetectbbox(frame) if len(faces) > 0: @@ -63,7 +56,7 @@ def run(ipfilename, format, options, tmp_dir_path=""): faces = facedetectbbox(frame) if len(faces) > 0: face = faces[0] - # We use placeholder values here as an example to showcase + # We use dummy values here as an example to showcase # different values for car. 
tdict = { "x": int(face[0]) + 3, @@ -79,27 +72,18 @@ def run(ipfilename, format, options, tmp_dir_path=""): if frameNum == 3: break - - response = {"opFile": ipfilename, "metadata": metadata} - - jsonfile = os.path.join(tmp_dir_path, "jsonfile" + uuid.uuid1().hex + ".json") - with open(jsonfile, "w") as f: - json.dump(response, f, indent=4) - return ipfilename, jsonfile - # Extract metadata for image files + response = {"opFile": "", "metadata": metadata} else: tdict = {} - if not os.path.exists(ipfilename): - raise Exception( - f"Metadata error: File ipfilename {ipfilename} does not exist" - ) - img = cv2.imread(ipfilename) + image_array = np.frombuffer(entity, dtype=np.uint8) + + img = cv2.imdecode(image_array, cv2.IMREAD_COLOR) if options["otype"] == "face": faces = facedetectbbox(img) if len(faces) > 0: face = faces[0] - tdict = { + metadata = { "x": int(face[0]), "y": int(face[1]), "height": int(face[2]), @@ -113,7 +97,7 @@ def run(ipfilename, format, options, tmp_dir_path=""): face = faces[0] # We use placeholder values here as an example to showcase # different values for car. 
- tdict = { + metadata = { "x": int(face[0]) + 3, "y": int(face[1]) + 5, "height": int(face[2]) + 10, @@ -122,7 +106,7 @@ def run(ipfilename, format, options, tmp_dir_path=""): "object_det": {"color": "red"}, } - response = {"opFile": ipfilename, "metadata": tdict} + response = {"opFile": "", "metadata": tdict} + print(response) - r = json.dumps(response) - return img, r + return entity, response diff --git a/remote_function/requirements.txt b/remote_function/requirements.txt index aab16828..67a65a8b 100644 --- a/remote_function/requirements.txt +++ b/remote_function/requirements.txt @@ -1,6 +1,10 @@ -flask>=3.0.2 +grpcio==1.75.1 +grpcio-tools==1.75.1 +imageio==2.31.1 +opencv-contrib-python==4.6.0.66 +opencv-python==4.10.0.82 +opencv-python-headless>=4.9.0.80 +protobuf==6.31.1 +sk-video>=1.1.10 imutils>=0.5.4 numpy<2.0.0 -opencv-python-headless>=4.9.0.80 -pillow>=11.1.0 -sk-video>=1.1.10 \ No newline at end of file diff --git a/remote_function/udf_server.py b/remote_function/udf_server.py index 0e69ac53..0114a456 100644 --- a/remote_function/udf_server.py +++ b/remote_function/udf_server.py @@ -1,16 +1,23 @@ -from flask import Flask, request, jsonify, send_file, after_this_request -import cv2 -import numpy as np +import sys +import grpc +import entity_pb2 +import entity_pb2_grpc import json -from datetime import datetime, timezone import os -import sys -import uuid -from zipfile import ZipFile import importlib.util -from werkzeug.utils import secure_filename +import asyncio +import signal +from concurrent.futures import ProcessPoolExecutor +import multiprocessing tmp_dir_path = None +UDF_MAP = {} + +cpu_cores = multiprocessing.cpu_count() +max_workers = max(cpu_cores - 1, 1) +executor = ProcessPoolExecutor(max_workers=max_workers) + +MAX_MSG_SIZE = 500 * 1024 * 1024 # Function to dynamically import a module given its full path @@ -58,174 +65,76 @@ def setup(tmp_path): raise Exception( "setup() error: module '" + entry + "' could not be loaded" ) - 
globals()[module_name] = module - - -app = Flask(__name__) - - -def get_current_timestamp(): - dt = datetime.now(timezone.utc) - - utc_time = dt.replace(tzinfo=timezone.utc) - utc_timestamp = utc_time.timestamp() - - return utc_timestamp - - -@app.route("/hello", methods=["GET"]) -def hello(): - return jsonify({"response": "true"}) + UDF_MAP[module_name] = module -@app.route("/image", methods=["POST"]) -def image_api(): - global tmp_dir_path - try: - json_data = json.loads(request.form["jsonData"]) - image_data = request.files["imageData"] +def run_udf(module_name, result, options): + udf = UDF_MAP[module_name] + return udf.run(result, options) - format = json_data["format"] if "format" in json_data else "jpg" - tmpfile = secure_filename( - os.path.join(tmp_dir_path, "tmpfile" + uuid.uuid1().hex + "." + str(format)) +# gRPC Servicer +class OperatorServicer(entity_pb2_grpc.OperatorServicer): + async def Operate(self, request, context): + result = request.entity + options = json.loads(request.options.decode("utf-8")) + loop = asyncio.get_running_loop() + ebytes, rdict = await loop.run_in_executor( + executor, run_udf, options["id"], result, options ) - - image_data.save(tmpfile) - - r_img, r_meta = "", "" - - if "id" not in json_data: - raise Exception("id value was not found in json_data") - - id = json_data["id"] - - if id not in globals(): - raise Exception(f"id={id} value was not found in globals()") - - udf = globals()[id] - - if "ingestion" in json_data: - r_img, r_meta = udf.run(tmpfile, format, json_data, tmp_dir_path) - else: - r_img, _ = udf.run(tmpfile, format, json_data, tmp_dir_path) - - img_encode = cv2.imencode("." + str(format), r_img)[1] - - # Converting the image into numpy array - data_encode = np.array(img_encode) - - # Converting the array to bytes. 
- return_string = data_encode.tobytes() - - if r_meta != "": - return_string += ":metadata:".encode("utf-8") - return_string += r_meta.encode("utf-8") - - os.remove(tmpfile) - - if return_string == "" or return_string is None: - return "error" - - return return_string - except Exception as e: - error_message = f"Exception: {str(e)}" - print(error_message, file=sys.stderr) - return "An internal error has occurred. Please try again later." - - -@app.route("/video", methods=["POST"]) -def video_api(): - global tmp_dir_path - try: - json_data = json.loads(request.form["jsonData"]) - video_data = request.files["videoData"] - format = json_data["format"] if "format" in json_data else "mp4" - - tmpfile = secure_filename( - os.path.join(tmp_dir_path, "tmpfile" + uuid.uuid1().hex + "." + str(format)) + return entity_pb2.Entity( + entity=ebytes, options=json.dumps(rdict).encode("utf-8") ) - video_data.save(tmpfile) - - video_file, metadata_file = "", "" - - if "id" not in json_data: - raise Exception("id value was not found in json_data") - id = json_data["id"] - if id not in globals(): - raise Exception(f"id={id} value was not found in globals()") +# Graceful shutdown handler +async def shutdown(server, executor): + print("\nShutting down...") + await server.stop(5) # Allow 5 seconds to finish active RPCs + executor.shutdown(wait=True) + print("Shutdown complete.") - udf = globals()[id] - if "ingestion" in json_data: - video_file, metadata_file = udf.run( - tmpfile, format, json_data, tmp_dir_path - ) - else: - video_file, _ = udf.run(tmpfile, format, json_data, tmp_dir_path) - - response_file = os.path.join( - tmp_dir_path, "tmpfile" + uuid.uuid1().hex + ".zip" - ) - - with ZipFile(response_file, "w") as zip_object: - zip_object.write(video_file) - if metadata_file != "": - zip_object.write(metadata_file) +async def main(port): + global MAX_MSG_SIZE + # server = grpc.aio.server() + server = grpc.aio.server( + options=[ + ("grpc.max_receive_message_length", MAX_MSG_SIZE), + 
("grpc.max_send_message_length", MAX_MSG_SIZE), + ] + ) + entity_pb2_grpc.add_OperatorServicer_to_server(OperatorServicer(), server) + server.add_insecure_port("[::]:{}".format(port)) + await server.start() + print("Async gRPC server (multiprocessing) started on port", port) - os.remove(tmpfile) + stop_event = asyncio.Event() - # Delete the temporary files after the response is sent - @after_this_request - def remove_tempfile(response): - try: - os.remove(response_file) - os.remove(video_file) - os.remove(metadata_file) - except Exception: - print("Warning: Some files cannot be deleted or are not present") - return response + # Handle SIGINT and SIGTERM + loop = asyncio.get_running_loop() + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, stop_event.set) - try: - return send_file( - response_file, as_attachment=True, download_name=response_file - ) - except Exception as e: - print("Error in file read:", str(e), file=sys.stderr) - return "Error in file read" - except Exception: - return "An internal error has occurred. Please try again later." 
- - -@app.errorhandler(400) -def handle_bad_request(e): - response = e.get_response() - response.data = json.dumps( - { - "code": e.code, - "name": e.name, - "description": e.description, - } - ) - response.content_type = "application/json" - return response + await stop_event.wait() + await shutdown(server, executor) -def main(): +if __name__ == "__main__": if sys.argv[1] is None: print("Port missing\n Correct Usage: python3 udf_server.py [tmp_path]") - elif sys.argv[2] is None: + elif len(sys.argv) < 3: print( "Warning: Path to the temporary directory is missing\nBy default the path will be the current directory" ) setup(None) - app.run(host="0.0.0.0", port=int(sys.argv[1])) + try: + asyncio.run(main(int(sys.argv[1]))) + except KeyboardInterrupt: + pass else: setup(sys.argv[2]) - app.run(host="0.0.0.0", port=int(sys.argv[1])) - - -if __name__ == "__main__": - main() + try: + asyncio.run(main(int(sys.argv[1]))) + except KeyboardInterrupt: + pass diff --git a/src/ImageLoop.cc b/src/ImageLoop.cc index 4b718d5f..d29e28e7 100644 --- a/src/ImageLoop.cc +++ b/src/ImageLoop.cc @@ -30,7 +30,6 @@ */ #include "ImageLoop.h" -#include #include "VDMSConfig.h" @@ -141,80 +140,6 @@ void ImageLoop::operationThread() noexcept { } } -size_t writeCallback(char *ip, size_t size, size_t nmemb, void *op) { - ((std::string *)op)->append((char *)ip, size * nmemb); - return size * nmemb; -} - -cv::Mat write_image(std::string readBuffer) { - std::vector vectordata(readBuffer.begin(), readBuffer.end()); - cv::Mat data_mat(vectordata, true); - cv::Mat decoded_mat(cv::imdecode(data_mat, 1)); - return decoded_mat; -} - -CURL *ImageLoop::get_easy_handle(VCL::Image *img, std::string &readBuffer) { - CURL *curl = NULL; - CURLcode res; - struct curl_slist *headers = NULL; - curl_mime *form = NULL; - curl_mimepart *field = NULL; - - Json::Value rParams = img->get_remoteOp_params(); - std::string url = rParams["url"].toStyledString().data(); - url.erase(std::remove(url.begin(), url.end(), '\n'), 
url.end()); - url = url.substr(1, url.size() - 2); - Json::Value options = rParams["options"]; - - curl = curl_easy_init(); - - if (curl) { - std::string imageId = img->get_image_id().data(); - form = curl_mime_init(curl); - - auto time_now = std::chrono::system_clock::now(); - std::chrono::duration utc_time = time_now.time_since_epoch(); - - VCL::Format img_format = img->get_image_format(); - std::string format = VCL::format_to_string(img_format); - - if (format == "" && options.isMember("format")) { - format = options["format"].toStyledString().data(); - format.erase(std::remove(format.begin(), format.end(), '\n'), - format.end()); - format = format.substr(1, format.size() - 2); - } else { - format = "jpg"; - } - - std::string filePath = VDMS::VDMSConfig::instance()->get_path_tmp() + - "/tempfile" + std::to_string(utc_time.count()) + - "." + format; - cv::imwrite(filePath, img->get_cvmat(false, false)); - _tempfiles.push_back(filePath); - - field = curl_mime_addpart(form); - curl_mime_name(field, "imageData"); - curl_mime_filedata(field, filePath.data()); - - field = curl_mime_addpart(form); - curl_mime_name(field, "jsonData"); - curl_mime_data(field, options.toStyledString().data(), - options.toStyledString().length()); - - // Post data - url = url + "?id=" + imageId; - curl_easy_setopt(curl, CURLOPT_URL, url.data()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallback); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer); - curl_easy_setopt(curl, CURLOPT_MIMEPOST, form); - - return curl; - } - - return NULL; -} - void clear_temp_files(std::vector tempfiles) { for (std::string fPath : tempfiles) { if (std::remove(fPath.data()) != 0) { @@ -225,118 +150,72 @@ void clear_temp_files(std::vector tempfiles) { void ImageLoop::execute_remote_operations( std::vector &readBuffer) { - int flag = 0; - int start_index = 0; - int step = 10; - int end_index = readBuffer.size() > step ? 
step : readBuffer.size(); - std::vector responseBuffer(readBuffer.size()); - int rindex = 0; - std::vector redoBuffer; - std::vector pendingImages; try { - while (start_index != readBuffer.size()) { - CURLM *multi_handle; - CURLMsg *msg = NULL; - CURL *eh = NULL; - CURLcode return_code; - int still_running = 0, i = 0, msgs_left = 0; - int http_status_code; - char *szUrl; - - multi_handle = curl_multi_init(); - - auto start = readBuffer.begin() + start_index; - auto end = readBuffer.begin() + end_index; - - std::vector tempBuffer(start, end); - - for (VCL::Image *img : tempBuffer) { - CURL *curl = get_easy_handle(img, responseBuffer[rindex]); - rindex++; - curl_multi_add_handle(multi_handle, curl); - } + std::map input_paths; + std::map output_paths; + std::map input_metadata; + std::map output_metadata; + bool success = true; - do { - CURLMcode mc = curl_multi_perform(multi_handle, &still_running); - if (still_running) - mc = curl_multi_wait(multi_handle, NULL, 0, 1000, NULL); + std::string url; - if (mc) { - break; - } - } while (still_running); - - while ((msg = curl_multi_info_read(multi_handle, &msgs_left))) { - if (msg->msg == CURLMSG_DONE) { - eh = msg->easy_handle; - - return_code = msg->data.result; - - szUrl = NULL; - long rsize = 0; - - curl_easy_getinfo(eh, CURLINFO_RESPONSE_CODE, &http_status_code); - curl_easy_getinfo(eh, CURLINFO_EFFECTIVE_URL, &szUrl); - curl_easy_getinfo(eh, CURLINFO_REQUEST_SIZE, &rsize); - - if (http_status_code != 200) { - // Throw specific exceptions if error codes received as response. - if (http_status_code == 0) { - throw VCLException(ObjectEmpty, "Remote server is not running."); - } - if (http_status_code == 400) { - throw VCLException(ObjectEmpty, - "Invalid Request to the Remote Server."); - } else if (http_status_code == 404) { - throw VCLException(ObjectEmpty, - "Invalid URL Request. 
Please check the URL."); - } else if (http_status_code == 500) { - throw VCLException(ObjectEmpty, - "Exception occurred at the remote server. " - "Please check your query."); - } else if (http_status_code == 503) { - throw VCLException(ObjectEmpty, "Unable to reach remote server"); - } else { - throw VCLException(ObjectEmpty, "Remote Server error."); - } - } + for (VCL::Image *img : readBuffer) { + auto time_now = std::chrono::system_clock::now(); + std::chrono::duration utc_time = time_now.time_since_epoch(); + + Json::Value rParams = img->get_remoteOp_params(); + url = rParams["url"].toStyledString().data(); + url.erase(std::remove(url.begin(), url.end(), '\n'), url.end()); + url = url.substr(1, url.size() - 2); + + Json::Value options = rParams["options"]; + Json::StreamWriterBuilder builder; + std::string output = Json::writeString(builder, options); + + VCL::Format img_format = img->get_image_format(); + std::string format = VCL::format_to_string(img_format); + + if (format == "" && options.isMember("format")) { + format = options["format"].toStyledString().data(); + format.erase(std::remove(format.begin(), format.end(), '\n'), + format.end()); + format = format.substr(1, format.size() - 2); + } else { + format = "jpg"; + } - curl_multi_remove_handle(multi_handle, eh); - curl_easy_cleanup(eh); - } else { - fprintf(stderr, "error: after curl_multi_info_read(), CURLMsg=%d\n", - msg->msg); - } + std::string filePath = VDMS::VDMSConfig::instance()->get_path_tmp() + + "/tempfile" + std::to_string(utc_time.count()) + + "." + format; + cv::imwrite(filePath, img->get_cvmat(false, false)); + // _tempfiles.push_back(filePath); + + int fd = open(filePath.c_str(), O_RDONLY); + if (fd != -1) { + fsync(fd); + close(fd); } - tempBuffer.clear(); - start_index = end_index; - end_index = readBuffer.size() > (end_index + step) ? 
(end_index + step) - : readBuffer.size(); + std::string imageId = img->get_image_id().data(); + + input_paths[imageId] = filePath; + output_paths[imageId] = filePath; + input_metadata[imageId] = output; + } + GRPCEntityClient client(url); + client.ProcessEntities(input_paths, output_paths, input_metadata, output_metadata, success); + if (!success){ + throw VCLException(ObjectEmpty, + "Remote Server Error: RPC failed or connection error with url: " + url); } - rindex = -1; - for (VCL::Image *img : readBuffer) { - rindex++; - if (std::find(redoBuffer.begin(), redoBuffer.end(), - img->get_image_id().data()) != redoBuffer.end()) { - pendingImages.push_back(img); - continue; - } - int rthresh = 0; - auto t_start = std::chrono::high_resolution_clock::now(); - bool rflag = false; - while (responseBuffer[rindex].size() == 0) { - continue; - } - cv::Mat dmat = write_image(responseBuffer[rindex]); + for (VCL::Image *img : readBuffer) { + std::string imageId = img->get_image_id().data(); + cv::Mat dmat = cv::imread(output_paths[imageId], cv::IMREAD_ANYCOLOR); if (dmat.rows == 0 || dmat.cols == 0) { throw VCLException(ObjectEmpty, "Invalid response from the remote server."); } - if (dmat.empty()) { - pendingImages.push_back(img); - } img->shallow_copy_cv(dmat); img->update_op_completed(); @@ -346,14 +225,10 @@ void ImageLoop::execute_remote_operations( if (not result.second) { result.first->second = img; } - if (rindex == readBuffer.size() - 1 && pendingImages.size() == 0) { - _remote_running = false; - } - enqueue(img); } + _remote_running = false; readBuffer.clear(); - std::swap(readBuffer, pendingImages); } catch (VCL::Exception e) { VCL::Image *img = readBuffer[0]; img->set_query_error_response(e.msg); diff --git a/src/ImageLoop.h b/src/ImageLoop.h index 56a39922..f3303ad8 100644 --- a/src/ImageLoop.h +++ b/src/ImageLoop.h @@ -29,6 +29,9 @@ * */ +#include +#include + #include #include #include @@ -78,6 +81,5 @@ class ImageLoop { std::thread 
r_thread{&ImageLoop::remoteOperationThread, this}; void remoteOperationThread() noexcept; - CURL *get_easy_handle(VCL::Image *img, std::string &readBuffer); void execute_remote_operations(std::vector &readBuffer); }; \ No newline at end of file diff --git a/src/VideoLoop.cc b/src/VideoLoop.cc index e199f522..fd54a4cd 100644 --- a/src/VideoLoop.cc +++ b/src/VideoLoop.cc @@ -1,6 +1,5 @@ #include "VideoLoop.h" #include "vcl/Exception.h" -#include #include "VDMSConfig.h" @@ -136,228 +135,74 @@ void VideoLoop::operationThread() noexcept { } } -/** - * Write the remote response to a local file - */ -static size_t videoCallback(void *ptr, size_t size, size_t nmemb, - void *stream) { - - size_t written = fwrite(ptr, size, nmemb, (FILE *)stream); - return written; -} - -CURL *VideoLoop::get_easy_handle(VCL::Video video, - std::string response_filepath) { - - // Get the remote operations parameters shared by the client - Json::Value rParams = video.get_remoteOp_params(); - std::string url = rParams["url"].toStyledString().data(); - url.erase(std::remove(url.begin(), url.end(), '\n'), url.end()); - url = url.substr(1, url.size() - 2); - Json::Value options = rParams["options"]; - - // Initialize curl - CURL *curl = NULL; - - CURLcode res; - struct curl_slist *headers = NULL; - curl_mime *form = NULL; - curl_mimepart *field = NULL; - - curl = curl_easy_init(); - - if (curl) { - - // Create the form to be sent to the remote operation - // We send the video file and the set of remote operation paramters - // as two form fields. 
- form = curl_mime_init(curl); - - field = curl_mime_addpart(form); - curl_mime_name(field, "videoData"); - if (curl_mime_filedata(field, video.get_operated_video_id().data()) != - CURLE_OK) { - throw VCLException(ObjectEmpty, - "Unable to retrieve local file for remoting"); - } - - field = curl_mime_addpart(form); - curl_mime_name(field, "jsonData"); - if (curl_mime_data(field, options.toStyledString().data(), - options.toStyledString().length()) != CURLE_OK) { - throw VCLException(ObjectEmpty, - "Unable to create curl mime data for client params"); - } - - // Post data - FILE *response_file = fopen(response_filepath.data(), "wb"); - url = url + "?id=" + video.get_video_id(); - - if (curl_easy_setopt(curl, CURLOPT_URL, url.data()) != CURLE_OK) { - throw VCLException(UndefinedException, "CURL setup error with URL"); - } - if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, videoCallback) != - CURLE_OK) { - throw VCLException(UndefinedException, "CURL setup error with callback"); - } - - if (response_file) { - if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, response_file) != - CURLE_OK) { - throw VCLException(UndefinedException, - "CURL setup error callback response file"); - } - if (curl_easy_setopt(curl, CURLOPT_MIMEPOST, form) != CURLE_OK) { - throw VCLException(UndefinedException, "CURL setup error with form"); - } - fclose(response_file); - return curl; - } - - return NULL; - } - - return NULL; -} - void VideoLoop::execute_remote_operations(std::vector &readBuffer) { - int flag = 0; - int start_index = 0; - int step = 10; - int end_index = readBuffer.size() > step ? step : readBuffer.size(); - std::vector responseBuffer; - int rindex = 0; - std::map responseFileMaps; try { - // Use multicurl to perform call to the remote API - // and receive response. We perform multiple amsll multicurl calls - // instead of a single large call to ensure that the remote server - // does not suspect an attack. 
- while (start_index != readBuffer.size()) { - CURLM *multi_handle; - CURLMsg *msg = NULL; - CURL *eh = NULL; - CURLcode return_code; - int still_running = 0, i = 0, msgs_left = 0; - int http_status_code; - char *szUrl; - - multi_handle = curl_multi_init(); - - auto start = readBuffer.begin() + start_index; - auto end = readBuffer.begin() + end_index; - - std::vector tempBuffer(start, end); - - for (VCL::Video video : tempBuffer) { - std::string video_id = video.get_operated_video_id(); - - Json::Value rParams = video.get_remoteOp_params(); - Json::Value options = rParams["options"]; - - std::string format = ""; - char *s = const_cast(video_id.data()); - std::string delimiter = "."; - char *p = std::strtok(s, delimiter.data()); - while (p != NULL) { - p = std::strtok(NULL, delimiter.data()); - if (p != NULL) { - format.assign(p, std::strlen(p)); - } - } - - auto time_now = std::chrono::system_clock::now(); - std::chrono::duration utc_time = time_now.time_since_epoch(); - std::string response_filepath = - VDMS::VDMSConfig::instance()->get_path_tmp() + "/rtempfile" + - std::to_string(utc_time.count()) + "." 
+ format; - - responseBuffer.push_back(response_filepath); - CURL *curl = get_easy_handle(video, responseBuffer[rindex]); - FILE *response_file = fopen(response_filepath.data(), "wb"); - responseFileMaps.insert( - std::pair(response_filepath, response_file)); - rindex++; - curl_multi_add_handle(multi_handle, curl); - } + // Finalize the remote operation and enqueue video on local queue + std::map input_paths; + std::map output_paths; + std::map input_metadata; + std::map output_metadata; + bool success = true; - do { - CURLMcode mc = curl_multi_perform(multi_handle, &still_running); - if (still_running) - mc = curl_multi_wait(multi_handle, NULL, 0, 1000, NULL); + std::string url; - if (mc) { - break; + for (VCL::Video video : readBuffer) { + std::string video_id = video.get_operated_video_id(); + + Json::Value rParams = video.get_remoteOp_params(); + url = rParams["url"].toStyledString().data(); + url.erase(std::remove(url.begin(), url.end(), '\n'), url.end()); + url = url.substr(1, url.size() - 2); + Json::Value options = rParams["options"]; + Json::StreamWriterBuilder builder; + std::string output = Json::writeString(builder, options); + std::cout<< "URL: " << url << std::endl; + + std::string format = ""; + char *s = const_cast(video_id.data()); + std::string delimiter = "."; + char *p = std::strtok(s, delimiter.data()); + while (p != NULL) { + p = std::strtok(NULL, delimiter.data()); + if (p != NULL) { + format.assign(p, std::strlen(p)); } - } while (still_running); - - while ((msg = curl_multi_info_read(multi_handle, &msgs_left))) { - if (msg->msg == CURLMSG_DONE) { - eh = msg->easy_handle; + } - return_code = msg->data.result; + auto time_now = std::chrono::system_clock::now(); + std::chrono::duration utc_time = time_now.time_since_epoch(); + std::string response_filepath = + VDMS::VDMSConfig::instance()->get_path_tmp() + "/rtempfile" + + std::to_string(utc_time.count()) + "." 
+ format; - // Get HTTP status code - szUrl = NULL; - long rsize = 0; - curl_easy_getinfo(eh, CURLINFO_RESPONSE_CODE, &http_status_code); - curl_easy_getinfo(eh, CURLINFO_EFFECTIVE_URL, &szUrl); - curl_easy_getinfo(eh, CURLINFO_REQUEST_SIZE, &rsize); + input_paths[video.get_operated_video_id()] = video.get_operated_video_id(); + output_paths[video.get_operated_video_id()] = response_filepath; + input_metadata[video.get_operated_video_id()] = output; - if (http_status_code != 200) { - // Throw exceptions for different error codes received from the - // remote server - if (http_status_code == 0) { - throw VCLException(ObjectEmpty, "Remote server is not running."); - } - if (http_status_code == 400) { - throw VCLException(ObjectEmpty, - "Invalid Request to the Remote Server."); - } else if (http_status_code == 404) { - throw VCLException(ObjectEmpty, - "Invalid URL Request. Please check the URL."); - } else if (http_status_code == 500) { - throw VCLException(ObjectEmpty, - "Exception occurred at the remote server. " - "Please check your query."); - } else if (http_status_code == 503) { - throw VCLException(ObjectEmpty, "Unable to reach remote server"); - } else { - throw VCLException(ObjectEmpty, "Remote Server error."); - } - } + } - curl_multi_remove_handle(multi_handle, eh); - curl_easy_cleanup(eh); - } else { - fprintf(stderr, "error: after curl_multi_info_read(), CURLMsg=%d\n", - msg->msg); - } - } + GRPCEntityClient client(url); + client.ProcessEntities(input_paths, output_paths, input_metadata, output_metadata, success); - tempBuffer.clear(); - start_index = end_index; - end_index = readBuffer.size() > (end_index + step) ? 
(end_index + step) - : readBuffer.size(); + if (!success){ + throw VCLException(ObjectEmpty, + "Remote Server Error: RPC failed or connection error."); } - rindex = -1; - // Finalize the remote operation and enqueue video on local queue + + for (VCL::Video video : readBuffer) { - rindex++; - fclose(responseFileMaps[responseBuffer[rindex].data()]); - video.set_operated_video_id(responseBuffer[rindex]); + std::string video_id = video.get_operated_video_id(); + video.set_operated_video_id(output_paths[video_id]); auto const result = videoMap.insert( std::pair(video.get_video_id(), video)); if (not result.second) { result.first->second = video; } - if (rindex == readBuffer.size() - 1) { - _remote_running = false; - } enqueue(video); } + _remote_running = false; readBuffer.clear(); } catch (VCL::Exception e) { // Exception occured. Terminate the event loop. diff --git a/src/VideoLoop.h b/src/VideoLoop.h index bc978432..1c93b900 100644 --- a/src/VideoLoop.h +++ b/src/VideoLoop.h @@ -29,6 +29,10 @@ * */ +#include +#include +#include + #include #include #include @@ -39,7 +43,6 @@ #include #include -#include "vcl/Image.h" #include "vcl/Video.h" class VideoLoop { @@ -129,7 +132,7 @@ class VideoLoop { * @param response_filepath Path to the local file where the remote response * file will be stored */ - CURL *get_easy_handle(VCL::Video video, std::string response_filepath); + // CURL *get_easy_handle(VCL::Video video, std::string response_filepath); /** * Execute the remote operation using multi-curl diff --git a/src/vcl/CMakeLists.txt b/src/vcl/CMakeLists.txt index 68e1aefb..858eb218 100644 --- a/src/vcl/CMakeLists.txt +++ b/src/vcl/CMakeLists.txt @@ -23,6 +23,7 @@ add_library(vcl SHARED TDBSparseDescriptorSet.cc utils.cc Video.cc + GRPCEntityClient.cc RemoteConnection.cc ../../utils/src/timers/TimerMap.cc Filter.cc diff --git a/src/vcl/GRPCEntityClient.cc b/src/vcl/GRPCEntityClient.cc new file mode 100644 index 00000000..ab7113e8 --- /dev/null +++ 
b/src/vcl/GRPCEntityClient.cc
@@ -0,0 +1,257 @@
+#include "vcl/GRPCEntityClient.h"
+
+#include <algorithm>
+#include <fstream>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <utility>
+
+#include <grpcpp/grpcpp.h>
+
+using grpc::Channel;
+using grpc::ClientAsyncResponseReader;
+using grpc::ClientContext;
+using grpc::Status;
+using grpc::ChannelArguments;
+using entity::Entity;
+using entity::Operator;
+
+// Upper bound, in bytes, applied to both sent and received gRPC messages.
+int MAX_SIZE_BYTES = 500*1024*1024;
+
+namespace {
+
+// Clears the caller-owned success flag while holding `mutex`. The flag is
+// written from both the submitting thread and the completion thread, so all
+// writes go through this helper instead of storing to it unsynchronized.
+template <typename Mutex>
+void MarkFailed(Mutex& mutex, bool* success) {
+  std::lock_guard<Mutex> lock(mutex);
+  if (success != nullptr) {
+    *success = false;
+  }
+}
+
+}  // namespace
+
+// State of one in-flight RPC. The object's address is used as the
+// completion-queue tag; HandleRpcs() takes ownership back when the
+// completion for that tag is delivered.
+struct GRPCEntityClient::AsyncCall {
+  Entity response;
+  ClientContext context;
+  Status status;
+  std::unique_ptr<ClientAsyncResponseReader<Entity>> reader;
+  std::string entity_id;
+  GRPCEntityClient* parent = nullptr;
+};
+
+// Stores the server address and sets the in-flight limit to twice the
+// reported hardware concurrency (8 is assumed when it is unknown), capped at 64.
+GRPCEntityClient::GRPCEntityClient(std::string url)
+    : url_(std::move(url)) {
+  unsigned int concurrency = std::thread::hardware_concurrency();
+  if (concurrency == 0) concurrency = 8;
+  max_concurrent_tasks_ = std::min(64u, concurrency * 2);
+}
+
+// Creates the Operator stub on first use, over an insecure channel to url_.
+void GRPCEntityClient::InitStub() {
+  if (!stub_) {
+    grpc::ChannelArguments args;
+    args.SetMaxReceiveMessageSize(MAX_SIZE_BYTES);
+    args.SetMaxSendMessageSize(MAX_SIZE_BYTES);
+    stub_ = Operator::NewStub(
+      grpc::CreateCustomChannel(url_, grpc::InsecureChannelCredentials(), args)
+    );
+    std::cerr << "Stub initialized for URL: " << url_ << "\n";
+  }
+}
+
+// Sends every entry of input_paths to the remote operator and blocks until
+// every issued request has completed. Response payloads are written to the
+// path registered in output_paths under the same id, and response options
+// are stored in output_metadata. success is set to false on any failure.
+// The completion queue is shut down before returning.
+void GRPCEntityClient::ProcessEntities(const std::map<std::string, std::string>& input_paths,
+                                       const std::map<std::string, std::string>& output_paths,
+                                       const std::map<std::string, std::string>& input_metadata,
+                                       std::map<std::string, std::string>& output_metadata,
+                                       bool& success) {
+  output_metadata_ = &output_metadata;
+  output_paths_ = &output_paths;
+  input_metadata_ = &input_metadata;
+  success_ = &success;
+
+  for (const auto& [entity_id, input_path] : input_paths) {
+    WaitForSlot();
+    SendRequest(entity_id, input_path);
+  }
+
+  {
+    std::unique_lock lock(mutex_);
+    all_done_.wait(lock, [this] { return in_flight_ == 0; });
+  }
+
+  cq_.Shutdown();
+  if (worker_.joinable()) {
+    worker_.join();
+  }
+}
+
+// Reads input_path and starts the asynchronous Operate RPC for entity_id.
+// If the file cannot be read, success is cleared and no RPC is issued.
+void GRPCEntityClient::SendRequest(const std::string& entity_id, const std::string& input_path) {
+  InitStub();
+
+  std::string entity_data;
+  if (!ReadFile(input_path, entity_data)) {
+    std::cerr << "Failed to read " << input_path << "\n";
+    MarkFailed(mutex_, success_);
+    return;
+  }
+
+  Entity request;
+  request.set_entity(entity_data);
+
+  auto it = input_metadata_->find(entity_id);
+  std::string json = (it != input_metadata_->end()) ? it->second : "{}";
+  request.set_options(json);
+
+  auto call = std::make_unique<AsyncCall>();
+  call->entity_id = entity_id;
+  call->parent = this;
+
+  // Set timeout
+  // call->context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(30));
+
+  // Count the request before it is started. HandleRpcs() decrements
+  // in_flight_ as soon as the tag is delivered, so incrementing after
+  // Finish() would let the decrement run first.
+  {
+    std::lock_guard lock(mutex_);
+    ++in_flight_;
+  }
+
+  call->reader = stub_->AsyncOperate(&call->context, request, &cq_);
+  // From Finish() on the call is owned via its tag; HandleRpcs() frees it.
+  AsyncCall* tag = call.release();
+  tag->reader->Finish(&tag->response, &tag->status, static_cast<void*>(tag));
+}
+
+// Loads the whole file at path into out. Returns false (and reports on
+// stderr) if the file cannot be opened, is empty, or cannot be read fully.
+bool GRPCEntityClient::ReadFile(const std::string& path, std::string& out) {
+  std::ifstream file(path, std::ios::binary);
+  if (!file) {
+    std::cerr << "Cannot open file: " << path << "\n";
+    return false;
+  }
+  file.seekg(0, std::ios::end);
+  // tellg() reports failure as -1; check it before converting to an
+  // unsigned size, otherwise resize() would be asked for a huge buffer.
+  const std::streamoff end_pos = file.tellg();
+  if (end_pos < 0) {
+    MarkFailed(mutex_, success_);
+    std::cerr << "Cannot determine size of file: " << path << "\n";
+    return false;
+  }
+  if (end_pos == 0) {
+    MarkFailed(mutex_, success_);
+    std::cerr << "File is empty: " << path << "\n";
+    return false;
+  }
+  const auto size = static_cast<std::size_t>(end_pos);
+  file.seekg(0);
+  out.resize(size);
+  file.read(&out[0], static_cast<std::streamsize>(size));
+  if (!file) {
+    MarkFailed(mutex_, success_);
+    std::cerr << "Failed to read full file: " << path << "\n";
+    return false;
+  }
+  return true;
+}
+
+// Writes data to path, replacing any existing file. Returns false if the
+// file cannot be opened or the data cannot be written out.
+bool GRPCEntityClient::WriteFile(const std::string& path, const std::string& data) {
+  std::ofstream file(path, std::ios::binary);
+  if (!file){
+    MarkFailed(mutex_, success_);
+    return false;
+  }
+  file.write(data.data(), static_cast<std::streamsize>(data.size()));
+  // Flush before checking the state so buffered write errors are reported.
+  file.flush();
+  return file.good();
+}
+
+// Starts the completion thread on first use, then blocks until fewer than
+// max_concurrent_tasks_ requests are in flight.
+void GRPCEntityClient::WaitForSlot() {
+  std::unique_lock lock(mutex_);
+  if (!worker_started_) {
+    worker_ = std::thread([this]() { HandleRpcs(); });
+    worker_started_ = true;
+  }
+  cond_.wait(lock, [this]() { return in_flight_ < max_concurrent_tasks_; });
+}
+
+// Completion loop run on worker_: for every finished RPC, writes the
+// response payload to its output path, records the response options, and
+// releases the request's slot. Returns once the queue is shut down and drained.
+void GRPCEntityClient::HandleRpcs() {
+  void* tag = nullptr;
+  bool ok = false;
+  while (cq_.Next(&tag, &ok)) {
+    // Take ownership so the call is freed on every path out of this iteration.
+    std::unique_ptr<AsyncCall> call(static_cast<AsyncCall*>(tag));
+    bool failed = false;
+
+    if (ok && call->status.ok()) {
+      const std::string& out_data = call->response.entity();
+      const std::string& out_json = call->response.options();
+
+      // find() instead of at(): an exception escaping this thread would
+      // terminate the process.
+      auto path_it = output_paths_->find(call->entity_id);
+      if (path_it == output_paths_->end()) {
+        failed = true;
+        std::cerr << "No output path for entity: " << call->entity_id << "\n";
+      } else if (!WriteFile(path_it->second, out_data)) {
+        failed = true;
+        std::cerr << "Failed to write entity: " << path_it->second << "\n";
+      }
+
+      (*output_metadata_)[call->entity_id] = out_json;
+    } else {
+      failed = true;
+      std::cerr << "RPC failed or connection error for: " << call->entity_id
+                << " - status: " << call->status.error_message() << "\n";
+    }
+
+    {
+      std::lock_guard lock(mutex_);
+      if (failed) {
+        *success_ = false;
+      }
+      --in_flight_;
+      cond_.notify_one();
+      if (in_flight_ == 0) {
+        all_done_.notify_one();
+      }
+    }
+  }
+
+  {
+    std::lock_guard lock(mutex_);
+    if (in_flight_ == 0) {
+      all_done_.notify_one();
+    }
+  }
+}
diff --git a/src/vcl/Image.cc b/src/vcl/Image.cc index e63f7e0d..0d1b715d 100644 --- a/src/vcl/Image.cc +++ b/src/vcl/Image.cc @@ -371,142 +371,70 @@ void Image::SyncRemoteOperation::operator()(Image *img) { } else { if (!img->_cv_img.empty()) { - std::string readBuffer; + std::map input_paths; + std::map output_paths; + std::map input_metadata; + std::map output_metadata; - CURL *curl = NULL; - - CURLcode res; - struct curl_slist *headers = NULL; - curl_mime *form = NULL; - curl_mimepart *field = NULL; - - curl = curl_easy_init(); - - if (curl) { - auto time_now = std::chrono::system_clock::now(); - std::chrono::duration utc_time = time_now.time_since_epoch(); - - VCL::Format img_format = img->get_image_format(); - std::string format = VCL::format_to_string(img_format); - - if (format == "" && _options.isMember("format")) { - format = _options["format"].toStyledString().data(); - format.erase(std::remove(format.begin(), format.end(), '\n'), - format.end()); - format = format.substr(1, format.size() - 2); - } else { - format = "jpg"; - } - - std::string filePath = - VDMS::VDMSConfig::instance()->get_path_tmp() + "/tempfile" + - std::to_string(utc_time.count()) +
"." + format; - cv::imwrite(filePath, img->_cv_img); - - std::ofstream tsfile; + auto time_now = std::chrono::system_clock::now(); + std::chrono::duration utc_time = time_now.time_since_epoch(); - auto opstart = std::chrono::system_clock::now(); + VCL::Format img_format = img->get_image_format(); + std::string format = VCL::format_to_string(img_format); - form = curl_mime_init(curl); + if (format == "" && _options.isMember("format")) { + format = _options["format"].toStyledString().data(); + format.erase(std::remove(format.begin(), format.end(), '\n'), + format.end()); + format = format.substr(1, format.size() - 2); + } else { + format = "jpg"; + } - field = curl_mime_addpart(form); - curl_mime_name(field, "imageData"); - if (curl_mime_filedata(field, filePath.data()) != CURLE_OK) { - if (std::remove(filePath.data()) != 0) { - } - throw VCLException(ObjectEmpty, - "Unable to create file for remoting"); - } + std::string filePath = VDMS::VDMSConfig::instance()->get_path_tmp() + + "/tempfile" + std::to_string(utc_time.count()) + + "." 
+ format; + cv::imwrite(filePath, img->get_cvmat(false, false)); - field = curl_mime_addpart(form); - curl_mime_name(field, "jsonData"); - if (curl_mime_data(field, _options.toStyledString().data(), - _options.toStyledString().length()) != CURLE_OK) { - if (std::remove(filePath.data()) != 0) { - } - throw VCLException(ObjectEmpty, "Unable to create curl mime data"); - } + std::string imageId = img->get_image_id().data(); - // Post data - if (curl_easy_setopt(curl, CURLOPT_URL, _url.data()) != CURLE_OK) { - throw VCLException(UndefinedException, "CURL setup error with URL"); - } - if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallback) != - CURLE_OK) { - throw VCLException(UndefinedException, - "CURL setup error with callback"); - } - if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer) != - CURLE_OK) { - throw VCLException(UndefinedException, - "CURL setup error with read buffer"); - } - if (curl_easy_setopt(curl, CURLOPT_MIMEPOST, form) != CURLE_OK) { - throw VCLException(UndefinedException, - "CURL setup error with form"); - } + Json::StreamWriterBuilder builder; + std::string output = Json::writeString(builder, _options); - res = curl_easy_perform(curl); + input_paths[imageId] = filePath; + output_paths[imageId] = filePath; + input_metadata[imageId] = output; + bool success = true; + GRPCEntityClient client(_url); + client.ProcessEntities(input_paths, output_paths, input_metadata, output_metadata, success); - int http_status_code; - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_status_code); + if (!success){ + throw VCLException(ObjectEmpty, + "Remote Server Error: RPC failed or connection error."); + } - curl_easy_cleanup(curl); - curl_mime_free(form); + Json::CharReaderBuilder metabuilder; + for (const auto& [id, metadata] : output_metadata) { + Json::Value root; + std::string errs; + std::istringstream iss(metadata); - if (http_status_code != 200) { - if (http_status_code == 0) { - throw VCLException(ObjectEmpty, "Remote server is not 
running."); - } - if (http_status_code == 400) { - throw VCLException(ObjectEmpty, - "Invalid Request to the Remote Server."); - } else if (http_status_code == 404) { - throw VCLException(ObjectEmpty, - "Invalid URL Request. Please check the URL."); - } else if (http_status_code == 500) { - throw VCLException(ObjectEmpty, - "Exception occurred at the remote server. " - "Please check your query."); - } else if (http_status_code == 503) { - throw VCLException(ObjectEmpty, "Unable to reach remote server"); + if (Json::parseFromStream(metabuilder, iss, &root, &errs)) { + img->set_ingest_metadata(root["metadata"]); } else { - throw VCLException(ObjectEmpty, "Remote Server error."); + throw VCLException(ObjectEmpty, "Metdata object is empty"); } - } - - std::string delimiter = ":metadata:"; - - size_t pos = 0; - std::string token; - std::string tmpBuffer = readBuffer; - if ((pos = tmpBuffer.find(delimiter)) != std::string::npos) { - readBuffer = tmpBuffer.substr(0, pos); - tmpBuffer.erase(0, pos + delimiter.length()); - Json::Value message; - Json::Reader reader; - bool parsingSuccessful = reader.parse(tmpBuffer, message); - if (!parsingSuccessful) { - throw VCLException(ObjectEmpty, "Error parsing string."); - } - img->set_ingest_metadata(message["metadata"]); - } - // Decode the response - std::vector vectordata(readBuffer.begin(), - readBuffer.end()); - cv::Mat data_mat(vectordata, true); - - if (data_mat.empty()) { - throw VCLException(ObjectEmpty, - "Empty response from remote server"); - } + } - cv::Mat decoded_mat(cv::imdecode(data_mat, 1)); + cv::Mat dmat = cv::imread(output_paths[imageId], cv::IMREAD_ANYCOLOR); + if (dmat.rows == 0 || dmat.cols == 0) { + throw VCLException(ObjectEmpty, + "Invalid response from the remote server."); + } - img->shallow_copy_cv(decoded_mat); + img->shallow_copy_cv(dmat); - if (std::remove(filePath.data()) != 0) { - } + if (std::remove(filePath.data()) != 0) { } } else @@ -538,6 +466,10 @@ void Image::UserOperation::operator()(Image 
*img) { zmq::context_t context(1); zmq::socket_t socket(context, zmq::socket_type::req); + int timeout_ms = 30000; + socket.setsockopt(ZMQ_RCVTIMEO, &timeout_ms, sizeof(timeout_ms)); + socket.setsockopt(ZMQ_SNDTIMEO, &timeout_ms, sizeof(timeout_ms)); + std::string port = _options["port"].asString(); std::string address = "tcp://127.0.0.1:" + port; @@ -575,7 +507,10 @@ void Image::UserOperation::operator()(Image *img) { zmq::message_t ipfile(message_len); memcpy(ipfile.data(), message_to_send.data(), message_len); - socket.send(ipfile, 0); + // socket.send(ipfile, 0); + if (!socket.send(ipfile, zmq::send_flags::none)) { + throw VCLException(ObjectEmpty, "Failed to send message to receiver — connection timeout or error."); + } std::string response; while (true) { zmq::message_t reply; diff --git a/src/vcl/Video.cc b/src/vcl/Video.cc index fbbe062d..c54fa708 100644 --- a/src/vcl/Video.cc +++ b/src/vcl/Video.cc @@ -1012,77 +1012,6 @@ static size_t videoCallback(void *ptr, size_t size, size_t nmemb, return written; } -Json::Value process_response(std::string zip_file_name, - std::string video_file_name, std::string format) { - const char *zipFileName = zip_file_name.c_str(); - Json::Value metadata; - int zip_err = 0; - - zip *archive = zip_open(zipFileName, 0, &zip_err); - if (!archive) { - zip_error_t error; - zip_error_init_with_code(&error, zip_err); - std::string errorMessage = - "Failed to open the zip file: " + std::string(zipFileName); - errorMessage += ". 
Error: " + std::string(zip_error_strerror(&error)); - zip_error_fini(&error); - - std::cerr << errorMessage << std::endl; - throw VCLException(OpenFailed, errorMessage); - } - - int numFiles = zip_get_num_files(archive); - - for (int i = 0; i < numFiles; ++i) { - struct zip_stat fileInfo; - zip_stat_init(&fileInfo); - - if (zip_stat_index(archive, i, 0, &fileInfo) == 0) { - std::string filename(fileInfo.name); - zip_file *file = zip_fopen_index(archive, i, 0); - if (file) { - - if (filename.find(format) != std::string::npos) { - - char *new_filename = video_file_name.data(); - FILE *new_file = fopen(new_filename, "wb"); - if (!new_file) { - delete[] new_filename; - continue; - } - - char buffer[1024]; - int bytes_read; - while ((bytes_read = zip_fread(file, buffer, sizeof(buffer))) > 0) { - fwrite(buffer, 1, bytes_read, new_file); - } - - fclose(new_file); - } else { - char buffer[1024]; - std::string jsonString; - int bytes_read; - while ((bytes_read = zip_fread(file, buffer, sizeof(buffer))) > 0) { - jsonString += buffer; - } - - Json::Reader reader; - bool parsingSuccessful = reader.parse(jsonString, metadata); - if (!parsingSuccessful) { - return metadata; - } - } - zip_fclose(file); - } - } - } - - // Close the zip archive - zip_close(archive); - - return metadata; -} - void Video::SyncRemoteOperation::operator()(Video *video, cv::Mat &frame, std::string args) { try { @@ -1090,129 +1019,74 @@ void Video::SyncRemoteOperation::operator()(Video *video, cv::Mat &frame, if (frame_count > 0) { std::string fname = args; - CURL *curl = NULL; - - CURLcode res; - struct curl_slist *headers = NULL; - curl_mime *form = NULL; - curl_mimepart *field = NULL; - - curl = curl_easy_init(); - - if (curl) { - - form = curl_mime_init(curl); - - field = curl_mime_addpart(form); - curl_mime_name(field, "videoData"); - if (curl_mime_filedata(field, fname.data()) != CURLE_OK) { - throw VCLException(ObjectEmpty, - "Unable to retrieve local file for remoting"); + std::string format = 
""; + char *s = const_cast(args.data()); + if (fname != "") { + std::string delimiter = "."; + char *p = std::strtok(s, delimiter.data()); + while (p != NULL) { + p = std::strtok(NULL, delimiter.data()); + if (p != NULL) { + format.assign(p, std::strlen(p)); + } } + } else { + throw VCLException(ObjectNotFound, "Video file not available"); + } - field = curl_mime_addpart(form); - curl_mime_name(field, "jsonData"); - if (curl_mime_data(field, _options.toStyledString().data(), - _options.toStyledString().length()) != CURLE_OK) { - throw VCLException( - ObjectEmpty, "Unable to create curl mime data for client params"); - } + auto time_now = std::chrono::system_clock::now(); + std::chrono::duration utc_time = time_now.time_since_epoch(); + std::string response_filepath = + VDMS::VDMSConfig::instance()->get_path_tmp() + "/rtempfile" + + std::to_string(utc_time.count()) + "." + format; - // Post data - std::string format = ""; - char *s = const_cast(args.data()); - if (fname != "") { - std::string delimiter = "."; - char *p = std::strtok(s, delimiter.data()); - while (p != NULL) { - p = std::strtok(NULL, delimiter.data()); - if (p != NULL) { - format.assign(p, std::strlen(p)); - } - } - } else { - throw VCLException(ObjectNotFound, "Video file not available"); - } + Json::StreamWriterBuilder builder; + std::string output = Json::writeString(builder, _options); - auto time_now = std::chrono::system_clock::now(); - std::chrono::duration utc_time = time_now.time_since_epoch(); - std::string response_filepath = - VDMS::VDMSConfig::instance()->get_path_tmp() + "/rtempfile" + - std::to_string(utc_time.count()) + "." 
+ format; + std::map input_paths = { + {video->get_video_id(), fname} + }; - std::string zip_response_filepath = - VDMS::VDMSConfig::instance()->get_path_tmp() + "/rtempzipfile" + - std::to_string(utc_time.count()) + ".zip"; - FILE *zip_response_file = fopen(zip_response_filepath.data(), "wb"); + std::map output_paths = { + {video->get_video_id(), response_filepath} + }; - if (curl_easy_setopt(curl, CURLOPT_URL, _url.data()) != CURLE_OK) { - throw VCLException(UndefinedException, "CURL setup error with URL"); - } - if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, videoCallback) != - CURLE_OK) { - throw VCLException(UndefinedException, - "CURL setup error with callback"); - } + std::map input_metadata = { + {video->get_video_id(), output} + }; - if (zip_response_file) { - if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, zip_response_file) != - CURLE_OK) { - throw VCLException(UndefinedException, - "CURL setup error callback response file"); - } - if (curl_easy_setopt(curl, CURLOPT_MIMEPOST, form) != CURLE_OK) { - throw VCLException(UndefinedException, - "CURL setup error with form"); - } - curl_easy_perform(curl); - fclose(zip_response_file); - } + std::map output_metadata; + bool success = true; - int http_status_code; - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_status_code); + GRPCEntityClient client(_url); + client.ProcessEntities(input_paths, output_paths, input_metadata, output_metadata, success); + if (!success){ + throw VCLException(ObjectEmpty, + "Remote Server Error: RPC failed or connection error."); + } - curl_easy_cleanup(curl); - curl_mime_free(form); + Json::CharReaderBuilder metabuilder; + for (const auto& [id, metadata] : output_metadata) { + Json::Value root; + std::string errs; + std::istringstream iss(metadata); - // Throw exceptions for different error codes received from the remote - // server - if (http_status_code != 200) { - std::cerr << "SyncRemoteOperation returned status code: " - << http_status_code << std::endl; - if 
(http_status_code == 0) { - throw VCLException(ObjectEmpty, "Remote server is not running."); - } - if (http_status_code == 400) { - throw VCLException(ObjectEmpty, - "Invalid Request to the Remote Server."); - } else if (http_status_code == 404) { - throw VCLException(ObjectEmpty, - "Invalid URL Request. Please check the URL."); - } else if (http_status_code == 500) { - throw VCLException(ObjectEmpty, - "Exception occurred at the remote server. " - "Please check your query."); - } else if (http_status_code == 503) { - throw VCLException(ObjectEmpty, "Unable to reach remote server"); + if (Json::parseFromStream(metabuilder, iss, &root, &errs)) { + video->set_ingest_metadata(root["metadata"]); } else { - throw VCLException(ObjectEmpty, "Remote Server error."); + throw VCLException(ObjectEmpty, "Metdata object is empty"); } - } + } - Json::Value metadata_response = - process_response(zip_response_filepath, response_filepath, format); - if (!metadata_response.empty()) { - video->set_ingest_metadata(metadata_response["metadata"]); - } + std::cout<< video->get_ingest_metadata()[0].toStyledString() << std::endl; - if (std::remove(fname.data()) != 0) { - throw VCLException(ObjectEmpty, - "Error encountered while removing the file."); - } - if (std::rename(response_filepath.data(), fname.data()) != 0) { - throw VCLException(ObjectEmpty, - "Error encountered while renaming the file."); - } + if (std::remove(fname.data()) != 0) { + throw VCLException(ObjectEmpty, + "Error encountered while removing the file."); + } + if (std::rename(response_filepath.data(), fname.data()) != 0) { + throw VCLException(ObjectEmpty, + "Error encountered while renaming the file."); } } else throw VCLException(ObjectEmpty, "Video object is empty"); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 7fe767eb..94fa05dc 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -14,6 +14,8 @@ project(tests find_package( OpenCV REQUIRED ) find_package( Threads REQUIRED ) 
find_package(AWSSDK REQUIRED COMPONENTS core s3) +find_package(gRPC CONFIG REQUIRED) +find_package(absl CONFIG REQUIRED) if(USE_K8S) add_definitions("-D HAS_KUBERNETES_CLIENT") @@ -93,5 +95,6 @@ target_link_libraries(unit_tests ${OpenCV_LIBS} ${AWSSDK_LINK_LIBRARIES} neo4j-client + -lgtest ) diff --git a/tests/cleandbs.sh b/tests/cleandbs.sh index e34f3c4a..cdf0fc30 100755 --- a/tests/cleandbs.sh +++ b/tests/cleandbs.sh @@ -6,4 +6,6 @@ rm -rf test_db_1 || true rm -rf db || true rm -rf db_backup || true rm -rf /tmp/rpathimage.jpg || true -rm -rf /tmp/kubeconfig || true \ No newline at end of file +rm -rf /tmp/kubeconfig || true +rm -rf remote_function_test/entity_pb2_grpc.py || true +rm -rf remote_function_test/entity_pb2.py || true \ No newline at end of file diff --git a/tests/remote_function_test/entity.proto b/tests/remote_function_test/entity.proto new file mode 100644 index 00000000..1e1bc970 --- /dev/null +++ b/tests/remote_function_test/entity.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package entity; + +service Operator { + rpc Operate (Entity) returns (Entity); +} + +message Entity { + bytes entity = 1; + bytes options = 2; +} \ No newline at end of file diff --git a/tests/remote_function_test/functions/caption.py b/tests/remote_function_test/functions/caption.py index 72a260fb..551206b3 100644 --- a/tests/remote_function_test/functions/caption.py +++ b/tests/remote_function_test/functions/caption.py @@ -1,41 +1,19 @@ import cv2 -import uuid -import os +import numpy as np -def run(ipfilename, format, options, tmp_dir_path): - if not os.path.exists(tmp_dir_path): - raise Exception(f"{tmp_dir_path}: path is invalid") +def run(entity, options): + image_array = np.frombuffer(entity, dtype=np.uint8) - opfilename = os.path.join( - tmp_dir_path, "tmpfile" + uuid.uuid1().hex + "." 
+ str(format) - ) + img = cv2.imdecode(image_array, cv2.IMREAD_COLOR) - vc = cv2.VideoCapture(ipfilename) - frame_width = int(vc.get(cv2.CAP_PROP_FRAME_WIDTH)) - frame_height = int(vc.get(cv2.CAP_PROP_FRAME_HEIGHT)) - video_fps = vc.get(cv2.CAP_PROP_FPS) + img = cv2.flip(img, 0) - video = cv2.VideoWriter( - opfilename, - cv2.VideoWriter_fourcc(*"mp4v"), - video_fps, - (frame_width, frame_height), - ) + success, encoded_img = cv2.imencode(".jpg", img) + if not success: + raise ValueError("Failed to encode image.") + ebytes = encoded_img.tobytes() - while True: - (grabbed, frame) = vc.read() - if not grabbed: - print("[INFO] no frame read from stream - exiting") - break + rdict = {"metadata": "None"} - label = options["text"] - cv2.putText( - frame, label, (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2 - ) - - video.write(frame) - vc.release() - video.release() - - return opfilename, None + return ebytes, rdict diff --git a/tests/remote_function_test/functions/flip.py b/tests/remote_function_test/functions/flip.py index ef579630..551206b3 100644 --- a/tests/remote_function_test/functions/flip.py +++ b/tests/remote_function_test/functions/flip.py @@ -1,13 +1,19 @@ import cv2 -import os +import numpy as np -def run(ipfilename, format, options, tmp_dir_path): - if not os.path.exists(ipfilename): - raise Exception(f"Flip error: File ipfilename: {ipfilename} does not exist") +def run(entity, options): + image_array = np.frombuffer(entity, dtype=np.uint8) - img = cv2.imread(ipfilename) + img = cv2.imdecode(image_array, cv2.IMREAD_COLOR) img = cv2.flip(img, 0) - return img, None + success, encoded_img = cv2.imencode(".jpg", img) + if not success: + raise ValueError("Failed to encode image.") + ebytes = encoded_img.tobytes() + + rdict = {"metadata": "None"} + + return ebytes, rdict diff --git a/tests/remote_function_test/functions/metadata.py b/tests/remote_function_test/functions/metadata.py index 22a808a3..178d40a0 100644 --- 
a/tests/remote_function_test/functions/metadata.py +++ b/tests/remote_function_test/functions/metadata.py @@ -1,7 +1,7 @@ import cv2 -import uuid -import json import os +import imageio.v3 as iio +import numpy as np # Get the real directory where this Python file is currentDir = os.path.realpath(os.path.dirname(__file__)) @@ -12,7 +12,7 @@ if not os.path.exists(haarcascade_frontalface_default_path): raise Exception( - f"{haarcascade_frontalface_default_path}: path is invalid in metadata for the remote function tests" + f"{haarcascade_frontalface_default_path}: path is invalid in metadata for the remote function" ) face_cascade = cv2.CascadeClassifier( @@ -29,17 +29,11 @@ def facedetectbbox(frame): return faces -def run(ipfilename, format, options, tmp_dir_path): +def run(entity, options): if options["media_type"] == "video": - vs = cv2.VideoCapture(ipfilename) frameNum = 1 metadata = {} - while True: - (grabbed, frame) = vs.read() - if not grabbed: - print("[INFO] no frame read from stream - exiting") - break - + for frame in iio.imiter(entity, format_hint=".mp4"): if options["otype"] == "face": faces = facedetectbbox(frame) if len(faces) > 0: @@ -62,6 +56,8 @@ def run(ipfilename, format, options, tmp_dir_path): faces = facedetectbbox(frame) if len(faces) > 0: face = faces[0] + # We use dummy values here as an example to showcase + # different values for car. 
tdict = { "x": int(face[0]) + 3, "y": int(face[1]) + 5, @@ -76,17 +72,12 @@ def run(ipfilename, format, options, tmp_dir_path): if frameNum == 3: break - - response = {"opFile": ipfilename, "metadata": metadata} - - jsonfile = os.path.join(tmp_dir_path, "jsonfile" + uuid.uuid1().hex + ".json") - with open(jsonfile, "w") as f: - json.dump(response, f, indent=4) - return ipfilename, jsonfile - else: tdict = {} - img = cv2.imread(ipfilename) + + image_array = np.frombuffer(entity, dtype=np.uint8) + + img = cv2.imdecode(image_array, cv2.IMREAD_COLOR) if options["otype"] == "face": faces = facedetectbbox(img) if len(faces) > 0: @@ -103,6 +94,8 @@ def run(ipfilename, format, options, tmp_dir_path): faces = facedetectbbox(img) if len(faces) > 0: face = faces[0] + # We use placeholder values here as an example to showcase + # different values for car. tdict = { "x": int(face[0]) + 3, "y": int(face[1]) + 5, @@ -112,7 +105,7 @@ def run(ipfilename, format, options, tmp_dir_path): "object_det": {"color": "red"}, } - response = {"opFile": ipfilename, "metadata": tdict} + response = {"opFile": "", "metadata": tdict} + print(response) - r = json.dumps(response) - return img, r + return entity, response diff --git a/tests/remote_function_test/udf_server.py b/tests/remote_function_test/udf_server.py index a3484237..0114a456 100644 --- a/tests/remote_function_test/udf_server.py +++ b/tests/remote_function_test/udf_server.py @@ -1,15 +1,23 @@ -from flask import Flask, request, jsonify, send_file, after_this_request -import cv2 +import sys +import grpc +import entity_pb2 +import entity_pb2_grpc import json -from datetime import datetime, timezone import os -import sys -import uuid -from zipfile import ZipFile, is_zipfile import importlib.util -from werkzeug.utils import secure_filename +import asyncio +import signal +from concurrent.futures import ProcessPoolExecutor +import multiprocessing tmp_dir_path = None +UDF_MAP = {} + +cpu_cores = multiprocessing.cpu_count() +max_workers = 
max(cpu_cores - 1, 1) +executor = ProcessPoolExecutor(max_workers=max_workers) + +MAX_MSG_SIZE = 500 * 1024 * 1024 # Function to dynamically import a module given its full path @@ -57,143 +65,76 @@ def setup(tmp_path): raise Exception( "setup() error: module '" + entry + "' could not be loaded" ) - globals()[module_name] = module - - -app = Flask(__name__) - - -def get_current_timestamp(): - dt = datetime.now(timezone.utc) - - utc_time = dt.replace(tzinfo=timezone.utc) - utc_timestamp = utc_time.timestamp() + UDF_MAP[module_name] = module - return utc_timestamp +def run_udf(module_name, result, options): + udf = UDF_MAP[module_name] + return udf.run(result, options) -@app.route("/hello", methods=["GET"]) -def hello(): - return jsonify({"response": "true"}) - - -@app.route("/image", methods=["POST"]) -def image_api(): - json_data = json.loads(request.form["jsonData"]) - image_data = request.files["imageData"] - - format = json_data["format"] if "format" in json_data else "jpg" - - tmpfile = secure_filename( - os.path.join(tmp_dir_path, "tmpfile" + uuid.uuid1().hex + "." + str(format)) - ) - - image_data.save(tmpfile) - - r_img, r_meta = "", "" - - udf = globals()[json_data["id"]] - if "ingestion" in json_data: - r_img, r_meta = udf.run(tmpfile, format, json_data, tmp_dir_path) - else: - r_img, _ = udf.run(tmpfile, format, json_data, tmp_dir_path) - return_string = cv2.imencode("." 
+ str(format), r_img)[1].tostring() - - if r_meta != "": - return_string += ":metadata:".encode("utf-8") - return_string += r_meta.encode("utf-8") +# gRPC Servicer +class OperatorServicer(entity_pb2_grpc.OperatorServicer): + async def Operate(self, request, context): + result = request.entity + options = json.loads(request.options.decode("utf-8")) + loop = asyncio.get_running_loop() + ebytes, rdict = await loop.run_in_executor( + executor, run_udf, options["id"], result, options + ) + return entity_pb2.Entity( + entity=ebytes, options=json.dumps(rdict).encode("utf-8") + ) - os.remove(tmpfile) - return return_string +# Graceful shutdown handler +async def shutdown(server, executor): + print("\nShutting down...") + await server.stop(5) # Allow 5 seconds to finish active RPCs + executor.shutdown(wait=True) + print("Shutdown complete.") -@app.route("/video", methods=["POST"]) -def video_api(): - json_data = json.loads(request.form["jsonData"]) - video_data = request.files["videoData"] - format = json_data["format"] if "format" in json_data else "mp4" - tmpfile = secure_filename( - os.path.join(tmp_dir_path, "tmpfile" + uuid.uuid1().hex + "." 
+ str(format)) +async def main(port): + global MAX_MSG_SIZE + # server = grpc.aio.server() + server = grpc.aio.server( + options=[ + ("grpc.max_receive_message_length", MAX_MSG_SIZE), + ("grpc.max_send_message_length", MAX_MSG_SIZE), + ] ) - video_data.save(tmpfile) + entity_pb2_grpc.add_OperatorServicer_to_server(OperatorServicer(), server) + server.add_insecure_port("[::]:{}".format(port)) + await server.start() + print("Async gRPC server (multiprocessing) started on port", port) - video_file, metadata_file = "", "" + stop_event = asyncio.Event() - udf = globals()[json_data["id"]] - if "ingestion" in json_data: - video_file, metadata_file = udf.run(tmpfile, format, json_data, tmp_dir_path) - else: - video_file, metadata_file = udf.run(tmpfile, format, json_data, tmp_dir_path) + # Handle SIGINT and SIGTERM + loop = asyncio.get_running_loop() + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, stop_event.set) - response_file = os.path.join(tmp_dir_path, "tmpfile" + uuid.uuid1().hex + ".zip") + await stop_event.wait() + await shutdown(server, executor) - try: - with ZipFile(response_file, "w") as zip_object: - zip_object.write(video_file, os.path.basename(video_file)) - if metadata_file is not None and metadata_file != "": - zip_object.write(metadata_file, os.path.basename(metadata_file)) - zip_object.close() - if not is_zipfile(response_file): - raise Exception("response_file is invalid: " + response_file) - except Exception: - error_message = "An internal error has occurred." 
- return error_message, 500 - - @after_this_request - def remove_tempfile(response): - try: - os.remove(tmpfile) - os.remove(response_file) - os.remove(video_file) - os.remove(metadata_file) - except Exception as e: - print( - "Some files cannot be deleted or are not present:", - str(e), - file=sys.stderr, - ) - return response - try: - return send_file( - response_file, - as_attachment=True, - download_name=os.path.basename(response_file), - ) - except Exception: - return "Error in file read" - - -@app.errorhandler(400) -def handle_bad_request(e): - response = e.get_response() - response.data = json.dumps( - { - "code": e.code, - "name": e.name, - "description": e.description, - } - ) - response.content_type = "application/json" - print("400 error:", response, file=sys.stderr) - return response - -
-def main():
-    if sys.argv[1] is None:
+if __name__ == "__main__":
+    if len(sys.argv) < 2:
         print("Port missing\n Correct Usage: python3 udf_server.py [tmp_path]")
-    elif sys.argv[2] is None:
+    elif len(sys.argv) < 3:
         print(
             "Warning: Path to the temporary directory is missing\nBy default the path will be the current directory"
         )
         setup(None)
-        app.run(host="0.0.0.0", port=int(sys.argv[1]))
+        try:
+            asyncio.run(main(int(sys.argv[1])))
+        except KeyboardInterrupt:
+            pass
     else:
         setup(sys.argv[2])
-        app.run(host="0.0.0.0", port=int(sys.argv[1]))
-
-
-if __name__ == "__main__":
-    main()
+        try:
+            asyncio.run(main(int(sys.argv[1])))
+        except KeyboardInterrupt:
+            pass
diff --git a/tests/run_all_tests.py b/tests/run_all_tests.py
index 0e2cf6c9..91672b68 100755
--- a/tests/run_all_tests.py
+++ b/tests/run_all_tests.py
@@ -1450,6 +1450,36 @@ def setup_requirements_for_remote_udf_server(self, stderrFD, stdoutFD):
                 "setup_requirements_for_remote_udf_server() error: " + str(e)
             )
 
+    def setup_protobufs_for_remote_udf_server(self, stderrFD, stdoutFD):
+        """
+        Sets up the protobufs for the remote UDF server.
+
+        This method runs `grpc_tools.protoc` on
+        tests/remote_function_test/entity.proto to generate the Python
+        protobuf and gRPC modules next to it. Any failure is re-raised as
+        an Exception prefixed with this method's name.
+
+        Parameters:
+        - stderrFD: The file descriptor for capturing stderr output.
+        - stdoutFD: The file descriptor for capturing stdout output.
+
+        Raises:
+        - Exception: If any error occurs during the setup process.
+        """
+
+        try:
+            subprocess.run(
+                f"python3 -m grpc_tools.protoc -I{DEFAULT_DIR_REPO}/tests/remote_function_test --python_out={DEFAULT_DIR_REPO}/tests/remote_function_test --grpc_python_out={DEFAULT_DIR_REPO}/tests/remote_function_test {DEFAULT_DIR_REPO}/tests/remote_function_test/entity.proto",
+                shell=True,
+                stderr=stderrFD,
+                stdout=stdoutFD,
+                text=True,
+                check=True,
+            )
+
+        except Exception as e:
+            raise Exception("setup_protobufs_for_remote_udf_server() error: " + str(e))
+ def run_remote_udf_server(self, tmp_dir, stderrFD, stdoutFD): """ Runs the remote UDF server. @@ -1521,6 +1551,7 @@ def setup_for_remote_udf_server_tests(self, tmp_dir, stderrFD, stdoutFD): print("setup_for_remote_udf_server_tests...") self.setup_requirements_for_remote_udf_server(stderrFD, stdoutFD) + self.setup_protobufs_for_remote_udf_server(stderrFD, stdoutFD) self.run_remote_udf_server(tmp_dir, stderrFD, stdoutFD) except Exception as e: raise Exception( diff --git a/tests/run_tests.sh b/tests/run_tests.sh index d63bf760..bdeb80f3 100755 --- a/tests/run_tests.sh +++ b/tests/run_tests.sh @@ -74,6 +74,7 @@ function execute_commands() { echo 'Start remote server for test' cd remote_function_test + python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=.
entity.proto python3 -m pip install -r ../../remote_function/requirements.txt python3 udf_server.py 5010 /tmp/tests_output_dir > /tmp/tests_output_dir/tests_remote_screen.log 2> /tmp/tests_output_dir/tests_remote_log.log & diff --git a/tests/unit_tests/Image_test.cc b/tests/unit_tests/Image_test.cc index 02b73aa5..8f2c647a 100644 --- a/tests/unit_tests/Image_test.cc +++ b/tests/unit_tests/Image_test.cc @@ -30,7 +30,7 @@ #include "ImageLoop.h" #include "VDMSConfig.h" #include "stats/SystemStats.h" -#include "vcl/Image.h" +// #include "vcl/Image.h" #include "gtest/gtest.h" #include @@ -869,7 +869,7 @@ TEST_F(ImageTest, SyncRemote) { ASSERT_TRUE(fs::exists(fs::path(inputFile))); cv::Mat cv_img_flipped = cv::imread(inputFile); - std::string _url = "http://localhost:5010/image"; + std::string _url = "localhost:5010"; Json::Value _options; _options["format"] = "jpg"; _options["id"] = "flip"; @@ -916,7 +916,7 @@ TEST_F(ImageTest, ImageLoop) { VCL::Image img(img_); ImageLoop imageLoop; - std::string _url = "http://localhost:5010/image"; + std::string _url = "localhost:5010"; Json::Value _options; _options["format"] = "jpg"; _options["id"] = "flip"; @@ -947,7 +947,7 @@ TEST_F(ImageTest, ImageLoopURLError) { VCL::Image img(img_); ImageLoop imageLoop; - std::string _url = "http://localhost:5010/imag"; + std::string _url = "localhost:5015"; Json::Value _options; _options["format"] = "jpg"; _options["id"] = "flip"; @@ -973,7 +973,7 @@ TEST_F(ImageTest, ImageLoopRemoteFunctionError) { VCL::Image img(img_); ImageLoop imageLoop; - std::string _url = "http://localhost:5010/image"; + std::string _url = "localhost:5010"; Json::Value _options; _options["format"] = "jpg"; _options["id"] = "gray"; @@ -998,7 +998,7 @@ TEST_F(ImageTest, ImageLoopSyncRemoteFunctionError) { VCL::Image img(img_); ImageLoop imageLoop; - std::string _url = "http://localhost:5010/imag"; + std::string _url = "localhost:5015"; Json::Value _options; _options["format"] = "jpg"; _options["id"] = "gray"; @@ -1104,7 
+1104,7 @@ TEST_F(ImageTest, RemoteMetadata) { VCL::Image img = VCL::Image(temp_image_path, true); - std::string _url = "http://localhost:5010/image"; + std::string _url = "localhost:5010"; Json::Value _options; _options["format"] = "jpg"; _options["id"] = "metadata"; diff --git a/tests/unit_tests/Video_test.cc b/tests/unit_tests/Video_test.cc index 28574e9b..9917c9c9 100644 --- a/tests/unit_tests/Video_test.cc +++ b/tests/unit_tests/Video_test.cc @@ -28,7 +28,7 @@ */ #include "VideoLoop.h" -#include "vcl/Video.h" +// #include "vcl/Video.h" #include "gtest/gtest.h" #include @@ -825,7 +825,7 @@ TEST_F(VideoTest, CropWrite) { * that undergoes a captioning operation. */ TEST_F(VideoTest, SyncRemoteWrite) { - std::string _url = "http://localhost:5010/video"; + std::string _url = "localhost:5010"; Json::Value _options; _options["format"] = "mp4"; _options["text"] = "Video"; @@ -991,7 +991,7 @@ TEST_F(VideoTest, UDFWrite) { * The resulting video being encoded should not be null. */ TEST_F(VideoTest, VideoLoopTest) { - std::string _url = "http://localhost:5010/video"; + std::string _url = "localhost:5010"; Json::Value _options; _options["format"] = "mp4"; _options["text"] = "Video"; @@ -1042,7 +1042,7 @@ TEST_F(VideoTest, VideoLoopTest) { * The resulting video being encoded should not be null. */ TEST_F(VideoTest, VideoLoopPipelineTest) { - std::string _url = "http://localhost:5010/video"; + std::string _url = "localhost:5010"; Json::Value _options; _options["format"] = "mp4"; _options["text"] = "Video"; @@ -1101,7 +1101,7 @@ TEST_F(VideoTest, VideoLoopPipelineTest) { * The resulting video object should have an error message. */ TEST_F(VideoTest, VideoLoopTestError) { - std::string _url = "http://localhost:5010/vide"; + std::string _url = "localhost:5015"; Json::Value _options; _options["format"] = "mp4"; _options["text"] = "Video"; @@ -1143,7 +1143,7 @@ TEST_F(VideoTest, VideoLoopTestError) { * The resulting video object should have an error message. 
*/ TEST_F(VideoTest, VideoLoopSyncRemoteTestError) { - std::string _url = "http://localhost:5010/vide"; + std::string _url = "localhost:5015"; Json::Value _options; _options["format"] = "mp4"; _options["text"] = "Video"; @@ -1408,7 +1408,7 @@ TEST_F(VideoTest, FilePathAccessError) { * Metadata check is performed by this test. */ TEST_F(VideoTest, SyncRemoteWriteWithMetadata) { - std::string _url = "http://localhost:5010/video"; + std::string _url = "localhost:5010"; Json::Value _options; _options["format"] = "mp4"; _options["id"] = "metadata"; diff --git a/tests/unit_tests/client_image.cc b/tests/unit_tests/client_image.cc index 6c65ee9e..6b5cd9be 100644 --- a/tests/unit_tests/client_image.cc +++ b/tests/unit_tests/client_image.cc @@ -123,7 +123,7 @@ TEST(CLIENT_CPP, find_image_remote) { Json::Value tuple; Json::Value op; op["type"] = "remoteOp"; - op["url"] = "http://localhost:5010/image"; + op["url"] = "localhost:5010"; op["options"]["id"] = "flip"; op["options"]["format"] = "jpg"; tuple = meta_obj->construct_find_image_withop(op); @@ -148,7 +148,7 @@ TEST(CLIENT_CPP, find_image_syncremote) { Json::Value tuple; Json::Value op; op["type"] = "syncremoteOp"; - op["url"] = "http://localhost:5010/image"; + op["url"] = "localhost:5010"; op["options"]["id"] = "flip"; op["options"]["format"] = "jpg"; tuple = meta_obj->construct_find_image_withop(op); @@ -227,7 +227,7 @@ TEST(CLIENT_CPP, add_image_dynamic_metadata_remote) { op["options"]["format"] = "jpg"; op["options"]["media_type"] = "image"; op["options"]["otype"] = "face"; - op["url"] = "http://localhost:5010/image"; + op["url"] = "localhost:5010"; Meta_Data *meta_obj = new Meta_Data(); blobs.push_back(meta_obj->read_blob(filename)); meta_obj->_aclient.reset( diff --git a/tests/unit_tests/client_videos.cc b/tests/unit_tests/client_videos.cc index 5aa0fdf3..d9710676 100644 --- a/tests/unit_tests/client_videos.cc +++ b/tests/unit_tests/client_videos.cc @@ -177,7 +177,7 @@ TEST(CLIENT_CPP_Video, add_dynamic_metadata) { 
Json::Value op; op["type"] = "syncremoteOp"; - op["url"] = "http://localhost:5010/video"; + op["url"] = "localhost:5010"; op["options"]["id"] = "metadata"; op["options"]["media_type"] = "video"; op["options"]["otype"] = "face"; @@ -216,7 +216,7 @@ TEST(CLIENT_CPP_Video, find_dynamic_metadata) { Json::Value op; op["type"] = "syncremoteOp"; - op["url"] = "http://localhost:5010/video"; + op["url"] = "localhost:5010"; op["options"]["id"] = "metadata"; op["options"]["media_type"] = "video"; op["options"]["otype"] = "face"; diff --git a/utils/src/protobuf/entity.proto b/utils/src/protobuf/entity.proto new file mode 100644 index 00000000..1e1bc970 --- /dev/null +++ b/utils/src/protobuf/entity.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package entity; + +service Operator { + rpc Operate (Entity) returns (Entity); +} + +message Entity { + bytes entity = 1; + bytes options = 2; +} \ No newline at end of file