ekaf_picker:pick_sync does not consider custom strategy #46

@anthonyrisinger

ekaf_lib:common_sync/4 calls ekaf:pick(Topic), which in turn calls pg2:get_closest_pid(Topic) and so does not take custom strategies into account. As a result, the ekaf:produce_sync* functions behave differently for a binary key vs. a {binkey, binval} tuple:

158> ekaf:produce_sync_batched(<<"hai">>,<<"value">>). 
{buffered,0,1}
159> ekaf:produce_sync_batched(<<"hai">>,{<<>>,<<"value">>}).
>>>>>>>>>>>>>>>>>>> {<<"hai">>,
                     {<<>>,<<"value">>},
                     [<0.4644.1>,<0.5064.1>,<0.8221.1>]}
[{buffered,0,1}]

The async variants already work as expected in all cases. The partitioner might want to apply its own default behavior for all ekaf:produce_* calls.

Reproduce by setting ekaf_callback_custom_partition_picker to:

partition_picker(Topic, Data, State) ->
    io:format(">>>>>>>>>>>>>>>>>>> ~p~n", [{Topic, Data, State}]),
    {partition, 0}.
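
For anyone trying to reproduce this, a minimal sketch of wiring the picker in might look like the module below. The module name `debug_picker` and the `install/0` helper are hypothetical, and the `{Module, Function}` shape of the environment value is an assumption about how ekaf reads its callbacks, so check it against the ekaf documentation before relying on it. Only `application:set_env/3` is standard OTP.

```erlang
-module(debug_picker).
-export([partition_picker/3, install/0]).

%% Custom picker from the report: logs its arguments and always
%% selects partition 0.
partition_picker(Topic, Data, State) ->
    io:format(">>>>>>>>>>>>>>>>>>> ~p~n", [{Topic, Data, State}]),
    {partition, 0}.

%% ASSUMPTION: ekaf looks up this callback in its application
%% environment as a {Module, Function} tuple. The key name comes from
%% the report; the value shape is not confirmed here.
install() ->
    application:set_env(ekaf, ekaf_callback_custom_partition_picker,
                        {?MODULE, partition_picker}).
```

Calling `debug_picker:install()` before producing should make the difference visible: in the session above, the debug line is printed only for the tuple form of the sync call, not for the bare binary.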
