Skip to content
Merged
68 changes: 36 additions & 32 deletions plumber/ppl/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,51 +26,55 @@ RUN mix local.hex --force --if-missing && \
mix local.rebar --force --if-missing

# install mix dependencies
COPY ppl/mix.exs ppl/mix.lock ./
COPY block/mix.exs block/mix.lock ../block/
COPY definition_validator/mix.exs definition_validator/mix.lock ../definition_validator/
COPY gofer_client/mix.exs gofer_client/mix.lock ../gofer_client/
COPY job_matrix/mix.exs job_matrix/mix.lock ../job_matrix/
COPY looper/mix.exs looper/mix.lock ../looper/
COPY proto/mix.exs proto/mix.lock ../proto/
COPY spec/mix.exs spec/mix.lock ../spec/
# NOTE: the build context is the repository root (see DOCKER_BUILD_PATH in the
# Makefile) so that feature_provider, which lives outside the plumber/ tree,
# can be copied into the image.
COPY plumber/ppl/mix.exs plumber/ppl/mix.lock ./
COPY plumber/block/mix.exs plumber/block/mix.lock ../block/
COPY plumber/definition_validator/mix.exs plumber/definition_validator/mix.lock ../definition_validator/
COPY plumber/gofer_client/mix.exs plumber/gofer_client/mix.lock ../gofer_client/
COPY plumber/job_matrix/mix.exs plumber/job_matrix/mix.lock ../job_matrix/
COPY plumber/looper/mix.exs plumber/looper/mix.lock ../looper/
COPY plumber/proto/mix.exs plumber/proto/mix.lock ../proto/
COPY plumber/spec/mix.exs plumber/spec/mix.lock ../spec/
COPY feature_provider ../feature_provider
RUN mix deps.get --only $MIX_ENV
RUN mkdir config

# copy compile-time config files before we compile dependencies
# to ensure any relevant config change will trigger the dependencies
# to be re-compiled.
COPY ppl/config/config.exs ppl/config/${MIX_ENV}.exs config/
COPY block/config/config.exs block/config/${MIX_ENV}.exs ../block/config/
COPY definition_validator/config/config.exs ../definition_validator/config/
COPY gofer_client/config/config.exs ../gofer_client/config/
COPY job_matrix/config/config.exs ../job_matrix/config/
COPY looper/config/config.exs ../looper/config/
COPY proto/config/config.exs ../proto/config/
COPY spec/config/config.exs ../spec/config/
COPY plumber/ppl/config/config.exs plumber/ppl/config/${MIX_ENV}.exs config/
COPY plumber/block/config/config.exs plumber/block/config/${MIX_ENV}.exs ../block/config/
COPY plumber/definition_validator/config/config.exs ../definition_validator/config/
COPY plumber/gofer_client/config/config.exs ../gofer_client/config/
COPY plumber/job_matrix/config/config.exs ../job_matrix/config/
COPY plumber/looper/config/config.exs ../looper/config/
COPY plumber/proto/config/config.exs ../proto/config/
COPY plumber/spec/config/config.exs ../spec/config/
RUN mix deps.compile
# copy the rest of the config files
COPY ppl/config/ config/
COPY plumber/ppl/config/ config/

# Compile the release
COPY ppl/lib lib
COPY ppl/priv/ecto_repo/migrations priv/ecto_repo/migrations
COPY block/lib ../block/lib
COPY block/priv/ecto_repo/migrations ../block/priv/ecto_repo/migrations
COPY block/priv/repos ../block/priv/repos
COPY definition_validator/lib ../definition_validator/lib
COPY gofer_client/lib ../gofer_client/lib
COPY job_matrix/lib ../job_matrix/lib
COPY looper/lib ../looper/lib
COPY proto/lib ../proto/lib
COPY spec/lib ../spec/lib
COPY spec/priv ../spec/priv
COPY plumber/ppl/lib lib
COPY plumber/ppl/priv/ecto_repo/migrations priv/ecto_repo/migrations
COPY plumber/block/lib ../block/lib
COPY plumber/block/priv/ecto_repo/migrations ../block/priv/ecto_repo/migrations
COPY plumber/block/priv/repos ../block/priv/repos
COPY plumber/definition_validator/lib ../definition_validator/lib
COPY plumber/gofer_client/lib ../gofer_client/lib
COPY plumber/job_matrix/lib ../job_matrix/lib
COPY plumber/looper/lib ../looper/lib
COPY plumber/proto/lib ../proto/lib
COPY plumber/spec/lib ../spec/lib
COPY plumber/spec/priv ../spec/priv

FROM base AS dev

COPY ppl/.formatter.exs .formatter.exs
COPY ppl/.credo.exs .credo.exs
COPY ppl/test test
COPY plumber/ppl/.formatter.exs .formatter.exs
COPY plumber/ppl/.credo.exs .credo.exs
COPY plumber/ppl/test test

RUN mix compile

Expand Down
5 changes: 4 additions & 1 deletion plumber/ppl/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ export MIX_ENV?=dev

