Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions internal/migrations/001-common-actions.prod.sql
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,23 @@ CREATE OR REPLACE ACTION create_streams(

$base_uuid := uuid_generate_kwil('create_streams_' || @txid);

-- Get the data provider id
-- Permissionless onboarding: the per-stream fee is the only requirement.
-- If the caller has no data_providers row yet, create one now.
INSERT INTO data_providers (id, address, created_at)
VALUES (
COALESCE((SELECT MAX(id) FROM data_providers), 0) + 1,
$data_provider,
@height
)
ON CONFLICT (address) DO NOTHING;

$data_provider_id INT;
$dp_found BOOL := false;
for $data_provider_row in SELECT id
FROM data_providers
WHERE address = $data_provider
LIMIT 1 {
$dp_found := true;
$data_provider_id := $data_provider_row.id;
}

if $dp_found = false {
ERROR('Data provider not found: ' || $data_provider);
}

-- Create the streams using UNNEST for optimal performance
INSERT INTO streams (id, data_provider_id, data_provider, stream_id, stream_type, created_at, tx_id)
Expand Down
17 changes: 10 additions & 7 deletions internal/migrations/001-common-actions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,23 @@ CREATE OR REPLACE ACTION create_streams(

$base_uuid := uuid_generate_kwil('create_streams_' || @txid);

-- Get the data provider id
-- Permissionless onboarding: the per-stream fee is the only requirement.
-- If the caller has no data_providers row yet, create one now.
INSERT INTO data_providers (id, address, created_at)
VALUES (
COALESCE((SELECT MAX(id) FROM data_providers), 0) + 1,
$data_provider,
@height
)
ON CONFLICT (address) DO NOTHING;
Comment thread
MicBun marked this conversation as resolved.

$data_provider_id INT;
$dp_found BOOL := false;
for $data_provider_row in SELECT id
FROM data_providers
WHERE address = $data_provider
LIMIT 1 {
$dp_found := true;
$data_provider_id := $data_provider_row.id;
}

if $dp_found = false {
ERROR('Data provider not found: ' || $data_provider);
}

-- Create the streams using UNNEST for optimal performance
INSERT INTO streams (id, data_provider_id, data_provider, stream_id, stream_type, created_at, tx_id)
Expand Down
76 changes: 76 additions & 0 deletions tests/streams/stream_creation_fee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestStreamCreationFees(t *testing.T) {
testBatchChargesPerStreamFee(t),
testLeaderReceivesFees(t),
testUnenrolledWalletStillCharged(t),
testPermissionlessOnboarding(t),
},
}, testutils.GetTestOptionsWithCache())
}
Expand Down Expand Up @@ -370,6 +371,56 @@ func testUnenrolledWalletStillCharged(t *testing.T) func(ctx context.Context, pl
}
}

