Skip to content

Commit 852cfdd

Browse files
committed
Implement super stream exchange type
new murmur3 module rename
1 parent 8f0dbdd commit 852cfdd

4 files changed

Lines changed: 371 additions & 1 deletion

File tree

deps/rabbit/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_rec
260260

261261
PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue
262262
PARALLEL_CT_SET_3_B = list_consumers_sanity_check list_queues_online_and_offline logging lqueue rabbit_fifo_q rabbit_fifo_pq
263-
PARALLEL_CT_SET_3_C = cli_forget_cluster_node mc_unit message_size_limit
263+
PARALLEL_CT_SET_3_C = cli_forget_cluster_node mc_unit message_size_limit rabbit_murmur3
264264
PARALLEL_CT_SET_3_D = metrics mirrored_supervisor proxy_protocol runtime_parameters unit_rabbit_vm unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor
265265

266266
PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(rabbit_exchange_type_murmur3).
9+
10+
-include_lib("rabbit_common/include/rabbit.hrl").
11+
12+
-behaviour(rabbit_exchange_type).
13+
14+
-export([description/0,
15+
serialise_events/0,
16+
route/3,
17+
info/1,
18+
info/2]).
19+
-export([validate/1,
20+
validate_binding/2,
21+
create/2,
22+
delete/2,
23+
policy_changed/2,
24+
add_binding/3,
25+
remove_bindings/3,
26+
assert_args_equivalence/2]).
27+
28+
-rabbit_boot_step({rabbit_exchange_type_murmur3_registry,
29+
[{description, "exchange type x-super-stream: registry"},
30+
{mfa, {rabbit_registry, register,
31+
[exchange, <<"x-super-stream">>, ?MODULE]}},
32+
{cleanup, {rabbit_registry, unregister,
33+
[exchange, <<"x-super-stream">>]}},
34+
{requires, rabbit_registry},
35+
{enables, kernel_ready}]}).
36+
37+
-define(SEED, 104729).
38+
39+
description() ->
40+
[{description, <<"Super stream exchange type using murmur3 hashing">>}].
41+
42+
serialise_events() -> false.
43+
44+
route(#exchange{name = Name}, Msg, _Opts) ->
45+
case mc:routing_keys(Msg) of
46+
[RKey | _] ->
47+
case rabbit_binding:list_for_source(Name) of
48+
[] ->
49+
[];
50+
Bindings ->
51+
N = integer_to_binary(hash_mod(RKey, length(Bindings))),
52+
case lists:search(fun(#binding{key = Key}) ->
53+
Key =:= N
54+
end, Bindings) of
55+
{value, #binding{destination = Dest}} ->
56+
[Dest];
57+
false ->
58+
[]
59+
end
60+
end;
61+
[] ->
62+
[]
63+
end.
64+
65+
info(_) ->
66+
[].
67+
68+
info(_, _) ->
69+
[].
70+
71+
validate(_X) ->
72+
ok.
73+
74+
validate_binding(_X, #binding{key = K}) ->
75+
try
76+
_ = binary_to_integer(K),
77+
ok
78+
catch
79+
error:badarg ->
80+
{error,
81+
{binding_invalid, "The binding key must be an integer: ~tp", [K]}}
82+
end.
83+
84+
create(_Serial, _X) ->
85+
ok.
86+
87+
delete(_Serial, _X) ->
88+
ok.
89+
90+
policy_changed(_X1, _X2) ->
91+
ok.
92+
93+
add_binding(_Serial, _X, _B) ->
94+
ok.
95+
96+
remove_bindings(_Serial, _X, _Bs) ->
97+
ok.
98+
99+
assert_args_equivalence(X, Args) ->
100+
rabbit_exchange:assert_args_equivalence(X, Args).
101+
102+
hash_mod(RKey, N) ->
103+
rabbit_murmur3:hash_32(RKey, ?SEED) rem N.

deps/rabbit/src/rabbit_murmur3.erl

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
%% @doc MurmurHash3 32-bit implementation.
9+
%%
10+
%% This module provides an efficient pure Erlang implementation of the
11+
%% MurmurHash3 32-bit hash algorithm. MurmurHash3 is a non-cryptographic
12+
%% hash function suitable for general hash-based lookup.
13+
%%
14+
%% Reference implementation:
15+
%% https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp
16+
%%
17+
%% @end
18+
-module(rabbit_murmur3).
19+
20+
-export([hash_32/1, hash_32/2]).
21+
22+
-compile({inline, [rotl32/2, fmix32/1, process_tail/2]}).
23+
24+
%% MurmurHash3 constants
25+
-define(C1, 16#cc9e2d51).
26+
-define(C2, 16#1b873593).
27+
-define(MASK32, 16#ffffffff).
28+
29+
%% @doc Compute the 32-bit MurmurHash3 of the given data with seed 0.
30+
-spec hash_32(Data :: iodata()) -> non_neg_integer().
31+
hash_32(Data) ->
32+
hash_32(Data, 0).
33+
34+
%% @doc Compute the 32-bit MurmurHash3 of the given data with the specified seed.
35+
-spec hash_32(Data :: iodata(), Seed :: non_neg_integer()) -> non_neg_integer().
36+
hash_32(Data, Seed) when is_list(Data) ->
37+
hash_32(iolist_to_binary(Data), Seed);
38+
hash_32(Data, Seed) when is_binary(Data), is_integer(Seed), Seed >= 0 ->
39+
Len = byte_size(Data),
40+
{Tail, H1} = process_blocks(Data, Seed band ?MASK32),
41+
H2 = process_tail(Tail, H1),
42+
fmix32(H2 bxor Len).
43+
44+
%% Process 4-byte blocks, returns remaining tail and accumulated hash
45+
-spec process_blocks(binary(), non_neg_integer()) -> {binary(), non_neg_integer()}.
46+
process_blocks(<<K:32/unsigned-little-integer, Rest/binary>>, H) ->
47+
K1 = (K * ?C1) band ?MASK32,
48+
K2 = rotl32(K1, 15),
49+
K3 = (K2 * ?C2) band ?MASK32,
50+
H1 = H bxor K3,
51+
H2 = rotl32(H1, 13),
52+
H3 = ((H2 * 5) + 16#e6546b64) band ?MASK32,
53+
process_blocks(Rest, H3);
54+
process_blocks(Tail, H) ->
55+
{Tail, H}.
56+
57+
%% Process remaining bytes (tail: 0-3 bytes)
58+
-spec process_tail(binary(), non_neg_integer()) -> non_neg_integer().
59+
process_tail(<<>>, H) ->
60+
H;
61+
process_tail(<<B1:8>>, H) ->
62+
K1 = (B1 * ?C1) band ?MASK32,
63+
K2 = rotl32(K1, 15),
64+
K3 = (K2 * ?C2) band ?MASK32,
65+
H bxor K3;
66+
process_tail(<<B1:8, B2:8>>, H) ->
67+
K = (B2 bsl 8) bor B1,
68+
K1 = (K * ?C1) band ?MASK32,
69+
K2 = rotl32(K1, 15),
70+
K3 = (K2 * ?C2) band ?MASK32,
71+
H bxor K3;
72+
process_tail(<<B1:8, B2:8, B3:8>>, H) ->
73+
K = (B3 bsl 16) bor (B2 bsl 8) bor B1,
74+
K1 = (K * ?C1) band ?MASK32,
75+
K2 = rotl32(K1, 15),
76+
K3 = (K2 * ?C2) band ?MASK32,
77+
H bxor K3.
78+
79+
%% Final mix function
80+
-spec fmix32(non_neg_integer()) -> non_neg_integer().
81+
fmix32(H) ->
82+
H1 = H bxor (H bsr 16),
83+
H2 = (H1 * 16#85ebca6b) band ?MASK32,
84+
H3 = H2 bxor (H2 bsr 13),
85+
H4 = (H3 * 16#c2b2ae35) band ?MASK32,
86+
H4 bxor (H4 bsr 16).
87+
88+
%% 32-bit rotate left
89+
-spec rotl32(non_neg_integer(), 0..31) -> non_neg_integer().
90+
rotl32(X, R) ->
91+
((X bsl R) bor (X bsr (32 - R))) band ?MASK32.
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(rabbit_murmur3_SUITE).
9+
10+
-include_lib("eunit/include/eunit.hrl").
11+
12+
-compile(export_all).
13+
14+
all() ->
15+
[
16+
{group, parallel_tests}
17+
].
18+
19+
groups() ->
20+
[
21+
{parallel_tests, [parallel], [
22+
hash_empty_string,
23+
hash_with_seed_zero,
24+
hash_with_custom_seed,
25+
hash_known_test_vectors,
26+
hash_binary_data,
27+
hash_iolist,
28+
hash_consistency,
29+
hash_distribution
30+
]}
31+
].
32+
33+
%% -------------------------------------------------------------------
34+
%% Test Cases
35+
%% -------------------------------------------------------------------
36+
37+
%% Test empty string hashing
38+
hash_empty_string(_Config) ->
39+
%% Empty string with seed 0 should produce 0
40+
%% Reference: with zero data and zero seed, everything becomes zero
41+
?assertEqual(0, rabbit_murmur3:hash_32(<<>>)),
42+
?assertEqual(0, rabbit_murmur3:hash_32(<<>>, 0)),
43+
%% Empty string with seed 1 - ignores nearly all the math
44+
?assertEqual(16#514E28B7, rabbit_murmur3:hash_32(<<>>, 1)),
45+
%% Empty string with seed 0xffffffff - make sure seed uses unsigned 32-bit math
46+
?assertEqual(16#81F16F39, rabbit_murmur3:hash_32(<<>>, 16#ffffffff)),
47+
passed.
48+
49+
%% Test hashing with seed 0
50+
hash_with_seed_zero(_Config) ->
51+
%% Known test vectors from reference implementation with seed 0
52+
%% Reference: https://gist.github.com/vladimirgamalyan/defb2482feefbf5c3ea25b14c557753b
53+
?assertEqual(16#B3DD93FA, rabbit_murmur3:hash_32(<<"abc">>)),
54+
?assertEqual(16#EE925B90, rabbit_murmur3:hash_32(<<"abcdbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq">>)),
55+
passed.
56+
57+
%% Test hashing with custom seeds
58+
hash_with_custom_seed(_Config) ->
59+
%% Test with seed 0x9747b28c (common test seed)
60+
Seed = 16#9747b28c,
61+
62+
%% Reference test vectors with seed 0x9747b28c
63+
?assertEqual(16#5A97808A, rabbit_murmur3:hash_32(<<"aaaa">>, Seed)),
64+
?assertEqual(16#283E0130, rabbit_murmur3:hash_32(<<"aaa">>, Seed)),
65+
?assertEqual(16#5D211726, rabbit_murmur3:hash_32(<<"aa">>, Seed)),
66+
?assertEqual(16#7FA09EA6, rabbit_murmur3:hash_32(<<"a">>, Seed)),
67+
68+
%% Endian order within chunks
69+
?assertEqual(16#F0478627, rabbit_murmur3:hash_32(<<"abcd">>, Seed)),
70+
?assertEqual(16#C84A62DD, rabbit_murmur3:hash_32(<<"abc">>, Seed)),
71+
?assertEqual(16#74875592, rabbit_murmur3:hash_32(<<"ab">>, Seed)),
72+
73+
?assertEqual(16#24884CBA, rabbit_murmur3:hash_32(<<"Hello, world!">>, Seed)),
74+
?assertEqual(16#2FA826CD, rabbit_murmur3:hash_32(<<"The quick brown fox jumps over the lazy dog">>, Seed)),
75+
76+
%% Different seeds should produce different hashes
77+
Hash1 = rabbit_murmur3:hash_32(<<"test">>, Seed),
78+
Hash2 = rabbit_murmur3:hash_32(<<"test">>, 0),
79+
Hash3 = rabbit_murmur3:hash_32(<<"test">>, 42),
80+
?assertNotEqual(Hash1, Hash2),
81+
?assertNotEqual(Hash1, Hash3),
82+
?assertNotEqual(Hash2, Hash3),
83+
passed.
84+
85+
%% Test known test vectors from reference implementation
86+
hash_known_test_vectors(_Config) ->
87+
%% Test vectors verified against reference C implementation
88+
%% Reference: https://gist.github.com/vladimirgamalyan/defb2482feefbf5c3ea25b14c557753b
89+
90+
%% Binary array tests with seed 0
91+
%% Make sure 4-byte chunks use unsigned math
92+
?assertEqual(16#76293B50, rabbit_murmur3:hash_32(<<16#ff, 16#ff, 16#ff, 16#ff>>)),
93+
94+
%% Endian order test: bytes 0x21, 0x43, 0x65, 0x87 should be read as little-endian
95+
?assertEqual(16#F55B516B, rabbit_murmur3:hash_32(<<16#21, 16#43, 16#65, 16#87>>)),
96+
97+
%% Special seed value test
98+
?assertEqual(16#2362F9DE, rabbit_murmur3:hash_32(<<16#21, 16#43, 16#65, 16#87>>, 16#5082EDEE)),
99+
100+
%% Tail length tests (3, 2, 1 bytes)
101+
?assertEqual(16#7E4A8634, rabbit_murmur3:hash_32(<<16#21, 16#43, 16#65>>)),
102+
?assertEqual(16#A0F7B07A, rabbit_murmur3:hash_32(<<16#21, 16#43>>)),
103+
?assertEqual(16#72661CF4, rabbit_murmur3:hash_32(<<16#21>>)),
104+
105+
%% Zero bytes tests
106+
?assertEqual(16#2362F9DE, rabbit_murmur3:hash_32(<<0, 0, 0, 0>>)),
107+
?assertEqual(16#85F0B427, rabbit_murmur3:hash_32(<<0, 0, 0>>)),
108+
?assertEqual(16#30F4C306, rabbit_murmur3:hash_32(<<0, 0>>)),
109+
?assertEqual(16#514E28B7, rabbit_murmur3:hash_32(<<0>>)),
110+
passed.
111+
112+
%% Test binary data hashing
113+
hash_binary_data(_Config) ->
114+
%% Test with raw binary data
115+
Bin = <<0, 1, 2, 3, 4, 5, 6, 7, 8, 9>>,
116+
Hash = rabbit_murmur3:hash_32(Bin),
117+
?assert(is_integer(Hash)),
118+
?assert(Hash >= 0),
119+
?assert(Hash =< 16#ffffffff),
120+
121+
%% Test with binary containing high bytes
122+
Bin2 = <<255, 254, 253, 252, 251>>,
123+
Hash2 = rabbit_murmur3:hash_32(Bin2),
124+
?assert(is_integer(Hash2)),
125+
passed.
126+
127+
%% Test iolist input
128+
hash_iolist(_Config) ->
129+
%% iolist should produce same result as equivalent binary
130+
Binary = <<"hello world">>,
131+
IoList = [<<"hello">>, <<" ">>, <<"world">>],
132+
?assertEqual(rabbit_murmur3:hash_32(Binary), rabbit_murmur3:hash_32(IoList)),
133+
134+
%% Nested iolist
135+
NestedIoList = [[<<"hel">>, <<"lo">>], [<<" ">>, [<<"wor">>, <<"ld">>]]],
136+
?assertEqual(rabbit_murmur3:hash_32(Binary), rabbit_murmur3:hash_32(NestedIoList)),
137+
passed.
138+
139+
%% Test hash consistency
140+
hash_consistency(_Config) ->
141+
%% Same input should always produce same output
142+
Input = <<"consistent input">>,
143+
Hash1 = rabbit_murmur3:hash_32(Input),
144+
Hash2 = rabbit_murmur3:hash_32(Input),
145+
Hash3 = rabbit_murmur3:hash_32(Input),
146+
?assertEqual(Hash1, Hash2),
147+
?assertEqual(Hash2, Hash3),
148+
149+
%% Same input with same seed
150+
Hash4 = rabbit_murmur3:hash_32(Input, 12345),
151+
Hash5 = rabbit_murmur3:hash_32(Input, 12345),
152+
?assertEqual(Hash4, Hash5),
153+
passed.
154+
155+
%% Test hash distribution (basic avalanche property)
156+
hash_distribution(_Config) ->
157+
%% Generate hashes for sequential integers and verify reasonable distribution
158+
Hashes = [rabbit_murmur3:hash_32(integer_to_binary(N)) || N <- lists:seq(1, 1000)],
159+
UniqueHashes = length(lists:usort(Hashes)),
160+
161+
%% All hashes should be unique for this small set
162+
?assertEqual(1000, UniqueHashes),
163+
164+
%% Check that hashes are well distributed across the 32-bit range
165+
%% by verifying we have hashes in different "buckets"
166+
Buckets = lists:foldl(
167+
fun(H, Acc) ->
168+
Bucket = H div (16#ffffffff div 16),
169+
maps:update_with(Bucket, fun(V) -> V + 1 end, 1, Acc)
170+
end,
171+
#{},
172+
Hashes
173+
),
174+
%% We should have hashes in at least 10 of the 16 buckets
175+
?assert(maps:size(Buckets) >= 10),
176+
passed.

0 commit comments

Comments
 (0)