include ../../Makefile

DOCKER_BUILD_PATH=..
# Build context is the repository root (two levels up from plumber/ppl) so the
# Docker image can include feature_provider, which lives outside the plumber/
# tree. The Dockerfile COPY paths are prefixed with plumber/ accordingly.
DOCKER_BUILD_PATH=../..
EX_CATCH_WARRNINGS_FLAG=

APP_NAME=ppl
Expand Down
8 changes: 8 additions & 0 deletions plumber/ppl/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ config :vmstats,
sink: Ppl.VmstatsWatchmanSink,
interval: 10_000

# Feature flags (FeatureHub). The Feature service address is read at call time
# from INTERNAL_API_URL_FEATURE (see Ppl.FeatureClient). Results are cached in
# the :feature_cache Cachex instance started by Ppl.Application.
config :ppl,
feature_provider:
Comment thread
dexyk marked this conversation as resolved.
{Ppl.FeatureHubProvider,
[cache: {FeatureProvider.CachexCache, name: :feature_cache, ttl_ms: :timer.minutes(10)}]}

# Mappings to function definitions for functions available in when condition DSL
config :when, change_in: {Block.ChangeInResolver, :change_in, [1, 2]}

Expand Down
4 changes: 4 additions & 0 deletions plumber/ppl/config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ config :ppl, Ppl.Cache.OrganizationSettings,
size_limit: 1_000,
reclaim_coef: 0.5

# In tests, use the FeatureHub provider without caching so the gRPC mock is
# always consulted. Tests that need to control the flag stub Ppl.Features.
config :ppl, feature_provider: {Ppl.FeatureHubProvider, []}

# Time to wait before pipeline status is reexamined
# -2 means 'do not wait' or take all
config :ppl, general_looper_cooling_time_sec: -2
Expand Down
4 changes: 2 additions & 2 deletions plumber/ppl/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ services:
app:
image: ${IMAGE:-ppl}:${TAG:-latest}
build:
context: ..
context: ../..
cache_from:
- "${IMAGE:-ppl}:${IMAGE_TAG:-latest}"
dockerfile: ppl/Dockerfile
dockerfile: plumber/ppl/Dockerfile
target: ${DOCKER_BUILD_TARGET:-dev}
args:
- BUILD_ENV=dev
Expand Down
26 changes: 21 additions & 5 deletions plumber/ppl/lib/ppl/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,31 @@ defmodule Ppl.Application do
Application.stop(:watchman)
Application.ensure_all_started(:watchman)

init_feature_provider()

# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: Ppl.Supervisor]
(children(get_env()) ++ grpc_supervisor(get_env())) |> Supervisor.start_link(opts)
end

defp init_feature_provider() do
case Application.get_env(:ppl, :feature_provider) do
nil -> :ok
provider -> FeatureProvider.init(provider)
end
end

def children(:test) do
[
{Looper.Publisher.AMQP, amqp_url()},
Supervisor.child_spec({Ppl.Grpc.InFlightCounter, in_flight_counter_args(:describe)}, id: InFlightCounterDescribe),
Supervisor.child_spec({Ppl.Grpc.InFlightCounter, in_flight_counter_args(:list)}, id: InFlightCounterList),
Supervisor.child_spec({Ppl.Grpc.InFlightCounter, in_flight_counter_args(:describe)},
id: InFlightCounterDescribe
),
Supervisor.child_spec({Ppl.Grpc.InFlightCounter, in_flight_counter_args(:list)},
id: InFlightCounterList
),
%{id: :feature_cache, start: {Cachex, :start_link, [:feature_cache, []]}},
supervisor(Ppl.Cache, []),
supervisor(Ppl.EctoRepo, [])
]
Expand All @@ -41,12 +55,14 @@ defmodule Ppl.Application do
[
Test.Support.Mocks.UserServer,
Test.Support.Mocks.PFCServer,
Test.Support.Mocks.OrgServer
Test.Support.Mocks.OrgServer,
Test.Support.Mocks.FeatureServer
] ++ grpc_servers()

defp grpc_servers(_), do: grpc_servers()

defp grpc_servers, do: [Ppl.Grpc.Server, Plumber.WorkflowAPI.Server, Ppl.Admin.Server, Ppl.Grpc.HealthCheck]
defp grpc_servers,
do: [Ppl.Grpc.Server, Plumber.WorkflowAPI.Server, Ppl.Admin.Server, Ppl.Grpc.HealthCheck]