// Test 8: A wallet with NO data_providers row and NO system:network_writer role
// can create streams — the action auto-registers the data provider on first call.
// Second call from the same wallet succeeds via the ON CONFLICT DO NOTHING path.
func testPermissionlessOnboarding(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error {
return func(ctx context.Context, platform *kwilTesting.Platform) error {
userAddrVal := util.Unsafe_NewEthereumAddressFromString("0x9999999999999999999999999999999999999999")
userAddr := &userAddrVal

// Fund with 300 TRUF — enough for two single-stream creates.
// Deliberately skip CreateDataProvider / CreateDataProviderWithoutRole.
err := giveBalance(ctx, platform, userAddr.Address(), "300000000000000000000")
require.NoError(t, err, "failed to give balance")

initialBalance, err := getBalance(ctx, platform, userAddr.Address())
require.NoError(t, err, "failed to get initial balance")

// First create: auto-registers this wallet as a data provider.
err = createStream(ctx, platform, userAddr, "st00000000000000000000000000000c", "primitive")
require.NoError(t, err, "first stream creation should succeed without prior data provider registration")

dpCount, err := countDataProviders(ctx, platform, userAddr.Address())
require.NoError(t, err, "failed to count data providers after first create")
require.Equal(t, 1, dpCount, "exactly one data_providers row should exist after first create")

balanceAfterFirst, err := getBalance(ctx, platform, userAddr.Address())
require.NoError(t, err, "failed to get balance after first create")

expectedAfterFirst := new(big.Int).Sub(initialBalance, perStreamFee)
require.Equal(t, 0, expectedAfterFirst.Cmp(balanceAfterFirst),
"first create should charge 100 TRUF, expected %s but got %s", expectedAfterFirst, balanceAfterFirst)

// Second create: data provider already exists — ON CONFLICT DO NOTHING path.
err = createStream(ctx, platform, userAddr, "st00000000000000000000000000000d", "primitive")
require.NoError(t, err, "second stream creation should succeed (ON CONFLICT path)")

dpCount, err = countDataProviders(ctx, platform, userAddr.Address())
require.NoError(t, err, "failed to count data providers after second create")
require.Equal(t, 1, dpCount, "data_providers row count must remain 1 after ON CONFLICT path")

balanceAfterSecond, err := getBalance(ctx, platform, userAddr.Address())
require.NoError(t, err, "failed to get balance after second create")

expectedAfterSecond := new(big.Int).Sub(balanceAfterFirst, perStreamFee)
require.Equal(t, 0, expectedAfterSecond.Cmp(balanceAfterSecond),
"second create should charge another 100 TRUF, expected %s but got %s", expectedAfterSecond, balanceAfterSecond)

return nil
}
}

// ===== HELPER FUNCTIONS =====

// revokeRoleBypass revokes a role using direct SQL with OverrideAuthz
Expand Down Expand Up @@ -437,6 +488,31 @@ func getBalance(ctx context.Context, platform *kwilTesting.Platform, wallet stri
return balance, nil
}

// countDataProviders returns the number of data_providers rows matching the given address.
func countDataProviders(ctx context.Context, platform *kwilTesting.Platform, address string) (int, error) {
engineCtx := &common.EngineContext{
TxContext: &common.TxContext{
Ctx: ctx,
BlockContext: &common.BlockContext{Height: 0},
TxID: platform.Txid(),
Signer: []byte("system"),
Caller: "0x0000000000000000000000000000000000000000",
},
OverrideAuthz: true,
}

var count int
err := platform.Engine.Execute(engineCtx, platform.DB,
`SELECT COUNT(*) AS cnt FROM data_providers WHERE address = $addr`,
map[string]any{"$addr": address},
func(row *common.Row) error {
count = int(row.Values[0].(int64))
return nil
},
)
return count, err
}

// callCreateStreamsAction is the base implementation - calls the create_streams action
func callCreateStreamsAction(ctx context.Context, platform *kwilTesting.Platform, signer *util.EthereumAddress, leaderPub *crypto.Secp256k1PublicKey, streamIds []string, streamTypes []string) error {
tx := &common.TxContext{
Expand Down
87 changes: 44 additions & 43 deletions tests/streams/taxonomy_fee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func setupTaxonomyTestEnvironment(t *testing.T) func(ctx context.Context, platfo
}

// Test 1: Wallet with network_writer role still pays insert_taxonomy fees.
// 100 TRUF in → 1 (composed) + 1 (child) + 1 (1-child taxonomy) = 3 TRUF spent.
// 300 TRUF in → 100 (composed) + 100 (child) + 1 (1-child taxonomy) = 201 TRUF spent.
func testTaxonomyWriterRolePaysFee(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error {
return func(ctx context.Context, platform *kwilTesting.Platform) error {
writerAddrVal := util.Unsafe_NewEthereumAddressFromString("0x2111111111111111111111111111111111111111")
Expand All @@ -86,7 +86,7 @@ func testTaxonomyWriterRolePaysFee(t *testing.T) func(ctx context.Context, platf
err = setup.AddMemberToRoleBypass(ctx, platform, "system", "fee_required", writerAddr.Address())
require.NoError(t, err, "failed to enroll in fee_required role")

err = giveBalance(ctx, platform, writerAddr.Address(), "100000000000000000000") // 100 TRUF
err = giveBalance(ctx, platform, writerAddr.Address(), "300000000000000000000") // 300 TRUF
require.NoError(t, err, "failed to give balance")

initialBalance, err := getBalance(ctx, platform, writerAddr.Address())
Expand All @@ -112,19 +112,18 @@ func testTaxonomyWriterRolePaysFee(t *testing.T) func(ctx context.Context, platf
finalBalance, err := getBalance(ctx, platform, writerAddr.Address())
require.NoError(t, err, "failed to get final balance")

// 1 (composed create) + 1 (child create) + 1 (taxonomy w/ 1 child) — flat per tx.
threeTRUF := mustParseBigInt("3000000000000000000")
expectedBalance := new(big.Int).Sub(initialBalance, threeTRUF)
// 100 (composed create) + 100 (child create) + 1 (taxonomy w/ 1 child) = 201 TRUF.
totalFee := mustParseBigInt("201000000000000000000")
expectedBalance := new(big.Int).Sub(initialBalance, totalFee)
require.Equal(t, 0, expectedBalance.Cmp(finalBalance),
"network_writer should pay 3 TRUF total, expected %s but got %s", expectedBalance, finalBalance)
"network_writer should pay 201 TRUF total, expected %s but got %s", expectedBalance, finalBalance)

return nil
}
}

// Test 2: Non-exempt wallet (without network_writer role) pays a flat 1 TRUF
// per write tx — fund precisely the 3 TRUF needed (1 composed + 1 child + 1 taxonomy)
// to prove the per-tx invariant.
// per insert_taxonomy tx. Fund exactly 201 TRUF: 100 (composed) + 100 (child) + 1 (taxonomy).
func testTaxonomyNonExemptWalletPaysFee(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error {
return func(ctx context.Context, platform *kwilTesting.Platform) error {
nonExemptAddrVal := util.Unsafe_NewEthereumAddressFromString("0x3222222222222222222222222222222222222222")
Expand All @@ -134,34 +133,31 @@ func testTaxonomyNonExemptWalletPaysFee(t *testing.T) func(ctx context.Context,
err := setup.CreateDataProviderWithoutRole(ctx, platform, nonExemptAddr.Address())
require.NoError(t, err, "failed to create data provider without role")

// Enroll wallet in fee_required so all three writes (composed,
// child, taxonomy) are charged.
// Enroll wallet in fee_required so taxonomy writes are charged.
err = setup.AddMemberToRoleBypass(ctx, platform, "system", "fee_required", nonExemptAddr.Address())
require.NoError(t, err, "failed to enroll in fee_required role")

// Give exactly 3 TRUF: 1 (composed stream fee) + 1 (child stream fee) + 1 (taxonomy fee)
threeTRUF := mustParseBigInt("3000000000000000000") // 3 TRUF
err = giveBalance(ctx, platform, nonExemptAddr.Address(), threeTRUF.String())
// Give exactly 201 TRUF: 100 (composed) + 100 (child) + 1 (taxonomy)
exactFund := mustParseBigInt("201000000000000000000") // 201 TRUF
err = giveBalance(ctx, platform, nonExemptAddr.Address(), exactFund.String())
require.NoError(t, err, "failed to give balance")

// Get initial balance
initialBalance, err := getBalance(ctx, platform, nonExemptAddr.Address())
require.NoError(t, err, "failed to get initial balance")
require.Equal(t, threeTRUF, initialBalance, "Initial balance should be 3 TRUF")
require.Equal(t, exactFund, initialBalance, "Initial balance should be 201 TRUF")

// Create streams using direct engine calls (each costs a flat 1 TRUF)
composedStreamId := util.GenerateStreamId("taxonomy_nonexempt_composed")
childStreamId := util.GenerateStreamId("taxonomy_nonexempt_child")

// Create composed stream (costs 1 TRUF)
// Create composed stream (costs 100 TRUF)
err = createStream(ctx, platform, nonExemptAddr, composedStreamId.String(), "composed")
require.NoError(t, err, "failed to create composed stream")

// Create child stream (costs 1 TRUF)
// Create child stream (costs 100 TRUF)
err = createStream(ctx, platform, nonExemptAddr, childStreamId.String(), "primitive")
require.NoError(t, err, "failed to create child stream")

// Balance after stream creation should be 1 TRUF (3 - 1 - 1)
// Balance after stream creation should be 1 TRUF (201 - 100 - 100)
balanceAfterStreams, err := getBalance(ctx, platform, nonExemptAddr.Address())
require.NoError(t, err, "failed to get balance after stream creation")
require.Equal(t, oneTRUFTaxonomy, balanceAfterStreams, "Balance should be 1 TRUF after creating streams")
Expand Down Expand Up @@ -200,13 +196,13 @@ func testTaxonomyInsufficientBalance(t *testing.T) func(ctx context.Context, pla
err = setup.AddMemberToRoleBypass(ctx, platform, "system", "fee_required", insufficientAddr.Address())
require.NoError(t, err, "failed to enroll in fee_required role")

// Give exactly 2 TRUF: enough for the two create_stream calls (1 + 1)
// Give exactly 200 TRUF: enough for two create_stream calls (100 + 100)
// but nothing left over for the 1 TRUF taxonomy fee.
twoTRUF := mustParseBigInt("2000000000000000000")
err = giveBalance(ctx, platform, insufficientAddr.Address(), twoTRUF.String())
twoHundredTRUF := mustParseBigInt("200000000000000000000")
err = giveBalance(ctx, platform, insufficientAddr.Address(), twoHundredTRUF.String())
require.NoError(t, err, "failed to give balance")

// Create streams (costs 2 TRUF total, leaving 0)
// Create streams (costs 200 TRUF total, leaving 0)
composedStreamId := util.GenerateStreamId("taxonomy_insufficient_composed")
childStreamId := util.GenerateStreamId("taxonomy_insufficient_child")

Expand All @@ -216,7 +212,7 @@ func testTaxonomyInsufficientBalance(t *testing.T) func(ctx context.Context, pla
err = createStream(ctx, platform, insufficientAddr, childStreamId.String(), "primitive")
require.NoError(t, err, "failed to create child stream")

// Should have 0 TRUF left (2 - 1 - 1 = 0), not enough for the 1 TRUF taxonomy fee
// Should have 0 TRUF left (200 - 100 - 100 = 0), not enough for the 1 TRUF taxonomy fee
remainingBalance, err := getBalance(ctx, platform, insufficientAddr.Address())
require.NoError(t, err, "failed to get remaining balance")
require.Equal(t, big.NewInt(0), remainingBalance, "Should have 0 TRUF left after creating streams")
Expand Down Expand Up @@ -253,35 +249,34 @@ func testTaxonomyMultipleChildrenChargesFlatFee(t *testing.T) func(ctx context.C
err = setup.AddMemberToRoleBypass(ctx, platform, "system", "fee_required", multiAddr.Address())
require.NoError(t, err, "failed to enroll in fee_required role")

// Give exactly 5 TRUF: 1 (composed) + 3 (3 children) + 1 (taxonomy, flat).
// Give exactly 401 TRUF: 100 (composed) + 300 (3 children) + 1 (taxonomy, flat).
// If the migration were still per-child, the 3-child taxonomy would
// cost 3 TRUF and this test would fail with insufficient balance.
fiveTRUF := mustParseBigInt("5000000000000000000")
err = giveBalance(ctx, platform, multiAddr.Address(), fiveTRUF.String())
exactFund := mustParseBigInt("401000000000000000000")
err = giveBalance(ctx, platform, multiAddr.Address(), exactFund.String())
require.NoError(t, err, "failed to give balance")

// Get initial balance
initialBalance, err := getBalance(ctx, platform, multiAddr.Address())
require.NoError(t, err, "failed to get initial balance")
require.Equal(t, fiveTRUF, initialBalance, "Initial balance should be 5 TRUF")
require.Equal(t, exactFund, initialBalance, "Initial balance should be 401 TRUF")

// Create streams (costs 1 + 3 = 4 TRUF total)
// Create streams (costs 100 + 300 = 400 TRUF total)
composedStreamId := util.GenerateStreamId("taxonomy_multi_composed")
child1StreamId := util.GenerateStreamId("taxonomy_multi_child1")
child2StreamId := util.GenerateStreamId("taxonomy_multi_child2")
child3StreamId := util.GenerateStreamId("taxonomy_multi_child3")

// Create composed stream (costs 1 TRUF)
// Create composed stream (costs 100 TRUF)
err = createStream(ctx, platform, multiAddr, composedStreamId.String(), "composed")
require.NoError(t, err, "failed to create composed stream")

// Create 3 child streams (costs 3 TRUF total, one per create_stream call)
// Create 3 child streams (costs 300 TRUF total, 100 per create_stream call)
for _, childId := range []util.StreamId{child1StreamId, child2StreamId, child3StreamId} {
err = createStream(ctx, platform, multiAddr, childId.String(), "primitive")
require.NoError(t, err, "failed to create child stream")
}

// Balance after stream creation should be 1 TRUF (5 - 4)
// Balance after stream creation should be 1 TRUF (401 - 400)
balanceAfterStreams, err := getBalance(ctx, platform, multiAddr.Address())
require.NoError(t, err, "failed to get balance after stream creation")
require.Equal(t, oneTRUFTaxonomy, balanceAfterStreams, "Balance should be 1 TRUF after creating streams")
Expand All @@ -305,9 +300,9 @@ func testTaxonomyMultipleChildrenChargesFlatFee(t *testing.T) func(ctx context.C
}
}

// Test 5: A wallet not enrolled in system:fee_required runs the whole
// composed/child/taxonomy sequence for free. Phased rollout of #3805 —
// until enrolled, none of the three write actions touches the bridge.
// Test 5: A wallet not enrolled in system:fee_required pays the universal
// stream creation fee (100 TRUF/stream) but the taxonomy insertion is free.
// Phased rollout of #3805 — insert_taxonomy fee only fires for enrolled wallets.
func testTaxonomyUnenrolledWalletWritesFree(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error {
return func(ctx context.Context, platform *kwilTesting.Platform) error {
freeAddrVal := util.Unsafe_NewEthereumAddressFromString("0x6555555555555555555555555555555555555555")
Expand All @@ -316,20 +311,26 @@ func testTaxonomyUnenrolledWalletWritesFree(t *testing.T) func(ctx context.Conte
err := setup.CreateDataProviderWithoutRole(ctx, platform, freeAddr.Address())
require.NoError(t, err, "failed to create data provider without role")

// NOT enrolled in fee_required. Wallet has zero TRUF.
initialBalance, err := getBalance(ctx, platform, freeAddr.Address())
require.NoError(t, err, "failed to get initial balance")
require.Equal(t, big.NewInt(0), initialBalance, "free-write wallet should start with zero TRUF")
// Fund with exactly 200 TRUF for two stream creations (100 each, universal).
// NOT enrolled in fee_required — taxonomy itself should be free.
err = giveBalance(ctx, platform, freeAddr.Address(), "200000000000000000000")
require.NoError(t, err, "failed to give balance")

composedStreamId := util.GenerateStreamId("taxonomy_free_composed")
childStreamId := util.GenerateStreamId("taxonomy_free_child")

err = createStream(ctx, platform, freeAddr, composedStreamId.String(), "composed")
require.NoError(t, err, "un-enrolled wallet should create composed stream for free")
require.NoError(t, err, "failed to create composed stream")

err = createStream(ctx, platform, freeAddr, childStreamId.String(), "primitive")
require.NoError(t, err, "un-enrolled wallet should create child stream for free")
require.NoError(t, err, "failed to create child stream")

// After 2 stream creations (200 TRUF spent), balance should be 0
balanceAfterStreams, err := getBalance(ctx, platform, freeAddr.Address())
require.NoError(t, err, "failed to get balance after stream creation")
require.Equal(t, big.NewInt(0), balanceAfterStreams, "Balance should be 0 after creating streams")

// Taxonomy insertion should succeed for free (not enrolled in fee_required)
err = insertTaxonomy(ctx, platform, freeAddr,
freeAddr.Address(), composedStreamId.String(),
[]string{freeAddr.Address()},
Expand All @@ -340,7 +341,7 @@ func testTaxonomyUnenrolledWalletWritesFree(t *testing.T) func(ctx context.Conte

finalBalance, err := getBalance(ctx, platform, freeAddr.Address())
require.NoError(t, err, "failed to get final balance")
require.Equal(t, big.NewInt(0), finalBalance, "un-enrolled wallet must not be charged for any of the three writes")
require.Equal(t, big.NewInt(0), finalBalance, "un-enrolled wallet must not be charged for taxonomy insertion")

return nil
}
Expand Down
Loading