def children_ do
[
Expand Down Expand Up @@ -100,7 +116,7 @@ defmodule Ppl.Application do
defp in_flight_counter_args(type), do: [type: type, limit: in_flight_counter_limit(type)]

defp in_flight_counter_limit(type) do
up_type = type |> Atom.to_string |> String.upcase
up_type = type |> Atom.to_string() |> String.upcase()

"IN_FLIGHT_#{up_type}_LIMIT"
|> System.get_env()
Expand Down
105 changes: 105 additions & 0 deletions plumber/ppl/lib/ppl/feature_client.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
defmodule Ppl.FeatureClient do
@moduledoc """
gRPC client consuming the Feature (FeatureHub) API.

Used by `Ppl.FeatureHubProvider` to fetch the features enabled for an
organization. Failures are turned into `{:error, _}` so callers can fail
closed (treat the feature as disabled).
"""

@watchman_prefix_key "Ppl.FeatureClient"
@url_env_var "INTERNAL_API_URL_FEATURE"
# This call sits in the pipeline initialization hot path (the compile task is
# built and awaited inside a deadline-bounded looper step), so it must be
# tightly bounded and fail fast. @grpc_timeout is the per-call gRPC deadline;
# @timeout is a slightly larger Wormhole backstop covering connect hangs.
@grpc_timeout 1_000
@timeout 1_500

alias InternalApi.Feature, as: API
alias API.FeatureService.Stub
require Logger

@doc """
Calls the Feature gRPC API to list the features enabled for an organization.
"""
@spec list_organization_features(String.t()) ::
{:ok, [InternalApi.Feature.OrganizationFeature.t()]}
| {:error, :timeout}
| {:error, any()}
def list_organization_features(organization_id) do
result =
Wormhole.capture(__MODULE__, :do_list_organization_features, [organization_id],
stacktrace: true,
timeout: @timeout
)

case result do
{:ok, features} ->
{:ok, features}

{:error, {:timeout, timeout}} ->
log_timeout(organization_id, timeout)
{:error, :timeout}

{:error, {:shutdown, {reason, _stacktrace}}} ->
log_shutdown(organization_id, reason)
{:error, reason}
end
end

#
# gRPC connection
#

@doc false
def do_list_organization_features(organization_id) do
Watchman.benchmark("#{@watchman_prefix_key}.list_organization_features.duration", fn ->
request = API.ListOrganizationFeaturesRequest.new(org_id: organization_id)

case send_request(request) do
{:ok, response} ->
Watchman.increment("#{@watchman_prefix_key}.list_organization_features.success")
response.organization_features

{:error, reason} ->
Watchman.increment("#{@watchman_prefix_key}.list_organization_features.failure")
raise reason
end
end)
end

defp send_request(request) do
url =
System.get_env(@url_env_var) || raise "environment variable #{@url_env_var} was not found"

with {:ok, channel} <- GRPC.Stub.connect(url) do
Watchman.increment("#{@watchman_prefix_key}.list_organization_features.connect")

try do
Stub.list_organization_features(channel, request, timeout: @grpc_timeout)
after
GRPC.Stub.disconnect(channel)
end
end
end

#
# Logging functions
#

defp log_timeout(organization_id, _timeout) do
metadata = log_metadata(organization_id: organization_id)
Logger.error("Ppl.FeatureClient.list_organization_features/1: TIMEOUT #{metadata}")
end

defp log_shutdown(organization_id, reason) do
metadata = log_metadata(organization_id: organization_id, reason: reason)
Logger.error("Ppl.FeatureClient.list_organization_features/1: SHUTDOWN #{metadata}")
end

defp log_metadata(metadata) do
formatter = &"#{elem(&1, 0)}=#{inspect(elem(&1, 1))}"
metadata |> Enum.map_join(" ", formatter)
end
end
61 changes: 61 additions & 0 deletions plumber/ppl/lib/ppl/feature_hub_provider.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule Ppl.FeatureHubProvider do
@moduledoc """
`FeatureProvider.Provider` implementation backed by the Feature (FeatureHub)
gRPC API.

Only organization features are needed in plumber; machines are not used here.
On any error fetching features we return `{:error, _}`, which makes
`FeatureProvider.feature_enabled?/2` fail closed (return `false`).
"""

use FeatureProvider.Provider

alias Ppl.FeatureClient
alias InternalApi.Feature.{Availability, OrganizationFeature}

@impl FeatureProvider.Provider
def provide_features(org_id, _opts \\ []) do
case FeatureClient.list_organization_features(org_id) do
{:ok, organization_features} ->
features =
organization_features
|> Enum.map(&feature_from_grpc/1)
|> Enum.filter(&FeatureProvider.Feature.visible?/1)

{:ok, features}

{:error, reason} ->
{:error, reason}
end
end

@impl FeatureProvider.Provider
def provide_machines(_org_id, _opts \\ []) do
{:ok, []}
end

defp feature_from_grpc(%OrganizationFeature{feature: feature, availability: availability}) do
%FeatureProvider.Feature{
name: feature.name,
type: feature.type,
description: feature.description,
quantity: quantity_from_availability(availability),
state: state_from_availability(availability)
}
end

defp quantity_from_availability(%Availability{quantity: quantity}), do: quantity
defp quantity_from_availability(_), do: 0

defp state_from_availability(%Availability{state: state}) do
state
|> Availability.State.key()
|> case do
:ENABLED -> :enabled
:HIDDEN -> :disabled
:ZERO_STATE -> :zero_state
end
end

defp state_from_availability(_), do: :disabled
end
Loading