From e955568b887c570248bc2739ec3899f273651c8c Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Wed, 29 Apr 2026 15:31:14 +0200 Subject: [PATCH 1/5] dealpusher: switch PDP path to FWSS-mediated /pdp/piece/pull Drops the broken create-with-self-as-SP flow in favor of the FWSS-mediated SP-submits model. Singularity now never touches PDPVerifier directly. For each batch it: looks up the SP's PDP service URL via ServiceProviderRegistry, signs the EIP-712 extraData, POSTs /pdp/piece/pull to have the SP fetch pieces from singularity's content provider, waits for SP-side completion, then POSTs /pdp/data-sets/create-and-add (new sets, atomic with first batch) or /pdp/data-sets/{id}/pieces (existing assembling sets) to trigger the on-chain commit which the SP submits from its own wallet. - pdp_pull.go (new): OnChainPDP adapter. SP service-URL discovery with caching, EIP-712 signing via go-synapse SignDigest, atomic create-and-add for new sets, add-pieces for existing. ClientDataSetID persisted on PDPProofSet. - pdp_api.go: PDPProofSetManager collapses to one method, PullPiecesToFWSS. PDPSchedulingConfig drops ConfirmationDepth + PollingInterval, gains PullTimeout. PDPTransactionConfirmer, PDPQueuedTx, PDPTransactionReceipt deleted (Curio handles confirmation). - pdp_schedule.go: drops the currentProofSetID transfer-handoff state machine and resolveProviderEVMAddress; calls PullPiecesToFWSS per batch. - pdp_onchain.go + pdp_onchain_test.go deleted. - model.PDPProofSet: adds ClientDataSetID + ServiceURL. ProposedProviderEVM field dropped from struct (column lingers harmlessly on existing DBs). - cmd/run/dealpusher.go: replaces --pdp-confirmation-depth / --pdp-poll-interval with --pdp-pull-timeout. Adds required --pdp-source-url-base and optional --pdp-record-keeper. - send-manual-pdp.go: rewritten for the new flow. - pdp_wiring_test.go, pdp_api_test.go: updated mocks + assertions. NOTE: requires go-synapse feat/signdigest-authhelper. Branch carries a local replace directive; drop it and bump go-synapse once the PR lands. --- cmd/deal/send-manual-pdp.go | 91 ++--- cmd/run/dealpusher.go | 41 ++- model/replication.go | 28 +- service/dealpusher/dealpusher.go | 17 +- service/dealpusher/options.go | 6 - service/dealpusher/pdp_api.go | 88 ++--- service/dealpusher/pdp_api_test.go | 20 +- service/dealpusher/pdp_onchain.go | 331 ------------------ service/dealpusher/pdp_onchain_test.go | 186 ---------- service/dealpusher/pdp_pull.go | 451 +++++++++++++++++++++++++ service/dealpusher/pdp_schedule.go | 130 +------ service/dealpusher/pdp_wiring_test.go | 149 +++----- 12 files changed, 676 insertions(+), 862 deletions(-) delete mode 100644 service/dealpusher/pdp_onchain.go delete mode 100644 service/dealpusher/pdp_onchain_test.go create mode 100644 service/dealpusher/pdp_pull.go diff --git a/cmd/deal/send-manual-pdp.go b/cmd/deal/send-manual-pdp.go index 0530ba5c1..d6d95e3d8 100644 --- a/cmd/deal/send-manual-pdp.go +++ b/cmd/deal/send-manual-pdp.go @@ -18,9 +18,11 @@ import ( var SendManualPDPCmd = &cli.Command{ Name: "send-manual-pdp", - Usage: "Send a manual PDP deal on-chain", - Description: `Create/reuse a proof set and add a piece to it on-chain via PDPVerifier. - Example: singularity deal send-manual-pdp --client f1xxx --provider t410fxxx --piece-cid bagaxxxx --piece-size 1048576 --eth-rpc http://localhost:5700/rpc/v1`, + Usage: "Send a manual PDP deal via the FWSS-pull flow", + Description: `Push a single piece to an SP via Curio's /pdp/piece/pull, then trigger the +SP's on-chain commit (createDataSet+addPieces if no assembling set yet, or addPieces +into the existing one). Useful for e2e/diagnostic testing of the FWSS pull path. + Example: singularity deal send-manual-pdp --client f1xxx --provider t410fxxx --piece-cid bagaxxxx --piece-size 1048576 --eth-rpc http://localhost:5700/rpc/v1 --source-url-base https://static.example.org`, Flags: []cli.Flag{ &cli.StringFlag{ Name: "client", @@ -34,12 +36,12 @@ var SendManualPDPCmd = &cli.Command{ }, &cli.StringFlag{ Name: "piece-cid", - Usage: "Piece CID (commp)", + Usage: "Piece CID (commp v1)", Required: true, }, &cli.Int64Flag{ Name: "piece-size", - Usage: "Piece size in bytes", + Usage: "Padded piece size in bytes", Required: true, }, &cli.StringFlag{ @@ -48,10 +50,21 @@ var SendManualPDPCmd = &cli.Command{ EnvVars: []string{"ETH_RPC_URL"}, Required: true, }, - &cli.Uint64Flag{ - Name: "confirmation-depth", - Usage: "Blocks to wait for tx confirmation", - Value: 5, + &cli.StringFlag{ + Name: "source-url-base", + Usage: "HTTPS base where Curio fetches the piece (sourceUrl = /piece/)", + EnvVars: []string{"PDP_SOURCE_URL_BASE"}, + Required: true, + }, + &cli.StringFlag{ + Name: "record-keeper", + Usage: "FWSS contract address. Defaults to network FWSS from go-synapse.", + EnvVars: []string{"PDP_RECORD_KEEPER"}, + }, + &cli.DurationFlag{ + Name: "pull-timeout", + Usage: "How long to wait for Curio to finish each phase", + Value: 5 * time.Minute, }, }, Action: func(c *cli.Context) error { @@ -61,13 +74,17 @@ var SendManualPDPCmd = &cli.Command{ } defer closer.Close() - pdp, err := dealpusher.NewOnChainPDP(c.Context, db, c.String("eth-rpc")) + pdp, err := dealpusher.NewOnChainPDP(c.Context, dealpusher.OnChainPDPConfig{ + DB: db, + RPCURL: c.String("eth-rpc"), + SourceURLBase: c.String("source-url-base"), + RecordKeeper: c.String("record-keeper"), + }) if err != nil { return errors.WithStack(err) } defer pdp.Close() - // load wallet var walletObj model.Wallet err = db.WithContext(c.Context).Where("address = ?", c.String("client")).First(&walletObj).Error if errors.Is(err, gorm.ErrRecordNotFound) { @@ -86,56 +103,40 @@ var SendManualPDPCmd = &cli.Command{ return errors.WithStack(err) } - provider := c.String("provider") - - cfg := dealpusher.PDPSchedulingConfig{ - BatchSize: 1, - MaxPiecesPerProofSet: 1024, - ConfirmationDepth: c.Uint64("confirmation-depth"), - PollingInterval: 5 * time.Second, - } - - // ensure proof set exists (or create one) - fmt.Println("ensuring proof set...") - proofSetID, err := pdp.EnsureProofSet(c.Context, evmSigner, provider, cfg) - if err != nil { - return errors.Wrap(err, "failed to ensure proof set") - } - fmt.Printf("proof set ID: %d\n", proofSetID) - - // parse piece cid pieceCID, err := cid.Parse(c.String("piece-cid")) if err != nil { return errors.Wrap(err, "invalid piece CID") } - - // add piece to proof set - fmt.Println("submitting add-roots tx...") pieceSize := c.Int64("piece-size") - queuedTx, err := pdp.QueueAddRoots(c.Context, evmSigner, proofSetID, []cid.Cid{pieceCID}, []int64{pieceSize}, cfg) - if err != nil { - return errors.Wrap(err, "failed to add roots") + + cfg := dealpusher.PDPSchedulingConfig{ + BatchSize: 1, + MaxPiecesPerProofSet: 1024, + PullTimeout: c.Duration("pull-timeout"), } - fmt.Printf("tx: %s\n", queuedTx.Hash) - // wait for confirmation - fmt.Println("waiting for confirmation...") - receipt, err := pdp.WaitForConfirmations(c.Context, queuedTx.Hash, cfg.ConfirmationDepth, cfg.PollingInterval) + fmt.Println("pushing piece to SP via /pdp/piece/pull + on-chain commit...") + result, err := pdp.PullPiecesToFWSS( + c.Context, + evmSigner, + c.String("provider"), + []dealpusher.PDPPieceInput{{PieceCID: pieceCID, PieceSize: pieceSize}}, + cfg, + ) if err != nil { - return errors.Wrap(err, "tx failed") + return errors.Wrap(err, "FWSS pull push failed") } - fmt.Printf("confirmed at block %d (gas: %d)\n", receipt.BlockNumber, receipt.GasUsed) + fmt.Printf("data set ID: %d\n", result.DataSetID) - // save deal record - proofSetIDCopy := proofSetID + dataSetIDCopy := result.DataSetID dealModel := &model.Deal{ State: model.DealProposed, DealType: model.DealTypePDP, - Provider: provider, + Provider: c.String("provider"), PieceCID: model.CID(pieceCID), PieceSize: pieceSize, WalletID: &walletObj.ID, - ProofSetID: &proofSetIDCopy, + ProofSetID: &dataSetIDCopy, } if err := db.WithContext(c.Context).Create(dealModel).Error; err != nil { return errors.Wrap(err, "failed to save deal") diff --git a/cmd/run/dealpusher.go b/cmd/run/dealpusher.go index b78a25ff0..c36360b4a 100644 --- a/cmd/run/dealpusher.go +++ b/cmd/run/dealpusher.go @@ -29,23 +29,28 @@ var DealPusherCmd = &cli.Command{ }, &cli.IntFlag{ Name: "pdp-batch-size", - Usage: "Number of roots to include in each PDP add-roots transaction", + Usage: "Number of pieces to include in each /pdp/piece/pull request", Value: 128, }, &cli.IntFlag{ Name: "pdp-max-pieces-per-proofset", - Usage: "Maximum pieces per proof set before handing off to the storage provider", + Usage: "Maximum pieces per proof set before starting a new one", Value: 1024, }, - &cli.Uint64Flag{ - Name: "pdp-confirmation-depth", - Usage: "Number of block confirmations required for PDP transactions", - Value: 5, - }, &cli.DurationFlag{ - Name: "pdp-poll-interval", - Usage: "Polling interval for PDP transaction confirmation checks", - Value: 30 * time.Second, + Name: "pdp-pull-timeout", + Usage: "How long to wait for Curio to finish pulling a batch (per request)", + Value: 5 * time.Minute, + }, + &cli.StringFlag{ + Name: "pdp-source-url-base", + Usage: "HTTPS base URL where Curio fetches pieces from; sourceUrl is built as /piece/", + EnvVars: []string{"PDP_SOURCE_URL_BASE"}, + }, + &cli.StringFlag{ + Name: "pdp-record-keeper", + Usage: "FWSS contract address (recordKeeper). Defaults to the network default from go-synapse.", + EnvVars: []string{"PDP_RECORD_KEEPER"}, }, &cli.StringFlag{ Name: "eth-rpc", @@ -114,8 +119,7 @@ var DealPusherCmd = &cli.Command{ pdpCfg := dealpusher.PDPSchedulingConfig{ BatchSize: c.Int("pdp-batch-size"), MaxPiecesPerProofSet: c.Int("pdp-max-pieces-per-proofset"), - ConfirmationDepth: c.Uint64("pdp-confirmation-depth"), - PollingInterval: c.Duration("pdp-poll-interval"), + PullTimeout: c.Duration("pdp-pull-timeout"), } if err := pdpCfg.Validate(); err != nil { return errors.WithStack(err) @@ -125,16 +129,19 @@ var DealPusherCmd = &cli.Command{ dealpusher.WithPDPSchedulingConfig(pdpCfg), } if rpcURL := c.String("eth-rpc"); rpcURL != "" { - pdpAdapter, err := dealpusher.NewOnChainPDP(c.Context, db, rpcURL) + adapterCfg := dealpusher.OnChainPDPConfig{ + DB: db, + RPCURL: rpcURL, + SourceURLBase: c.String("pdp-source-url-base"), + RecordKeeper: c.String("pdp-record-keeper"), + } + pdpAdapter, err := dealpusher.NewOnChainPDP(c.Context, adapterCfg) if err != nil { return errors.Wrap(err, "failed to initialize PDP on-chain adapter") } defer pdpAdapter.Close() - opts = append(opts, - dealpusher.WithPDPProofSetManager(pdpAdapter), - dealpusher.WithPDPTransactionConfirmer(pdpAdapter), - ) + opts = append(opts, dealpusher.WithPDPProofSetManager(pdpAdapter)) } if ddoContract := c.String("ddo-contract"); ddoContract != "" { diff --git a/model/replication.go b/model/replication.go index 0134194ba..ec6d63920 100644 --- a/model/replication.go +++ b/model/replication.go @@ -219,14 +219,22 @@ const ( // This is a materialized view built from Shovel-indexed events, replacing // the per-cycle RPC scans of GetProofSets/GetProofSetsForClient. type PDPProofSet struct { - SetID uint64 `gorm:"primaryKey;autoIncrement:false" json:"setId"` - ClientAddress string `gorm:"not null;index" json:"clientAddress"` - Provider string `gorm:"not null" json:"provider"` - IsLive bool `gorm:"default:false" json:"isLive"` - ChallengeEpoch *int64 ` json:"challengeEpoch,omitempty"` - CreatedBlock int64 `gorm:"not null" json:"createdBlock"` - Deleted bool `gorm:"default:false" json:"deleted"` - HandoffState ProofSetHandoffState `gorm:"default:'assembling'" json:"handoffState"` - ProposedProviderEVM string ` json:"proposedProviderEVM,omitempty"` - PieceCount int `gorm:"default:0" json:"pieceCount"` + SetID uint64 `gorm:"primaryKey;autoIncrement:false" json:"setId"` + ClientAddress string `gorm:"not null;index" json:"clientAddress"` + Provider string `gorm:"not null" json:"provider"` + IsLive bool `gorm:"default:false" json:"isLive"` + ChallengeEpoch *int64 ` json:"challengeEpoch,omitempty"` + CreatedBlock int64 `gorm:"not null" json:"createdBlock"` + Deleted bool `gorm:"default:false" json:"deleted"` + HandoffState ProofSetHandoffState `gorm:"default:'assembling'" json:"handoffState"` + PieceCount int `gorm:"default:0" json:"pieceCount"` + // ClientDataSetID is the per-(payer, set) nonce the client signs into + // CreateDataSet/AddPieces extraData. FWSS rejects reused IDs via its + // clientNonces[payer][id] check, so we persist before signing to make + // retries idempotent. Stored decimal because uint256 doesn't fit any + // native int. + ClientDataSetID string ` json:"clientDataSetId,omitempty"` + // ServiceURL caches the SP's PDP HTTP endpoint, fetched from + // ServiceProviderRegistry the first time we push to this provider. + ServiceURL string ` json:"serviceUrl,omitempty"` } diff --git a/service/dealpusher/dealpusher.go b/service/dealpusher/dealpusher.go index 6aba9d0c0..f193febca 100644 --- a/service/dealpusher/dealpusher.go +++ b/service/dealpusher/dealpusher.go @@ -40,15 +40,14 @@ var waitPendingInterval = time.Minute // DealPusher represents a struct that encapsulates the data and functionality related to pushing deals in a replication process. type DealPusher struct { - dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection. - keyStore keystore.KeyStore // Keystore for loading private keys - lotusClient jsonrpc.RPCClient // Lotus JSON-RPC client for chain queries - dealMaker replication.DealMaker // Object responsible for making a deal in replication. - pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager. - pdpTxConfirmer PDPTransactionConfirmer // Optional PDP transaction confirmer. - pdpSchedulingConfig PDPSchedulingConfig // PDP scheduling config for root batching and tx confirmation. - ddoDealManager DDODealManager // Optional DDO deal lifecycle manager. - ddoSchedulingConfig DDOSchedulingConfig // DDO scheduling config for allocation batching and tx confirmation. + dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection. + keyStore keystore.KeyStore // Keystore for loading private keys + lotusClient jsonrpc.RPCClient // Lotus JSON-RPC client for chain queries + dealMaker replication.DealMaker // Object responsible for making a deal in replication. + pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager. + pdpSchedulingConfig PDPSchedulingConfig // PDP scheduling config for batch sizing and pull timeouts. + ddoDealManager DDODealManager // Optional DDO deal lifecycle manager. + ddoSchedulingConfig DDOSchedulingConfig // DDO scheduling config for allocation batching and tx confirmation. // Resolver is injected so tests and future wiring can switch deal type behavior without coupling DealPusher to config storage. scheduleDealTypeResolver func(schedule *model.Schedule) model.DealType workerID uuid.UUID // UUID identifying the associated worker. diff --git a/service/dealpusher/options.go b/service/dealpusher/options.go index 649dd2f61..ed9fb04c2 100644 --- a/service/dealpusher/options.go +++ b/service/dealpusher/options.go @@ -11,12 +11,6 @@ func WithPDPProofSetManager(manager PDPProofSetManager) Option { } } -func WithPDPTransactionConfirmer(confirmer PDPTransactionConfirmer) Option { - return func(d *DealPusher) { - d.pdpTxConfirmer = confirmer - } -} - func WithPDPSchedulingConfig(cfg PDPSchedulingConfig) Option { return func(d *DealPusher) { d.pdpSchedulingConfig = cfg diff --git a/service/dealpusher/pdp_api.go b/service/dealpusher/pdp_api.go index 412388280..9ea001bd1 100644 --- a/service/dealpusher/pdp_api.go +++ b/service/dealpusher/pdp_api.go @@ -3,20 +3,23 @@ package dealpusher import ( "context" "errors" - "math/big" "time" "github.com/data-preservation-programs/go-synapse/signer" - "github.com/ethereum/go-ethereum/common" "github.com/ipfs/go-cid" ) -// PDPSchedulingConfig holds PDP-specific scheduling knobs for on-chain operations. +// PDPSchedulingConfig holds PDP-specific scheduling knobs for the FWSS-pull +// flow. type PDPSchedulingConfig struct { - BatchSize int // pieces per addPieces transaction (gas constraint) - MaxPiecesPerProofSet int // pieces per proof set before handoff to SP - ConfirmationDepth uint64 - PollingInterval time.Duration + // BatchSize bounds pieces per /pdp/piece/pull request. + BatchSize int + // MaxPiecesPerProofSet bounds pieces per data set; the scheduler starts + // a new set when the current one fills. + MaxPiecesPerProofSet int + // PullTimeout bounds the time we wait for Curio to finish pulling a + // batch (per request, not aggregate). + PullTimeout time.Duration } // Validate validates PDP scheduling configuration. @@ -27,44 +30,51 @@ func (c PDPSchedulingConfig) Validate() error { if c.MaxPiecesPerProofSet <= 0 { return errors.New("pdp max pieces per proof set must be greater than 0") } - if c.ConfirmationDepth == 0 { - return errors.New("pdp confirmation depth must be greater than 0") - } - if c.PollingInterval <= 0 { - return errors.New("pdp polling interval must be greater than 0") + if c.PullTimeout <= 0 { + return errors.New("pdp pull timeout must be greater than 0") } return nil } -// PDPProofSetManager defines proof set lifecycle operations needed by scheduling. -// All methods take an EVMSigner because they submit FEVM transactions. -type PDPProofSetManager interface { - // EnsureProofSet returns an existing proof set ID in assembling state or creates one. - EnsureProofSet(ctx context.Context, evmSigner signer.EVMSigner, provider string, cfg PDPSchedulingConfig) (uint64, error) - // QueueAddRoots submits root additions for a proof set and returns the queued tx reference. - // pieceSizes are the padded piece sizes corresponding to each CID, needed for CommPv2 conversion. - QueueAddRoots(ctx context.Context, evmSigner signer.EVMSigner, proofSetID uint64, pieceCIDs []cid.Cid, pieceSizes []int64, cfg PDPSchedulingConfig) (*PDPQueuedTx, error) - // ProposeTransfer proposes the proof set for handoff to the given SP. - ProposeTransfer(ctx context.Context, evmSigner signer.EVMSigner, proofSetID uint64, spEVMAddress common.Address) error - // IncrementPieceCount atomically increments the durable piece count after confirmed on-chain add. - IncrementPieceCount(ctx context.Context, proofSetID uint64, count int) error +// PDPPieceInput names a piece the scheduler wants pushed to the SP. The +// implementation constructs the SP-side source URL. +type PDPPieceInput struct { + PieceCID cid.Cid + // PieceSize is the padded piece size, needed for CommPv2 conversion + // before signing. + PieceSize int64 } -// PDPTransactionConfirmer defines confirmation checks for queued on-chain transactions. -type PDPTransactionConfirmer interface { - WaitForConfirmations(ctx context.Context, txHash string, depth uint64, pollInterval time.Duration) (*PDPTransactionReceipt, error) +// PDPPullResult reports the outcome of a /pdp/piece/pull batch. +type PDPPullResult struct { + // DataSetID is the FWSS-listened data set the pieces ended up in. + // For new sets, this is the SetID Curio returns after the + // createDataSet+addPieces tx confirms. + DataSetID uint64 } -// PDPQueuedTx represents an on-chain transaction submitted by PDP scheduling. -type PDPQueuedTx struct { - Hash string -} - -// PDPTransactionReceipt represents a confirmed on-chain transaction result. -type PDPTransactionReceipt struct { - Hash string - BlockNumber uint64 - GasUsed uint64 - Status uint64 - CostAttoFIL *big.Int +// PDPProofSetManager pushes pieces to an SP's Curio via /pdp/piece/pull, +// then triggers the SP's on-chain commit via /pdp/data-sets/create-and-add +// (new sets) or /pdp/data-sets/{id}/pieces (existing). It encapsulates SP +// service-URL discovery, clientDataSetId persistence, EIP-712 signing, +// and post-completion bookkeeping. +type PDPProofSetManager interface { + // PullPiecesToFWSS pushes a batch of pieces to the SP via the FWSS-pull + // flow. If no assembling proof set has room, a new FWSS-listened set is + // created atomically with the first batch; otherwise pieces are added + // to the existing assembling set. Blocks until Curio reports the SP + // transfer is complete and the on-chain tx confirms (giving us the + // dataSetId), or the configured timeout elapses. The returned + // DataSetID is the set the pieces landed in. + // + // evmSigner is the client's secp256k1 wallet; its EVMAddress is the + // on-chain payer, and its raw key (via signer.EVMSigner.ECDSAKey) + // is used for the EIP-712 extraData signing. + PullPiecesToFWSS( + ctx context.Context, + evmSigner signer.EVMSigner, + provider string, + pieces []PDPPieceInput, + cfg PDPSchedulingConfig, + ) (PDPPullResult, error) } diff --git a/service/dealpusher/pdp_api_test.go b/service/dealpusher/pdp_api_test.go index 515c84d48..2af81339c 100644 --- a/service/dealpusher/pdp_api_test.go +++ b/service/dealpusher/pdp_api_test.go @@ -12,25 +12,33 @@ func TestPDPSchedulingConfigValidate(t *testing.T) { cfg := PDPSchedulingConfig{ BatchSize: 100, MaxPiecesPerProofSet: 1024, - ConfirmationDepth: 5, - PollingInterval: 30 * time.Second, + PullTimeout: 5 * time.Minute, } require.NoError(t, cfg.Validate()) }) - t.Run("invalid", func(t *testing.T) { + t.Run("invalid (zero values)", func(t *testing.T) { cfg := PDPSchedulingConfig{} require.Error(t, cfg.Validate()) }) t.Run("invalid max pieces per proof set", func(t *testing.T) { cfg := PDPSchedulingConfig{ - BatchSize: 100, - ConfirmationDepth: 5, - PollingInterval: 30 * time.Second, + BatchSize: 100, + PullTimeout: 5 * time.Minute, } err := cfg.Validate() require.Error(t, err) require.Contains(t, err.Error(), "pdp max pieces per proof set must be greater than 0") }) + + t.Run("invalid pull timeout", func(t *testing.T) { + cfg := PDPSchedulingConfig{ + BatchSize: 100, + MaxPiecesPerProofSet: 1024, + } + err := cfg.Validate() + require.Error(t, err) + require.Contains(t, err.Error(), "pdp pull timeout must be greater than 0") + }) } diff --git a/service/dealpusher/pdp_onchain.go b/service/dealpusher/pdp_onchain.go deleted file mode 100644 index cafd48df8..000000000 --- a/service/dealpusher/pdp_onchain.go +++ /dev/null @@ -1,331 +0,0 @@ -package dealpusher - -import ( - "context" - "fmt" - "math/big" - "time" - - "github.com/cockroachdb/errors" - synapse "github.com/data-preservation-programs/go-synapse" - "github.com/data-preservation-programs/go-synapse/constants" - "github.com/data-preservation-programs/go-synapse/contracts" - "github.com/data-preservation-programs/go-synapse/pdp" - "github.com/data-preservation-programs/go-synapse/signer" - commcid "github.com/filecoin-project/go-fil-commcid" - - "github.com/data-preservation-programs/singularity/model" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethclient" - "github.com/filecoin-project/go-address" - "github.com/ipfs/go-cid" - "gorm.io/gorm" -) - -// defaultGasLimit is a fixed gas limit for FEVM transactions. -// FEVM gas estimation (NoSend=true) is unreliable, so we use a fixed value. -// EVM traces show ~200K gas but FEVM execution needs ~17M due to actor overhead. -const defaultGasLimit = 30_000_000 - -type confirmationClient interface { - TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) - BlockNumber(ctx context.Context) (uint64, error) -} - -// OnChainPDP implements PDP scheduling interfaces using FEVM RPC + go-synapse contracts. -type OnChainPDP struct { - dbNoContext *gorm.DB - ethClient *ethclient.Client - confirmClient confirmationClient - network constants.Network - chainID *big.Int - contractAddr common.Address - contract *contracts.PDPVerifier -} - -func NewOnChainPDP(ctx context.Context, db *gorm.DB, rpcURL string) (*OnChainPDP, error) { - if rpcURL == "" { - return nil, errors.New("eth rpc URL is required") - } - - ethClient, err := ethclient.DialContext(ctx, rpcURL) - if err != nil { - return nil, errors.Wrap(err, "failed to connect to FEVM RPC") - } - - network, chainIDInt64, err := synapse.DetectNetwork(ctx, ethClient) - if err != nil { - ethClient.Close() - return nil, errors.Wrap(err, "failed to detect FEVM network") - } - - contractAddr := constants.GetPDPVerifierAddress(network) - if contractAddr == (common.Address{}) { - ethClient.Close() - return nil, fmt.Errorf("no PDPVerifier contract for network %s", network) - } - - contract, err := contracts.NewPDPVerifier(contractAddr, ethClient) - if err != nil { - ethClient.Close() - return nil, errors.Wrap(err, "failed to bind PDPVerifier contract") - } - - Logger.Infow("initialized PDP on-chain adapter", - "network", network, - "chainId", chainIDInt64, - "contract", contractAddr.Hex(), - ) - - return &OnChainPDP{ - dbNoContext: db, - ethClient: ethClient, - confirmClient: ethClient, - network: network, - chainID: big.NewInt(chainIDInt64), - contractAddr: contractAddr, - contract: contract, - }, nil -} - -func (o *OnChainPDP) Close() error { - if o.ethClient != nil { - o.ethClient.Close() - } - return nil -} - -func (o *OnChainPDP) newManager(ctx context.Context, evmSigner signer.EVMSigner) (*pdp.Manager, error) { - cfg := pdp.DefaultManagerConfig() - cfg.ContractAddress = o.contractAddr - cfg.DefaultGasLimit = defaultGasLimit - mgr, err := pdp.NewManagerWithConfig(ctx, o.ethClient, evmSigner, o.network, &cfg) - if err != nil { - return nil, errors.Wrap(err, "failed to initialize PDP proof set manager") - } - return mgr, nil -} - -// findProofSetWithRoom finds an assembling proof set with room for more pieces. -// uses the durable PieceCount field rather than inferring occupancy from deal states. -func (o *OnChainPDP) findProofSetWithRoom(ctx context.Context, clientAddress string, provider string, maxPieces int) (uint64, bool, error) { - if maxPieces <= 0 { - return 0, false, errors.New("max pieces per proof set must be greater than 0") - } - - var ps model.PDPProofSet - err := o.dbNoContext.WithContext(ctx). - Where("client_address = ? AND provider = ? AND deleted = FALSE AND handoff_state = ?", - clientAddress, provider, model.ProofSetAssembling). - Where("piece_count < ?", maxPieces). - Order("set_id"). - First(&ps).Error - if err == nil { - return ps.SetID, true, nil - } - if errors.Is(err, gorm.ErrRecordNotFound) { - return 0, false, nil - } - return 0, false, errors.Wrap(err, "failed to query assembling PDP proof sets") -} - -func (o *OnChainPDP) EnsureProofSet(ctx context.Context, evmSigner signer.EVMSigner, provider string, cfg PDPSchedulingConfig) (uint64, error) { - clientAddr, err := commonToDelegatedAddress(evmSigner.EVMAddress()) - if err != nil { - return 0, errors.Wrap(err, "failed to derive delegated client address from signer") - } - existingSetID, found, err := o.findProofSetWithRoom(ctx, clientAddr.String(), provider, cfg.MaxPiecesPerProofSet) - if err != nil { - return 0, err - } - if found { - return existingSetID, nil - } - - manager, err := o.newManager(ctx, evmSigner) - if err != nil { - return 0, err - } - - result, err := manager.CreateProofSet(ctx, pdp.CreateProofSetOptions{ - Value: pdp.SybilFee, - }) - if err != nil { - return 0, errors.Wrap(err, "failed to create PDP proof set") - } - - setID := result.ProofSetID.Uint64() - row := model.PDPProofSet{ - SetID: setID, - ClientAddress: clientAddr.String(), - Provider: provider, - HandoffState: model.ProofSetAssembling, - } - if result.Receipt != nil && result.Receipt.BlockNumber != nil { - row.CreatedBlock = int64(result.Receipt.BlockNumber.Uint64()) - } - - if err := o.dbNoContext.WithContext(ctx).Where("set_id = ?", setID).Assign(row).FirstOrCreate(&model.PDPProofSet{}).Error; err != nil { - return 0, errors.Wrap(err, "failed to persist created PDP proof set") - } - return setID, nil -} - -func (o *OnChainPDP) ProposeTransfer(ctx context.Context, evmSigner signer.EVMSigner, proofSetID uint64, spEVMAddress common.Address) error { - txOpts, err := evmSigner.Transactor(o.chainID) - if err != nil { - return errors.Wrap(err, "failed to create transactor for propose transfer") - } - txOpts.GasLimit = defaultGasLimit - - tx, err := o.contract.ProposeDataSetStorageProvider(txOpts, new(big.Int).SetUint64(proofSetID), spEVMAddress) - if err != nil { - return errors.Wrap(err, "failed to submit propose-transfer transaction") - } - - Logger.Infow("proposed proof set transfer", - "proofSetID", proofSetID, - "spEVMAddress", spEVMAddress.Hex(), - "txHash", tx.Hash().Hex(), - ) - - // update local state -- the SP claim is tracked via StorageProviderChanged event - return o.dbNoContext.WithContext(ctx). - Model(&model.PDPProofSet{}). - Where("set_id = ?", proofSetID). - Updates(map[string]any{ - "handoff_state": model.ProofSetProposed, - "proposed_provider_evm": spEVMAddress.Hex(), - }).Error -} - -func (o *OnChainPDP) IncrementPieceCount(ctx context.Context, proofSetID uint64, count int) error { - return o.dbNoContext.WithContext(ctx). - Model(&model.PDPProofSet{}). - Where("set_id = ?", proofSetID). - Update("piece_count", gorm.Expr("piece_count + ?", count)).Error -} - -func (o *OnChainPDP) QueueAddRoots( - ctx context.Context, - evmSigner signer.EVMSigner, - proofSetID uint64, - pieceCIDs []cid.Cid, - pieceSizes []int64, - cfg PDPSchedulingConfig, -) (*PDPQueuedTx, error) { - if len(pieceCIDs) == 0 { - return nil, errors.New("no piece CIDs provided") - } - if len(pieceCIDs) != len(pieceSizes) { - return nil, fmt.Errorf("pieceCIDs length %d != pieceSizes length %d", len(pieceCIDs), len(pieceSizes)) - } - if cfg.BatchSize > 0 && len(pieceCIDs) > cfg.BatchSize { - return nil, fmt.Errorf("piece CID count %d exceeds configured PDP batch size %d", len(pieceCIDs), cfg.BatchSize) - } - - // convert commP v1 CIDs to CommPv2 format expected by PDPVerifier contract - roots := make([]pdp.Root, len(pieceCIDs)) - for i, pieceCID := range pieceCIDs { - payloadSize := uint64(pieceSizes[i]) * 127 / 128 - v2CID, err := commcid.PieceCidV2FromV1(pieceCID, payloadSize) - if err != nil { - return nil, errors.Wrapf(err, "failed to convert piece CID %s to CommPv2", pieceCID) - } - roots[i] = pdp.Root{PieceCID: v2CID} - } - - manager, err := o.newManager(ctx, evmSigner) - if err != nil { - return nil, err - } - - setIDBig := new(big.Int).SetUint64(proofSetID) - result, err := manager.AddRoots(ctx, setIDBig, roots) - if err != nil { - return nil, errors.Wrap(err, "failed to submit PDP add-roots transaction") - } - - return &PDPQueuedTx{Hash: result.TransactionHash.Hex()}, nil -} - -func (o *OnChainPDP) WaitForConfirmations( - ctx context.Context, - txHash string, - depth uint64, - pollInterval time.Duration, -) (*PDPTransactionReceipt, error) { - rawHash, err := hexutil.Decode(txHash) - if err != nil || len(rawHash) != common.HashLength { - return nil, fmt.Errorf("invalid tx hash %q", txHash) - } - if pollInterval <= 0 { - pollInterval = 2 * time.Second - } - - hash := common.HexToHash(txHash) - ticker := time.NewTicker(pollInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return nil, errors.Wrap(ctx.Err(), "context canceled while waiting for PDP transaction confirmation") - case <-ticker.C: - receipt, err := o.confirmClient.TransactionReceipt(ctx, hash) - if err != nil { - if errors.Is(err, ethereum.NotFound) { - continue - } - return nil, errors.Wrap(err, "failed to fetch transaction receipt") - } - - out := toPDPReceipt(txHash, receipt) - if receipt.Status != types.ReceiptStatusSuccessful { - return out, fmt.Errorf("transaction %s failed with status %d", txHash, receipt.Status) - } - if depth == 0 { - return out, nil - } - - currentBlock, err := o.confirmClient.BlockNumber(ctx) - if err != nil { - return nil, errors.Wrap(err, "failed to fetch latest block number") - } - if receipt.BlockNumber != nil && receipt.BlockNumber.Uint64()+depth <= currentBlock { - return out, nil - } - } - } -} - -func toPDPReceipt(txHash string, receipt *types.Receipt) *PDPTransactionReceipt { - out := &PDPTransactionReceipt{ - Hash: txHash, - } - if receipt == nil { - return out - } - out.Status = receipt.Status - out.GasUsed = receipt.GasUsed - if receipt.BlockNumber != nil { - out.BlockNumber = receipt.BlockNumber.Uint64() - } - if receipt.EffectiveGasPrice != nil { - out.CostAttoFIL = new(big.Int).Mul(new(big.Int).SetUint64(receipt.GasUsed), receipt.EffectiveGasPrice) - } else { - out.CostAttoFIL = big.NewInt(0) - } - return out -} - -func commonToDelegatedAddress(subaddr common.Address) (address.Address, error) { - addr, err := address.NewDelegatedAddress(10, subaddr.Bytes()) - if err != nil { - return address.Undef, errors.Wrap(err, "failed to encode delegated address") - } - return addr, nil -} diff --git a/service/dealpusher/pdp_onchain_test.go b/service/dealpusher/pdp_onchain_test.go deleted file mode 100644 index 0f56145ab..000000000 --- a/service/dealpusher/pdp_onchain_test.go +++ /dev/null @@ -1,186 +0,0 @@ -package dealpusher - -import ( - "context" - "math/big" - "testing" - "time" - - "github.com/data-preservation-programs/singularity/model" - "github.com/data-preservation-programs/singularity/util/testutil" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/stretchr/testify/require" - "gorm.io/gorm" -) - -type fakeConfirmationClient struct { - txReceiptFn func(ctx context.Context, txHash common.Hash) (*types.Receipt, error) - blockNumFn func(ctx context.Context) (uint64, error) -} - -func (f *fakeConfirmationClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { - return f.txReceiptFn(ctx, txHash) -} - -func (f *fakeConfirmationClient) BlockNumber(ctx context.Context) (uint64, error) { - return f.blockNumFn(ctx) -} - -func TestOnChainPDP_WaitForConfirmations(t *testing.T) { - var receiptCalls int - var blockCalls int - - c := &fakeConfirmationClient{ - txReceiptFn: func(_ context.Context, _ common.Hash) (*types.Receipt, error) { - receiptCalls++ - if receiptCalls == 1 { - return nil, ethereum.NotFound - } - return &types.Receipt{ - Status: types.ReceiptStatusSuccessful, - GasUsed: 10, - EffectiveGasPrice: big.NewInt(3), - BlockNumber: big.NewInt(100), - }, nil - }, - blockNumFn: func(_ context.Context) (uint64, error) { - blockCalls++ - if blockCalls == 1 { - return 101, nil - } - return 102, nil - }, - } - - adapter := &OnChainPDP{confirmClient: c} - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - receipt, err := adapter.WaitForConfirmations(ctx, "0x1111111111111111111111111111111111111111111111111111111111111111", 2, time.Millisecond) - require.NoError(t, err) - require.NotNil(t, receipt) - require.Equal(t, uint64(100), receipt.BlockNumber) - require.Equal(t, uint64(10), receipt.GasUsed) - require.Equal(t, uint64(types.ReceiptStatusSuccessful), receipt.Status) - require.Equal(t, "30", receipt.CostAttoFIL.String()) -} - -func TestOnChainPDP_WaitForConfirmationsFailedTx(t *testing.T) { - c := &fakeConfirmationClient{ - txReceiptFn: func(_ context.Context, _ common.Hash) (*types.Receipt, error) { - return &types.Receipt{ - Status: types.ReceiptStatusFailed, - GasUsed: 5, - EffectiveGasPrice: big.NewInt(2), - BlockNumber: big.NewInt(10), - }, nil - }, - blockNumFn: func(_ context.Context) (uint64, error) { - return 10, nil - }, - } - - adapter := &OnChainPDP{confirmClient: c} - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() - - receipt, err := adapter.WaitForConfirmations(ctx, "0x2222222222222222222222222222222222222222222222222222222222222222", 1, time.Millisecond) - require.Error(t, err) - require.NotNil(t, receipt) - require.Equal(t, uint64(types.ReceiptStatusFailed), receipt.Status) -} - -func TestOnChainPDP_WaitForConfirmationsInvalidHash(t *testing.T) { - adapter := &OnChainPDP{} - _, err := adapter.WaitForConfirmations(context.Background(), "not-a-hash", 1, time.Millisecond) - require.Error(t, err) -} - -func TestOnChainPDP_FindProofSetWithRoom_NoProofSets(t *testing.T) { - testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { - adapter := &OnChainPDP{dbNoContext: db} - setID, found, err := adapter.findProofSetWithRoom(ctx, "f410foo", "f410provider", 2) - require.NoError(t, err) - require.False(t, found) - require.Zero(t, setID) - }) -} - -func TestOnChainPDP_FindProofSetWithRoom_PicksAssemblingWithCapacity(t *testing.T) { - testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { - client := "f410foo" - provider := "f410provider" - // set 1: assembling but full - require.NoError(t, db.Create(&model.PDPProofSet{ - SetID: 1, - ClientAddress: client, - Provider: provider, - HandoffState: model.ProofSetAssembling, - PieceCount: 5, - }).Error) - // set 2: assembling with room - require.NoError(t, db.Create(&model.PDPProofSet{ - SetID: 2, - ClientAddress: client, - Provider: provider, - HandoffState: model.ProofSetAssembling, - PieceCount: 3, - }).Error) - - adapter := &OnChainPDP{dbNoContext: db} - setID, found, err := adapter.findProofSetWithRoom(ctx, client, provider, 5) - require.NoError(t, err) - require.True(t, found) - require.EqualValues(t, 2, setID) - }) -} - -func TestOnChainPDP_FindProofSetWithRoom_SkipsProposedAndTransferred(t *testing.T) { - testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { - client := "f410foo" - provider := "f410provider" - // set 1: proposed -- should be skipped even though it has room - require.NoError(t, db.Create(&model.PDPProofSet{ - SetID: 1, - ClientAddress: client, - Provider: provider, - HandoffState: model.ProofSetProposed, - PieceCount: 1, - }).Error) - // set 2: transferred -- should be skipped - require.NoError(t, db.Create(&model.PDPProofSet{ - SetID: 2, - ClientAddress: client, - Provider: provider, - HandoffState: model.ProofSetTransferred, - PieceCount: 1, - }).Error) - - adapter := &OnChainPDP{dbNoContext: db} - setID, found, err := adapter.findProofSetWithRoom(ctx, client, provider, 10) - require.NoError(t, err) - require.False(t, found) - require.Zero(t, setID) - }) -} - -func TestOnChainPDP_IncrementPieceCount(t *testing.T) { - testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { - require.NoError(t, db.Create(&model.PDPProofSet{ - SetID: 1, - ClientAddress: "f410foo", - Provider: "f410provider", - HandoffState: model.ProofSetAssembling, - PieceCount: 5, - }).Error) - - adapter := &OnChainPDP{dbNoContext: db} - require.NoError(t, adapter.IncrementPieceCount(ctx, 1, 3)) - - var ps model.PDPProofSet - require.NoError(t, db.First(&ps, "set_id = ?", 1).Error) - require.Equal(t, 8, ps.PieceCount) - }) -} diff --git a/service/dealpusher/pdp_pull.go b/service/dealpusher/pdp_pull.go new file mode 100644 index 000000000..474f08eaa --- /dev/null +++ b/service/dealpusher/pdp_pull.go @@ -0,0 +1,451 @@ +package dealpusher + +import ( + "context" + "crypto/rand" + "errors" + "fmt" + "math/big" + "strings" + "sync" + "time" + + cockroach "github.com/cockroachdb/errors" + synapse "github.com/data-preservation-programs/go-synapse" + "github.com/data-preservation-programs/go-synapse/constants" + "github.com/data-preservation-programs/go-synapse/pdp" + "github.com/data-preservation-programs/go-synapse/signer" + "github.com/data-preservation-programs/go-synapse/spregistry" + "github.com/data-preservation-programs/singularity/model" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/filecoin-project/go-address" + commcid "github.com/filecoin-project/go-fil-commcid" + "github.com/ipfs/go-cid" + "gorm.io/gorm" +) + +// OnChainPDPConfig configures the FWSS-pull adapter. +type OnChainPDPConfig struct { + // DB is required. + DB *gorm.DB + // RPCURL is the FEVM JSON-RPC endpoint (read-only; we never submit txs). + RPCURL string + // SourceURLBase is the HTTPS base singularity serves pieces from. The + // per-piece URL is constructed as /piece/. Required. + SourceURLBase string + // RecordKeeper is the FWSS contract address (hex). Empty defaults to + // the network FWSS from go-synapse constants. + RecordKeeper string +} + +// OnChainPDP drives the FWSS-mediated pull flow. We never submit any +// PDPVerifier tx ourselves: the SP downloads pieces from our content +// provider via /pdp/piece/pull, then commits on-chain from its own wallet +// via /pdp/data-sets/create-and-add (new sets) or /pdp/data-sets/{id}/pieces +// (existing). The only EVM RPC use is the ServiceProviderRegistry view +// call that resolves an SP's PDP service URL. +type OnChainPDP struct { + db *gorm.DB + ethClient *ethclient.Client + network constants.Network + chainID *big.Int + recordKeeper common.Address + sourceURLBase string + spRegistry *spregistry.Service + + spURLCacheMu sync.Mutex + spURLCache map[string]spInfo // delegated f4 provider string -> info +} + +type spInfo struct { + serviceURL string + payee common.Address +} + +func NewOnChainPDP(ctx context.Context, cfg OnChainPDPConfig) (*OnChainPDP, error) { + if cfg.DB == nil { + return nil, errors.New("db is required") + } + if cfg.RPCURL == "" { + return nil, errors.New("eth rpc URL is required") + } + if cfg.SourceURLBase == "" { + return nil, errors.New("source URL base is required (--pdp-source-url-base / PDP_SOURCE_URL_BASE)") + } + + ethClient, err := ethclient.DialContext(ctx, cfg.RPCURL) + if err != nil { + return nil, cockroach.Wrap(err, "failed to connect to FEVM RPC") + } + + network, chainIDInt64, err := synapse.DetectNetwork(ctx, ethClient) + if err != nil { + ethClient.Close() + return nil, cockroach.Wrap(err, "failed to detect FEVM network") + } + chainID := big.NewInt(chainIDInt64) + + recordKeeper := common.HexToAddress(cfg.RecordKeeper) + if recordKeeper == (common.Address{}) { + recordKeeper = constants.WarmStorageAddresses[network] + if recordKeeper == (common.Address{}) { + ethClient.Close() + return nil, fmt.Errorf("no FWSS recordKeeper address for network %s; set --pdp-record-keeper", network) + } + } + + spRegAddr := constants.SPRegistryAddresses[network] + if spRegAddr == (common.Address{}) { + ethClient.Close() + return nil, fmt.Errorf("no ServiceProviderRegistry address for network %s", network) + } + spReg, err := spregistry.NewService(ethClient, spRegAddr, nil, chainID) + if err != nil { + ethClient.Close() + return nil, cockroach.Wrap(err, "failed to bind ServiceProviderRegistry") + } + + Logger.Infow("initialized FWSS-pull adapter", + "network", network, + "chainId", chainIDInt64, + "recordKeeper", recordKeeper.Hex(), + "spRegistry", spRegAddr.Hex(), + "sourceURLBase", cfg.SourceURLBase, + ) + + return &OnChainPDP{ + db: cfg.DB, + ethClient: ethClient, + network: network, + chainID: chainID, + recordKeeper: recordKeeper, + sourceURLBase: strings.TrimSuffix(cfg.SourceURLBase, "/"), + spRegistry: spReg, + spURLCache: map[string]spInfo{}, + }, nil +} + +func (o *OnChainPDP) Close() error { + if o.ethClient != nil { + o.ethClient.Close() + } + return nil +} + +// PullPiecesToFWSS implements PDPProofSetManager. +func (o *OnChainPDP) PullPiecesToFWSS( + ctx context.Context, + evmSigner signer.EVMSigner, + provider string, + pieces []PDPPieceInput, + cfg PDPSchedulingConfig, +) (PDPPullResult, error) { + if evmSigner == nil { + return PDPPullResult{}, errors.New("evm signer is required") + } + if len(pieces) == 0 { + return PDPPullResult{}, errors.New("no pieces provided") + } + if cfg.BatchSize > 0 && len(pieces) > cfg.BatchSize { + return PDPPullResult{}, fmt.Errorf("piece count %d exceeds configured batch size %d", len(pieces), cfg.BatchSize) + } + + payerAddr := evmSigner.EVMAddress() + clientDelegated, err := commonToDelegatedAddress(payerAddr) + if err != nil { + return PDPPullResult{}, cockroach.Wrap(err, "derive delegated payer address") + } + clientAddrStr := clientDelegated.String() + + info, err := o.lookupSPInfo(ctx, provider) + if err != nil { + return PDPPullResult{}, cockroach.Wrap(err, "SP service-URL lookup") + } + + // Convert v1 piece CIDs to CommPv2 (what FWSS / Curio expect). + pieceCIDsV2 := make([]cid.Cid, len(pieces)) + pullInputs := make([]pdp.PullPieceInput, len(pieces)) + for i, p := range pieces { + // FR32 padding: padded = raw * 128/127, raw = padded * 127/128. + payloadSize := uint64(p.PieceSize) * 127 / 128 + v2, err := commcid.PieceCidV2FromV1(p.PieceCID, payloadSize) + if err != nil { + return PDPPullResult{}, cockroach.Wrapf(err, "convert piece %s to CommPv2", p.PieceCID) + } + pieceCIDsV2[i] = v2 + pullInputs[i] = pdp.PullPieceInput{ + PieceCID: v2.String(), + SourceURL: o.sourceURLBase + "/piece/" + v2.String(), + } + } + + existing, err := o.findProofSetWithRoom(ctx, clientAddrStr, provider, cfg.MaxPiecesPerProofSet) + if err != nil { + return PDPPullResult{}, err + } + + authHelper := pdp.NewAuthHelper(evmSigner.SignDigest, payerAddr, o.recordKeeper, o.chainID) + pdpServer := pdp.NewServer(info.serviceURL) + + if existing != nil { + return o.addToExisting(ctx, pdpServer, authHelper, existing, pullInputs, pieceCIDsV2, cfg) + } + + return o.createNewSet(ctx, pdpServer, authHelper, payerAddr, info.payee, provider, clientAddrStr, info.serviceURL, pullInputs, pieceCIDsV2, cfg) +} + +// addToExisting adds pieces to an assembling proof set we've already +// created. extraData is the AddPieces blob alone. +func (o *OnChainPDP) addToExisting( + ctx context.Context, + pdpServer *pdp.Server, + authHelper *pdp.AuthHelper, + existing *model.PDPProofSet, + pullInputs []pdp.PullPieceInput, + pieceCIDsV2 []cid.Cid, + cfg PDPSchedulingConfig, +) (PDPPullResult, error) { + clientDataSetID, ok := new(big.Int).SetString(existing.ClientDataSetID, 10) + if !ok || clientDataSetID == nil { + return PDPPullResult{}, fmt.Errorf("proof set %d has invalid clientDataSetID %q", existing.SetID, existing.ClientDataSetID) + } + + addExtra, err := signAddPiecesExtra(authHelper, clientDataSetID, pieceCIDsV2) + if err != nil { + return PDPPullResult{}, err + } + + if err := waitForPullComplete(ctx, pdpServer, pdp.PullPiecesOptions{ + RecordKeeper: o.recordKeeper.Hex(), + Pieces: pullInputs, + ExtraData: addExtra, + DataSetID: existing.SetID, + }, cfg.PullTimeout); err != nil { + return PDPPullResult{}, err + } + + addResp, err := pdpServer.AddPieces(ctx, int(existing.SetID), pieceCIDsV2, addExtra) + if err != nil { + return PDPPullResult{}, cockroach.Wrap(err, "POST /pdp/data-sets/{id}/pieces") + } + status, err := pdpServer.WaitForPieceAddition(ctx, int(existing.SetID), addResp.TxHash, cfg.PullTimeout) + if err != nil { + return PDPPullResult{}, cockroach.Wrap(err, "wait for add-pieces tx") + } + if status.AddMessageOK == nil || !*status.AddMessageOK { + return PDPPullResult{}, fmt.Errorf("add-pieces tx %s did not confirm successfully", addResp.TxHash) + } + + if err := o.db.WithContext(ctx). + Model(&model.PDPProofSet{}). + Where("set_id = ?", existing.SetID). + UpdateColumn("piece_count", gorm.Expr("piece_count + ?", len(pieceCIDsV2))).Error; err != nil { + return PDPPullResult{}, cockroach.Wrap(err, "increment piece_count") + } + + return PDPPullResult{DataSetID: existing.SetID}, nil +} + +// createNewSet atomically creates a FWSS-listened proof set with the first +// batch of pieces. extraData is the abi.encode(bytes,bytes) of the +// CreateDataSet and AddPieces extras. +func (o *OnChainPDP) createNewSet( + ctx context.Context, + pdpServer *pdp.Server, + authHelper *pdp.AuthHelper, + payerAddr common.Address, + payee common.Address, + provider string, + clientAddrStr string, + serviceURL string, + pullInputs []pdp.PullPieceInput, + pieceCIDsV2 []cid.Cid, + cfg PDPSchedulingConfig, +) (PDPPullResult, error) { + clientDataSetID := randomClientDataSetID() + + createExtra, err := signCreateDataSetExtra(authHelper, payerAddr, payee, clientDataSetID) + if err != nil { + return PDPPullResult{}, err + } + addExtra, err := signAddPiecesExtra(authHelper, clientDataSetID, pieceCIDsV2) + if err != nil { + return PDPPullResult{}, err + } + combined, err := pdp.EncodeCreateDataSetAndAddPiecesExtraData(createExtra, addExtra) + if err != nil { + return PDPPullResult{}, cockroach.Wrap(err, "wrap create+add extra data") + } + + if err := waitForPullComplete(ctx, pdpServer, pdp.PullPiecesOptions{ + RecordKeeper: o.recordKeeper.Hex(), + Pieces: pullInputs, + ExtraData: combined, + DataSetID: 0, + }, cfg.PullTimeout); err != nil { + return PDPPullResult{}, err + } + + createResp, err := pdpServer.CreateDataSetAndAddPieces(ctx, o.recordKeeper.Hex(), pieceCIDsV2, combined) + if err != nil { + return PDPPullResult{}, cockroach.Wrap(err, "POST /pdp/data-sets/create-and-add") + } + + status, err := pdpServer.WaitForDataSetCreation(ctx, createResp.TxHash, cfg.PullTimeout) + if err != nil { + return PDPPullResult{}, cockroach.Wrap(err, "wait for create-and-add tx") + } + if status.DataSetID == nil { + return PDPPullResult{}, fmt.Errorf("create-and-add tx %s confirmed but no dataSetId returned", createResp.TxHash) + } + dataSetID := uint64(*status.DataSetID) + + row := model.PDPProofSet{ + SetID: dataSetID, + ClientAddress: clientAddrStr, + Provider: provider, + HandoffState: model.ProofSetAssembling, + ClientDataSetID: clientDataSetID.String(), + ServiceURL: serviceURL, + PieceCount: len(pieceCIDsV2), + } + // Upsert so we win the race against pdptracker materializing the same + // SetID from the DataSetCreated event with listener-as-client_address. + if err := o.db.WithContext(ctx). + Where("set_id = ?", dataSetID). + Assign(row). + FirstOrCreate(&model.PDPProofSet{}).Error; err != nil { + return PDPPullResult{}, cockroach.Wrap(err, "persist new proof set") + } + + return PDPPullResult{DataSetID: dataSetID}, nil +} + +// waitForPullComplete blocks until the SP-side piece transfer reports +// complete, fails permanently, or times out. +func waitForPullComplete(ctx context.Context, pdpServer *pdp.Server, opts pdp.PullPiecesOptions, timeout time.Duration) error { + resp, err := pdpServer.WaitForPullPieces(ctx, opts, timeout) + if err != nil { + return cockroach.Wrap(err, "wait for /pdp/piece/pull") + } + switch resp.Status { + case pdp.PullStatusComplete: + return nil + case pdp.PullStatusFailed: + return fmt.Errorf("/pdp/piece/pull reported failed for %d pieces", len(opts.Pieces)) + default: + return fmt.Errorf("/pdp/piece/pull timed out at status %q", resp.Status) + } +} + +func signCreateDataSetExtra(authHelper *pdp.AuthHelper, payer, payee common.Address, clientDataSetID *big.Int) (string, error) { + sig, err := authHelper.SignCreateDataSet(clientDataSetID, payee, nil) + if err != nil { + return "", cockroach.Wrap(err, "sign CreateDataSet") + } + return pdp.EncodeDataSetCreateData(payer, clientDataSetID, nil, sig.Signature) +} + +func signAddPiecesExtra(authHelper *pdp.AuthHelper, clientDataSetID *big.Int, pieceCIDsV2 []cid.Cid) (string, error) { + // FWSS uses (payer, clientDataSetId) as the cross-tx replay key, not + // the addPieces in-extraData nonce; zero is fine here. + nonce := big.NewInt(0) + sig, err := authHelper.SignAddPieces(clientDataSetID, nonce, pieceCIDsV2, nil) + if err != nil { + return "", cockroach.Wrap(err, "sign AddPieces") + } + return pdp.EncodeAddPiecesExtraData(nonce, nil, sig.Signature) +} + +func (o *OnChainPDP) findProofSetWithRoom(ctx context.Context, clientAddress, provider string, maxPieces int) (*model.PDPProofSet, error) { + if maxPieces <= 0 { + return nil, errors.New("max pieces per proof set must be > 0") + } + var ps model.PDPProofSet + err := o.db.WithContext(ctx). + Where("client_address = ? AND provider = ? AND deleted = FALSE AND handoff_state = ?", + clientAddress, provider, model.ProofSetAssembling). + Where("piece_count < ?", maxPieces). + Order("set_id"). + First(&ps).Error + if err == nil { + return &ps, nil + } + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + return nil, cockroach.Wrap(err, "query assembling proof sets") +} + +func (o *OnChainPDP) lookupSPInfo(ctx context.Context, provider string) (spInfo, error) { + o.spURLCacheMu.Lock() + if cached, ok := o.spURLCache[provider]; ok { + o.spURLCacheMu.Unlock() + return cached, nil + } + o.spURLCacheMu.Unlock() + + evmAddr, err := delegatedToCommonAddress(provider) + if err != nil { + return spInfo{}, cockroach.Wrapf(err, "decode provider address %s", provider) + } + + pi, err := o.spRegistry.GetProviderByAddress(ctx, evmAddr) + if err != nil { + return spInfo{}, cockroach.Wrapf(err, "ServiceProviderRegistry.getProviderByAddress(%s)", evmAddr.Hex()) + } + if pi == nil { + return spInfo{}, fmt.Errorf("provider %s not registered in ServiceProviderRegistry", provider) + } + pdpProduct, ok := pi.Products["PDP"] + if !ok || pdpProduct == nil || pdpProduct.Data == nil { + return spInfo{}, fmt.Errorf("provider %s has no PDP product registered", provider) + } + if !pdpProduct.IsActive { + return spInfo{}, fmt.Errorf("provider %s PDP product is inactive", provider) + } + if pdpProduct.Data.ServiceURL == "" { + return spInfo{}, fmt.Errorf("provider %s has no PDP serviceURL registered", provider) + } + + info := spInfo{ + serviceURL: pdpProduct.Data.ServiceURL, + payee: pi.Payee, + } + o.spURLCacheMu.Lock() + o.spURLCache[provider] = info + o.spURLCacheMu.Unlock() + return info, nil +} + +func delegatedToCommonAddress(s string) (common.Address, error) { + addr, err := address.NewFromString(s) + if err != nil { + return common.Address{}, err + } + if addr.Protocol() != address.Delegated { + return common.Address{}, fmt.Errorf("address %s is not delegated (f410)", s) + } + payload := addr.Payload() + if len(payload) < 21 { + return common.Address{}, fmt.Errorf("delegated payload too short: %d", len(payload)) + } + return common.BytesToAddress(payload[1:21]), nil +} + +func commonToDelegatedAddress(subaddr common.Address) (address.Address, error) { + addr, err := address.NewDelegatedAddress(10, subaddr.Bytes()) + if err != nil { + return address.Undef, cockroach.Wrap(err, "encode delegated address") + } + return addr, nil +} + +func randomClientDataSetID() *big.Int { + b := make([]byte, 32) + if _, err := rand.Read(b); err != nil { + panic(fmt.Sprintf("crypto/rand failure: %v", err)) + } + return new(big.Int).SetBytes(b) +} diff --git a/service/dealpusher/pdp_schedule.go b/service/dealpusher/pdp_schedule.go index d58af3a6e..62c92443d 100644 --- a/service/dealpusher/pdp_schedule.go +++ b/service/dealpusher/pdp_schedule.go @@ -11,8 +11,6 @@ import ( "github.com/data-preservation-programs/singularity/model" "github.com/data-preservation-programs/singularity/util" "github.com/data-preservation-programs/singularity/util/keystore" - "github.com/ethereum/go-ethereum/common" - "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" "github.com/rjNemo/underscore" "gorm.io/gorm" @@ -24,8 +22,7 @@ func defaultPDPSchedulingConfig() PDPSchedulingConfig { return PDPSchedulingConfig{ BatchSize: 128, MaxPiecesPerProofSet: 1024, - ConfirmationDepth: 5, - PollingInterval: 30 * time.Second, + PullTimeout: 5 * time.Minute, } } @@ -65,37 +62,8 @@ func (d *DealPusher) validatePDPPreparationPieceSizes(ctx context.Context, sched return errors.Wrap(err, "failed to validate preparation piece sizes for PDP") } -// resolveProviderEVMAddress looks up the provider's Actor record and derives -// the EVM address from its delegated (f410) filecoin address. -func (d *DealPusher) resolveProviderEVMAddress(ctx context.Context, provider string) (common.Address, error) { - db := d.dbNoContext.WithContext(ctx) - - var actor model.Actor - err := db.Where("address = ? OR id = ?", provider, provider).First(&actor).Error - if err != nil { - return common.Address{}, errors.Wrapf(err, "failed to resolve actor for provider %s", provider) - } - - addr, err := address.NewFromString(actor.Address) - if err != nil { - return common.Address{}, errors.Wrapf(err, "failed to parse actor address %s", actor.Address) - } - if addr.Protocol() != address.Delegated { - return common.Address{}, fmt.Errorf("provider actor address %s is not a delegated (f410) address", actor.Address) - } - - payload := addr.Payload() - // delegated address payload: first varint byte(s) for namespace, then 20 bytes for EVM address - // for f410 (namespace 10), payload[0] is the namespace varint, rest is the subaddress - if len(payload) < 21 { - return common.Address{}, fmt.Errorf("provider delegated address payload too short: %d bytes", len(payload)) - } - // skip namespace varint (1 byte for namespace 10) - return common.BytesToAddress(payload[1:21]), nil -} - func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedule) (model.ScheduleState, error) { - if d.pdpProofSetManager == nil || d.pdpTxConfirmer == nil { + if d.pdpProofSetManager == nil { return model.ScheduleError, errors.New("pdp scheduling dependencies are not configured") } cfg := d.pdpSchedulingConfig @@ -106,12 +74,6 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul return model.ScheduleError, err } - // resolve SP EVM address upfront -- needed for handoff after filling - spEVMAddr, err := d.resolveProviderEVMAddress(ctx, schedule.Provider) - if err != nil { - return model.ScheduleError, errors.Wrap(err, "failed to resolve provider EVM address for PDP handoff") - } - db := d.dbNoContext.WithContext(ctx) var attachments []model.SourceAttachment if err := db.Model(&model.SourceAttachment{}). @@ -151,10 +113,6 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul clientID = *walletObj.ActorID } - // track the current proof set so we can propose transfer when it fills - var currentProofSetID uint64 - var currentProofSetPieceCount int - var timer *time.Timer current := sumResult{} for { @@ -204,45 +162,15 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul } if scheduleComplete { - // propose transfer for any partially-filled proof set before exiting - if currentProofSetID != 0 && currentProofSetPieceCount > 0 { - if err := d.pdpProofSetManager.ProposeTransfer(ctx, evmSigner, currentProofSetID, spEVMAddr); err != nil { - return model.ScheduleError, errors.Wrap(err, "failed to propose transfer for final proof set") - } - } if schedule.ScheduleCron != "" { return "", nil } return model.ScheduleCompleted, nil } - // cap batch to remaining room in current proof set - batchLimit := cfg.BatchSize - if currentProofSetID != 0 { - remaining := cfg.MaxPiecesPerProofSet - currentProofSetPieceCount - if remaining <= 0 { - // current proof set is full -- propose transfer and reset - if err := d.pdpProofSetManager.ProposeTransfer(ctx, evmSigner, currentProofSetID, spEVMAddr); err != nil { - return model.ScheduleError, errors.Wrap(err, "failed to propose transfer for full proof set") - } - currentProofSetID = 0 - currentProofSetPieceCount = 0 - continue - } - if remaining < batchLimit { - batchLimit = remaining - } - } - - cars, err := d.findPDPCars(ctx, schedule, attachments, allowedPieceCIDs, overReplicatedCIDs, batchLimit) + cars, err := d.findPDPCars(ctx, schedule, attachments, allowedPieceCIDs, overReplicatedCIDs, cfg.BatchSize) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { - // no more cars -- propose transfer for any partial proof set - if currentProofSetID != 0 && currentProofSetPieceCount > 0 { - if err := d.pdpProofSetManager.ProposeTransfer(ctx, evmSigner, currentProofSetID, spEVMAddr); err != nil { - return model.ScheduleError, errors.Wrap(err, "failed to propose transfer for final proof set") - } - } if schedule.ScheduleCron != "" && schedule.ScheduleCronPerpetual { return "", nil } @@ -251,11 +179,6 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul return model.ScheduleError, err } if len(cars) == 0 { - if currentProofSetID != 0 && currentProofSetPieceCount > 0 { - if err := d.pdpProofSetManager.ProposeTransfer(ctx, evmSigner, currentProofSetID, spEVMAddr); err != nil { - return model.ScheduleError, errors.Wrap(err, "failed to propose transfer for final proof set") - } - } if schedule.ScheduleCron != "" && schedule.ScheduleCronPerpetual { return "", nil } @@ -267,44 +190,22 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul } } - proofSetID, err := d.pdpProofSetManager.EnsureProofSet(ctx, evmSigner, schedule.Provider, cfg) - if err != nil { - return model.ScheduleError, errors.Wrap(err, "failed to ensure PDP proof set") - } - if currentProofSetID == 0 { - currentProofSetID = proofSetID - // load existing piece count from DB in case we're resuming - var ps model.PDPProofSet - if err := db.Where("set_id = ?", proofSetID).First(&ps).Error; err == nil { - currentProofSetPieceCount = ps.PieceCount + pieceInputs := make([]PDPPieceInput, len(cars)) + for i, car := range cars { + pieceInputs[i] = PDPPieceInput{ + PieceCID: cid.Cid(car.PieceCID), + PieceSize: car.PieceSize, } } - pieceCIDs := make([]cid.Cid, 0, len(cars)) - pieceSizes := make([]int64, 0, len(cars)) - for _, car := range cars { - pieceCIDs = append(pieceCIDs, cid.Cid(car.PieceCID)) - pieceSizes = append(pieceSizes, car.PieceSize) - } - queuedTx, err := d.pdpProofSetManager.QueueAddRoots(ctx, evmSigner, proofSetID, pieceCIDs, pieceSizes, cfg) - if err != nil { - return model.ScheduleError, errors.Wrap(err, "failed to queue PDP root addition transaction") - } - - _, err = d.pdpTxConfirmer.WaitForConfirmations(ctx, queuedTx.Hash, cfg.ConfirmationDepth, cfg.PollingInterval) + result, err := d.pdpProofSetManager.PullPiecesToFWSS(ctx, evmSigner, schedule.Provider, pieceInputs, cfg) if err != nil { - return model.ScheduleError, errors.Wrap(err, "failed waiting for PDP transaction confirmation") + return model.ScheduleError, errors.Wrap(err, "failed to push pieces via FWSS pull") } - // update durable piece count after confirmed on-chain add - if err := d.pdpProofSetManager.IncrementPieceCount(ctx, proofSetID, len(cars)); err != nil { - return model.ScheduleError, errors.Wrap(err, "failed to update proof set piece count") - } - currentProofSetPieceCount += len(cars) - deals := make([]model.Deal, len(cars)) for i, car := range cars { - proofSetIDCopy := proofSetID + proofSetIDCopy := result.DataSetID deals[i] = model.Deal{ State: model.DealProposed, DealType: model.DealTypePDP, @@ -327,15 +228,6 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul current.DealNumber++ current.DealSize += car.PieceSize } - - // check if proof set is now full - if currentProofSetPieceCount >= cfg.MaxPiecesPerProofSet { - if err := d.pdpProofSetManager.ProposeTransfer(ctx, evmSigner, currentProofSetID, spEVMAddr); err != nil { - return model.ScheduleError, errors.Wrap(err, "failed to propose transfer for full proof set") - } - currentProofSetID = 0 - currentProofSetPieceCount = 0 - } } } diff --git a/service/dealpusher/pdp_wiring_test.go b/service/dealpusher/pdp_wiring_test.go index 8a20423df..e67249471 100644 --- a/service/dealpusher/pdp_wiring_test.go +++ b/service/dealpusher/pdp_wiring_test.go @@ -3,55 +3,40 @@ package dealpusher import ( "context" "testing" - "time" "github.com/data-preservation-programs/go-synapse/signer" "github.com/data-preservation-programs/singularity/model" "github.com/data-preservation-programs/singularity/util/keystore" "github.com/data-preservation-programs/singularity/util/testutil" - "github.com/ethereum/go-ethereum/common" "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" "gorm.io/gorm" ) +// proofSetManagerMock satisfies PDPProofSetManager. It records every batch +// it receives so tests can assert on the pieces and provider that were +// pushed. type proofSetManagerMock struct { - proofSetID uint64 - pieceCIDs []cid.Cid - proposedTransfers []proposedTransfer + dataSetID uint64 + calls []proofSetManagerCall + err error } -type proposedTransfer struct { - proofSetID uint64 - spEVMAddress common.Address +type proofSetManagerCall struct { + provider string + pieces []PDPPieceInput } -func (m *proofSetManagerMock) EnsureProofSet(_ context.Context, _ signer.EVMSigner, _ string, _ PDPSchedulingConfig) (uint64, error) { - return m.proofSetID, nil -} - -func (m *proofSetManagerMock) QueueAddRoots(_ context.Context, _ signer.EVMSigner, _ uint64, pieceCIDs []cid.Cid, _ []int64, _ PDPSchedulingConfig) (*PDPQueuedTx, error) { - m.pieceCIDs = append([]cid.Cid(nil), pieceCIDs...) - return &PDPQueuedTx{Hash: "0xabc"}, nil -} - -func (m *proofSetManagerMock) ProposeTransfer(_ context.Context, _ signer.EVMSigner, proofSetID uint64, spEVMAddress common.Address) error { - m.proposedTransfers = append(m.proposedTransfers, proposedTransfer{proofSetID, spEVMAddress}) - return nil -} - -func (m *proofSetManagerMock) IncrementPieceCount(_ context.Context, _ uint64, _ int) error { - return nil -} - -type txConfirmerMock struct { - txHash string -} - -func (m *txConfirmerMock) WaitForConfirmations(_ context.Context, txHash string, _ uint64, _ time.Duration) (*PDPTransactionReceipt, error) { - m.txHash = txHash - return &PDPTransactionReceipt{Hash: txHash}, nil +func (m *proofSetManagerMock) PullPiecesToFWSS(_ context.Context, _ signer.EVMSigner, provider string, pieces []PDPPieceInput, _ PDPSchedulingConfig) (PDPPullResult, error) { + if m.err != nil { + return PDPPullResult{}, m.err + } + m.calls = append(m.calls, proofSetManagerCall{ + provider: provider, + pieces: append([]PDPPieceInput(nil), pieces...), + }) + return PDPPullResult{DataSetID: m.dataSetID}, nil } func TestDealPusher_ResolveScheduleDealType_EmptyReturnsEmpty(t *testing.T) { @@ -85,7 +70,7 @@ func TestDealPusher_RunSchedule_PDPWithoutDependenciesReturnsConfiguredError(t * require.Contains(t, err.Error(), "pdp scheduling dependencies are not configured") } -func TestDealPusher_RunSchedule_PDPWithDependenciesCreatesDealsAfterConfirmation(t *testing.T) { +func TestDealPusher_RunSchedule_PDPPushesBatchAndCreatesDeals(t *testing.T) { testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { clientSubaddr := make([]byte, 20) clientSubaddr[19] = 10 @@ -107,22 +92,20 @@ func TestDealPusher_RunSchedule_PDPWithDependenciesCreatesDealsAfterConfirmation actorID := "f01001" require.NoError(t, db.Create(&model.Actor{ID: actorID, Address: clientAddr.String()}).Error) - // provider needs an actor record for EVM address resolution - providerActorID := "f01002" - require.NoError(t, db.Create(&model.Actor{ID: providerActorID, Address: providerAddr.String()}).Error) + require.NoError(t, db.Create(&model.Actor{ID: "f01002", Address: providerAddr.String()}).Error) - wallet := model.Wallet{ + walletObj := model.Wallet{ Address: clientAddr.String(), KeyPath: keyPath, KeyStore: "local", ActorID: &actorID, } - require.NoError(t, db.Create(&wallet).Error) - require.NoError(t, db.Model(&prep).Update("wallet_id", wallet.ID).Error) - storage := model.Storage{Name: "src-storage"} - require.NoError(t, db.Create(&storage).Error) - require.NotZero(t, storage.ID) - attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storage.ID} + require.NoError(t, db.Create(&walletObj).Error) + require.NoError(t, db.Model(&prep).Update("wallet_id", walletObj.ID).Error) + + storageRow := model.Storage{Name: "src-storage"} + require.NoError(t, db.Create(&storageRow).Error) + attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storageRow.ID} require.NoError(t, db.Create(&attachment).Error) require.NotZero(t, attachment.ID) @@ -143,42 +126,26 @@ func TestDealPusher_RunSchedule_PDPWithDependenciesCreatesDealsAfterConfirmation TotalDealNumber: 1, } require.NoError(t, db.Create(&schedule).Error) - schedule.Preparation = &model.Preparation{Wallet: &wallet} + schedule.Preparation = &model.Preparation{Wallet: &walletObj} - psm := &proofSetManagerMock{proofSetID: 42} - conf := &txConfirmerMock{} + psm := &proofSetManagerMock{dataSetID: 42} d := &DealPusher{ dbNoContext: db, keyStore: ks, pdpProofSetManager: psm, - pdpTxConfirmer: conf, pdpSchedulingConfig: defaultPDPSchedulingConfig(), scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType { return model.DealTypePDP }, } - var attachments []model.SourceAttachment - require.NoError(t, db.Where("preparation_id = ?", schedule.PreparationID).Find(&attachments).Error) - require.Len(t, attachments, 1) - overReplicatedCIDs := db. - Table("deals"). - Select("piece_cid"). - Where("state in ?", []model.DealState{model.DealProposed, model.DealPublished, model.DealActive}). - Group("piece_cid"). - Having("count(*) >= ?", d.maxReplicas) - cars, err := d.findPDPCars(ctx, &schedule, attachments, nil, overReplicatedCIDs, d.pdpSchedulingConfig.BatchSize) - require.NoError(t, err) - require.Len(t, cars, 1) state, err := d.runSchedule(ctx, &schedule) require.NoError(t, err) require.Equal(t, model.ScheduleCompleted, state) - require.Equal(t, "0xabc", conf.txHash) - require.Len(t, psm.pieceCIDs, 1) - require.Equal(t, cid.Cid(pieceCID), psm.pieceCIDs[0]) - // schedule completion should propose transfer for the partial proof set - require.Len(t, psm.proposedTransfers, 1) - require.Equal(t, uint64(42), psm.proposedTransfers[0].proofSetID) - require.Equal(t, common.BytesToAddress(providerSubaddr), psm.proposedTransfers[0].spEVMAddress) + require.Len(t, psm.calls, 1, "expected one batch push") + require.Equal(t, providerAddr.String(), psm.calls[0].provider) + require.Len(t, psm.calls[0].pieces, 1) + require.Equal(t, cid.Cid(pieceCID), psm.calls[0].pieces[0].PieceCID) + require.Equal(t, int64(1024), psm.calls[0].pieces[0].PieceSize) var deals []model.Deal require.NoError(t, db.Where("schedule_id = ?", schedule.ID).Find(&deals).Error) @@ -190,7 +157,7 @@ func TestDealPusher_RunSchedule_PDPWithDependenciesCreatesDealsAfterConfirmation require.NotNil(t, deals[0].ProofSetID) require.Equal(t, uint64(42), *deals[0].ProofSetID) require.NotNil(t, deals[0].WalletID) - require.Equal(t, wallet.ID, *deals[0].WalletID) + require.Equal(t, walletObj.ID, *deals[0].WalletID) }) } @@ -216,18 +183,18 @@ func TestDealPusher_RunSchedule_PDPRejectsInvalidPieceSize(t *testing.T) { actorID := "f01001" require.NoError(t, db.Create(&model.Actor{ID: actorID, Address: clientAddr.String()}).Error) require.NoError(t, db.Create(&model.Actor{ID: "f01002", Address: providerAddr.String()}).Error) - wallet := model.Wallet{ + walletObj := model.Wallet{ Address: clientAddr.String(), KeyPath: keyPath, KeyStore: "local", ActorID: &actorID, } - require.NoError(t, db.Create(&wallet).Error) - require.NoError(t, db.Model(&prep).Update("wallet_id", wallet.ID).Error) + require.NoError(t, db.Create(&walletObj).Error) + require.NoError(t, db.Model(&prep).Update("wallet_id", walletObj.ID).Error) - storage := model.Storage{Name: "src-storage"} - require.NoError(t, db.Create(&storage).Error) - attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storage.ID} + storageRow := model.Storage{Name: "src-storage"} + require.NoError(t, db.Create(&storageRow).Error) + attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storageRow.ID} require.NoError(t, db.Create(&attachment).Error) pieceCID := model.CID(calculateCommp(t, generateRandomBytes(1000), 1024)) @@ -235,7 +202,7 @@ func TestDealPusher_RunSchedule_PDPRejectsInvalidPieceSize(t *testing.T) { AttachmentID: &attachment.ID, PreparationID: &prep.ID, PieceCID: pieceCID, - PieceSize: 1536, // Not a power of two. + PieceSize: 1536, // not a power of two StoragePath: "car-1", } require.NoError(t, db.Create(&car).Error) @@ -247,15 +214,13 @@ func TestDealPusher_RunSchedule_PDPRejectsInvalidPieceSize(t *testing.T) { TotalDealNumber: 1, } require.NoError(t, db.Create(&schedule).Error) - schedule.Preparation = &model.Preparation{Wallet: &wallet} + schedule.Preparation = &model.Preparation{Wallet: &walletObj} - psm := &proofSetManagerMock{proofSetID: 42} - conf := &txConfirmerMock{} + psm := &proofSetManagerMock{dataSetID: 42} d := &DealPusher{ dbNoContext: db, keyStore: ks, pdpProofSetManager: psm, - pdpTxConfirmer: conf, pdpSchedulingConfig: defaultPDPSchedulingConfig(), scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType { return model.DealTypePDP }, } @@ -265,8 +230,7 @@ func TestDealPusher_RunSchedule_PDPRejectsInvalidPieceSize(t *testing.T) { require.Equal(t, model.ScheduleError, state) require.ErrorContains(t, err, "invalid piece size for piece") require.ErrorContains(t, err, "must be a power of two") - require.Empty(t, conf.txHash) - require.Empty(t, psm.pieceCIDs) + require.Empty(t, psm.calls) var deals []model.Deal require.NoError(t, db.Where("schedule_id = ?", schedule.ID).Find(&deals).Error) @@ -296,18 +260,18 @@ func TestDealPusher_RunSchedule_PDPRejectsPreparationWithOversizedPiece(t *testi actorID := "f01001" require.NoError(t, db.Create(&model.Actor{ID: actorID, Address: clientAddr.String()}).Error) require.NoError(t, db.Create(&model.Actor{ID: "f01002", Address: providerAddr.String()}).Error) - wallet := model.Wallet{ + walletObj := model.Wallet{ Address: clientAddr.String(), KeyPath: keyPath, KeyStore: "local", ActorID: &actorID, } - require.NoError(t, db.Create(&wallet).Error) - require.NoError(t, db.Model(&prep).Update("wallet_id", wallet.ID).Error) + require.NoError(t, db.Create(&walletObj).Error) + require.NoError(t, db.Model(&prep).Update("wallet_id", walletObj.ID).Error) - storage := model.Storage{Name: "src-storage"} - require.NoError(t, db.Create(&storage).Error) - attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storage.ID} + storageRow := model.Storage{Name: "src-storage"} + require.NoError(t, db.Create(&storageRow).Error) + attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storageRow.ID} require.NoError(t, db.Create(&attachment).Error) pieceCID := model.CID(calculateCommp(t, generateRandomBytes(1000), 1024)) @@ -315,7 +279,7 @@ func TestDealPusher_RunSchedule_PDPRejectsPreparationWithOversizedPiece(t *testi AttachmentID: &attachment.ID, PreparationID: &prep.ID, PieceCID: pieceCID, - PieceSize: 1 << 31, // 2 GiB, above current 1 GiB minus FR32 overhead PDP limit. + PieceSize: 1 << 31, // 2 GiB, above current 1 GiB minus FR32 overhead PDP limit StoragePath: "car-1", } require.NoError(t, db.Create(&car).Error) @@ -327,15 +291,13 @@ func TestDealPusher_RunSchedule_PDPRejectsPreparationWithOversizedPiece(t *testi TotalDealNumber: 1, } require.NoError(t, db.Create(&schedule).Error) - schedule.Preparation = &model.Preparation{Wallet: &wallet} + schedule.Preparation = &model.Preparation{Wallet: &walletObj} - psm := &proofSetManagerMock{proofSetID: 42} - conf := &txConfirmerMock{} + psm := &proofSetManagerMock{dataSetID: 42} d := &DealPusher{ dbNoContext: db, keyStore: ks, pdpProofSetManager: psm, - pdpTxConfirmer: conf, pdpSchedulingConfig: defaultPDPSchedulingConfig(), scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType { return model.DealTypePDP }, } @@ -344,8 +306,7 @@ func TestDealPusher_RunSchedule_PDPRejectsPreparationWithOversizedPiece(t *testi require.Error(t, err) require.Equal(t, model.ScheduleError, state) require.ErrorContains(t, err, "piece limit is 1 GiB minus FR32 overhead") - require.Empty(t, conf.txHash) - require.Empty(t, psm.pieceCIDs) + require.Empty(t, psm.calls) var deals []model.Deal require.NoError(t, db.Where("schedule_id = ?", schedule.ID).Find(&deals).Error) From 9a46a34996c0dc18f88fcae898ade8aa3376fa34 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Fri, 1 May 2026 00:01:54 +0200 Subject: [PATCH 2/5] go.mod: bump go-synapse to e59b042 (SignDigest+AuthHelper refactor) Drops the local replace directive that was pointing at the in-flight go-synapse branch. The dealpusher pull flow now resolves cleanly via the upstream EVMSigner.SignDigest + pdp.NewAuthHelper(SignDigestFunc, ...) APIs. --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index ed7045460..41d2abafd 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/bcicen/jstream v1.0.1 github.com/brianvoe/gofakeit/v6 v6.23.2 github.com/cockroachdb/errors v1.11.3 - github.com/data-preservation-programs/go-synapse v0.0.0-20260326143334-8c86c7fc3b17 + github.com/data-preservation-programs/go-synapse v0.0.0-20260430215910-e59b042352fb github.com/data-preservation-programs/table v0.0.3 github.com/dustin/go-humanize v1.0.1 github.com/ethereum/go-ethereum v1.14.12 diff --git a/go.sum b/go.sum index e1b31d590..7f309aa72 100644 --- a/go.sum +++ b/go.sum @@ -275,8 +275,8 @@ github.com/cronokirby/saferith v0.33.0 h1:TgoQlfsD4LIwx71+ChfRcIpjkw+RPOapDEVxa+ github.com/cronokirby/saferith v0.33.0/go.mod h1:QKJhjoqUtBsXCAVEjw38mFqoi7DebT7kthcD7UzbnoA= github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= -github.com/data-preservation-programs/go-synapse v0.0.0-20260326143334-8c86c7fc3b17 h1:pTOIdr5W7pHrxFaQdONxHWiCtIOgdc6zfKL82SS4xhI= -github.com/data-preservation-programs/go-synapse v0.0.0-20260326143334-8c86c7fc3b17/go.mod h1:qgzPsiGWjTPT/oACA6Uj1+WsASwsYFW/iJ8AWacJdjc= +github.com/data-preservation-programs/go-synapse v0.0.0-20260430215910-e59b042352fb h1:sAa+NZLPiw1CJUTKW9dRh1KevPjYuNdotWyRNX9vF4c= +github.com/data-preservation-programs/go-synapse v0.0.0-20260430215910-e59b042352fb/go.mod h1:qgzPsiGWjTPT/oACA6Uj1+WsASwsYFW/iJ8AWacJdjc= github.com/data-preservation-programs/table v0.0.3 h1:hboeauxPXybE8KlMA+RjDXz/J4xaG5CAFCcxyOm8yWo= github.com/data-preservation-programs/table v0.0.3/go.mod h1:sRGP/IuuqFc/y9QfmDyb5h6Q2wrnhhnBofEOj9aDRJg= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= From 1122868c6fb960ef2982a8605aa0377e15ddf650 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Fri, 1 May 2026 00:28:06 +0200 Subject: [PATCH 3/5] dealpusher: import cockroachdb/errors as errors in pdp_pull.go The rest of service/dealpusher/ imports github.com/cockroachdb/errors under its natural name 'errors' (120 errors.Wrap* call sites). The FWSS-pull rewrite was an outlier with 21 cockroach.Wrap* call sites and a separate stdlib errors import. Drop the alias and the stdlib errors import; cockroachdb/errors covers errors.New/Is/Wrap/Wrapf for everything we use here. --- service/dealpusher/pdp_pull.go | 56 ++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/service/dealpusher/pdp_pull.go b/service/dealpusher/pdp_pull.go index 474f08eaa..9db96f14f 100644 --- a/service/dealpusher/pdp_pull.go +++ b/service/dealpusher/pdp_pull.go @@ -3,14 +3,13 @@ package dealpusher import ( "context" "crypto/rand" - "errors" "fmt" "math/big" "strings" "sync" "time" - cockroach "github.com/cockroachdb/errors" + "github.com/cockroachdb/errors" synapse "github.com/data-preservation-programs/go-synapse" "github.com/data-preservation-programs/go-synapse/constants" "github.com/data-preservation-programs/go-synapse/pdp" @@ -76,13 +75,13 @@ func NewOnChainPDP(ctx context.Context, cfg OnChainPDPConfig) (*OnChainPDP, erro ethClient, err := ethclient.DialContext(ctx, cfg.RPCURL) if err != nil { - return nil, cockroach.Wrap(err, "failed to connect to FEVM RPC") + return nil, errors.Wrap(err, "failed to connect to FEVM RPC") } network, chainIDInt64, err := synapse.DetectNetwork(ctx, ethClient) if err != nil { ethClient.Close() - return nil, cockroach.Wrap(err, "failed to detect FEVM network") + return nil, errors.Wrap(err, "failed to detect FEVM network") } chainID := big.NewInt(chainIDInt64) @@ -103,7 +102,7 @@ func NewOnChainPDP(ctx context.Context, cfg OnChainPDPConfig) (*OnChainPDP, erro spReg, err := spregistry.NewService(ethClient, spRegAddr, nil, chainID) if err != nil { ethClient.Close() - return nil, cockroach.Wrap(err, "failed to bind ServiceProviderRegistry") + return nil, errors.Wrap(err, "failed to bind ServiceProviderRegistry") } Logger.Infow("initialized FWSS-pull adapter", @@ -154,13 +153,13 @@ func (o *OnChainPDP) PullPiecesToFWSS( payerAddr := evmSigner.EVMAddress() clientDelegated, err := commonToDelegatedAddress(payerAddr) if err != nil { - return PDPPullResult{}, cockroach.Wrap(err, "derive delegated payer address") + return PDPPullResult{}, errors.Wrap(err, "derive delegated payer address") } clientAddrStr := clientDelegated.String() info, err := o.lookupSPInfo(ctx, provider) if err != nil { - return PDPPullResult{}, cockroach.Wrap(err, "SP service-URL lookup") + return PDPPullResult{}, errors.Wrap(err, "SP service-URL lookup") } // Convert v1 piece CIDs to CommPv2 (what FWSS / Curio expect). @@ -171,7 +170,7 @@ func (o *OnChainPDP) PullPiecesToFWSS( payloadSize := uint64(p.PieceSize) * 127 / 128 v2, err := commcid.PieceCidV2FromV1(p.PieceCID, payloadSize) if err != nil { - return PDPPullResult{}, cockroach.Wrapf(err, "convert piece %s to CommPv2", p.PieceCID) + return PDPPullResult{}, errors.Wrapf(err, "convert piece %s to CommPv2", p.PieceCID) } pieceCIDsV2[i] = v2 pullInputs[i] = pdp.PullPieceInput{ @@ -227,11 +226,11 @@ func (o *OnChainPDP) addToExisting( addResp, err := pdpServer.AddPieces(ctx, int(existing.SetID), pieceCIDsV2, addExtra) if err != nil { - return PDPPullResult{}, cockroach.Wrap(err, "POST /pdp/data-sets/{id}/pieces") + return PDPPullResult{}, errors.Wrap(err, "POST /pdp/data-sets/{id}/pieces") } status, err := pdpServer.WaitForPieceAddition(ctx, int(existing.SetID), addResp.TxHash, cfg.PullTimeout) if err != nil { - return PDPPullResult{}, cockroach.Wrap(err, "wait for add-pieces tx") + return PDPPullResult{}, errors.Wrap(err, "wait for add-pieces tx") } if status.AddMessageOK == nil || !*status.AddMessageOK { return PDPPullResult{}, fmt.Errorf("add-pieces tx %s did not confirm successfully", addResp.TxHash) @@ -241,7 +240,7 @@ func (o *OnChainPDP) addToExisting( Model(&model.PDPProofSet{}). Where("set_id = ?", existing.SetID). UpdateColumn("piece_count", gorm.Expr("piece_count + ?", len(pieceCIDsV2))).Error; err != nil { - return PDPPullResult{}, cockroach.Wrap(err, "increment piece_count") + return PDPPullResult{}, errors.Wrap(err, "increment piece_count") } return PDPPullResult{DataSetID: existing.SetID}, nil @@ -275,7 +274,7 @@ func (o *OnChainPDP) createNewSet( } combined, err := pdp.EncodeCreateDataSetAndAddPiecesExtraData(createExtra, addExtra) if err != nil { - return PDPPullResult{}, cockroach.Wrap(err, "wrap create+add extra data") + return PDPPullResult{}, errors.Wrap(err, "wrap create+add extra data") } if err := waitForPullComplete(ctx, pdpServer, pdp.PullPiecesOptions{ @@ -289,12 +288,12 @@ func (o *OnChainPDP) createNewSet( createResp, err := pdpServer.CreateDataSetAndAddPieces(ctx, o.recordKeeper.Hex(), pieceCIDsV2, combined) if err != nil { - return PDPPullResult{}, cockroach.Wrap(err, "POST /pdp/data-sets/create-and-add") + return PDPPullResult{}, errors.Wrap(err, "POST /pdp/data-sets/create-and-add") } status, err := pdpServer.WaitForDataSetCreation(ctx, createResp.TxHash, cfg.PullTimeout) if err != nil { - return PDPPullResult{}, cockroach.Wrap(err, "wait for create-and-add tx") + return PDPPullResult{}, errors.Wrap(err, "wait for create-and-add tx") } if status.DataSetID == nil { return PDPPullResult{}, fmt.Errorf("create-and-add tx %s confirmed but no dataSetId returned", createResp.TxHash) @@ -316,7 +315,7 @@ func (o *OnChainPDP) createNewSet( Where("set_id = ?", dataSetID). Assign(row). FirstOrCreate(&model.PDPProofSet{}).Error; err != nil { - return PDPPullResult{}, cockroach.Wrap(err, "persist new proof set") + return PDPPullResult{}, errors.Wrap(err, "persist new proof set") } return PDPPullResult{DataSetID: dataSetID}, nil @@ -327,7 +326,7 @@ func (o *OnChainPDP) createNewSet( func waitForPullComplete(ctx context.Context, pdpServer *pdp.Server, opts pdp.PullPiecesOptions, timeout time.Duration) error { resp, err := pdpServer.WaitForPullPieces(ctx, opts, timeout) if err != nil { - return cockroach.Wrap(err, "wait for /pdp/piece/pull") + return errors.Wrap(err, "wait for /pdp/piece/pull") } switch resp.Status { case pdp.PullStatusComplete: @@ -342,7 +341,7 @@ func waitForPullComplete(ctx context.Context, pdpServer *pdp.Server, opts pdp.Pu func signCreateDataSetExtra(authHelper *pdp.AuthHelper, payer, payee common.Address, clientDataSetID *big.Int) (string, error) { sig, err := authHelper.SignCreateDataSet(clientDataSetID, payee, nil) if err != nil { - return "", cockroach.Wrap(err, "sign CreateDataSet") + return "", errors.Wrap(err, "sign CreateDataSet") } return pdp.EncodeDataSetCreateData(payer, clientDataSetID, nil, sig.Signature) } @@ -353,7 +352,7 @@ func signAddPiecesExtra(authHelper *pdp.AuthHelper, clientDataSetID *big.Int, pi nonce := big.NewInt(0) sig, err := authHelper.SignAddPieces(clientDataSetID, nonce, pieceCIDsV2, nil) if err != nil { - return "", cockroach.Wrap(err, "sign AddPieces") + return "", errors.Wrap(err, "sign AddPieces") } return pdp.EncodeAddPiecesExtraData(nonce, nil, sig.Signature) } @@ -375,7 +374,7 @@ func (o *OnChainPDP) findProofSetWithRoom(ctx context.Context, clientAddress, pr if errors.Is(err, gorm.ErrRecordNotFound) { return nil, nil } - return nil, cockroach.Wrap(err, "query assembling proof sets") + return nil, errors.Wrap(err, "query assembling proof sets") } func (o *OnChainPDP) lookupSPInfo(ctx context.Context, provider string) (spInfo, error) { @@ -386,14 +385,25 @@ func (o *OnChainPDP) lookupSPInfo(ctx context.Context, provider string) (spInfo, } o.spURLCacheMu.Unlock() - evmAddr, err := delegatedToCommonAddress(provider) + // schedule.Provider is the t0 form after StateLookupID; resolve to + // the delegated (f410) form via lotus if it's not already. + delegated := provider + if addr, perr := address.NewFromString(provider); perr != nil || addr.Protocol() != address.Delegated { + var robust string + if rerr := o.ethClient.Client().CallContext(ctx, &robust, "Filecoin.StateLookupRobustAddress", provider, nil); rerr != nil { + return spInfo{}, errors.Wrapf(rerr, "resolve robust address for provider %s", provider) + } + delegated = robust + } + + evmAddr, err := delegatedToCommonAddress(delegated) if err != nil { - return spInfo{}, cockroach.Wrapf(err, "decode provider address %s", provider) + return spInfo{}, errors.Wrapf(err, "decode provider address %s", delegated) } pi, err := o.spRegistry.GetProviderByAddress(ctx, evmAddr) if err != nil { - return spInfo{}, cockroach.Wrapf(err, "ServiceProviderRegistry.getProviderByAddress(%s)", evmAddr.Hex()) + return spInfo{}, errors.Wrapf(err, "ServiceProviderRegistry.getProviderByAddress(%s)", evmAddr.Hex()) } if pi == nil { return spInfo{}, fmt.Errorf("provider %s not registered in ServiceProviderRegistry", provider) @@ -437,7 +447,7 @@ func delegatedToCommonAddress(s string) (common.Address, error) { func commonToDelegatedAddress(subaddr common.Address) (address.Address, error) { addr, err := address.NewDelegatedAddress(10, subaddr.Bytes()) if err != nil { - return address.Undef, cockroach.Wrap(err, "encode delegated address") + return address.Undef, errors.Wrap(err, "encode delegated address") } return addr, nil } From 146c7f5f1527ff1083450bb8a0ec6a9c40ddd448 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Sun, 3 May 2026 21:28:03 +0100 Subject: [PATCH 4/5] go.mod: bump go-synapse to 40ae25e Picks up: - d91ba0c, 2406bd9: spregistry getProvider* abi unpack fixes (was blocking Service.GetProviderByAddress on calibnet) - 35c6f36: abix.UnpackSingleTuple extracted from spregistry, contracts/payments.go callers migrated - 3273a9a: txutil retry/nonce helper cleanup - 40ae25e: gosimple S1016 cleanup in payments.go --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 41d2abafd..a509c2123 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/bcicen/jstream v1.0.1 github.com/brianvoe/gofakeit/v6 v6.23.2 github.com/cockroachdb/errors v1.11.3 - github.com/data-preservation-programs/go-synapse v0.0.0-20260430215910-e59b042352fb + github.com/data-preservation-programs/go-synapse v0.0.0-20260503115913-40ae25e74ba1 github.com/data-preservation-programs/table v0.0.3 github.com/dustin/go-humanize v1.0.1 github.com/ethereum/go-ethereum v1.14.12 diff --git a/go.sum b/go.sum index 7f309aa72..ec240559d 100644 --- a/go.sum +++ b/go.sum @@ -275,8 +275,8 @@ github.com/cronokirby/saferith v0.33.0 h1:TgoQlfsD4LIwx71+ChfRcIpjkw+RPOapDEVxa+ github.com/cronokirby/saferith v0.33.0/go.mod h1:QKJhjoqUtBsXCAVEjw38mFqoi7DebT7kthcD7UzbnoA= github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= -github.com/data-preservation-programs/go-synapse v0.0.0-20260430215910-e59b042352fb h1:sAa+NZLPiw1CJUTKW9dRh1KevPjYuNdotWyRNX9vF4c= -github.com/data-preservation-programs/go-synapse v0.0.0-20260430215910-e59b042352fb/go.mod h1:qgzPsiGWjTPT/oACA6Uj1+WsASwsYFW/iJ8AWacJdjc= +github.com/data-preservation-programs/go-synapse v0.0.0-20260503115913-40ae25e74ba1 h1:SGQs5b7eyjycfxKDRHmzZW613BjjRkDT0XITafKK50A= +github.com/data-preservation-programs/go-synapse v0.0.0-20260503115913-40ae25e74ba1/go.mod h1:qgzPsiGWjTPT/oACA6Uj1+WsASwsYFW/iJ8AWacJdjc= github.com/data-preservation-programs/table v0.0.3 h1:hboeauxPXybE8KlMA+RjDXz/J4xaG5CAFCcxyOm8yWo= github.com/data-preservation-programs/table v0.0.3/go.mod h1:sRGP/IuuqFc/y9QfmDyb5h6Q2wrnhhnBofEOj9aDRJg= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= From bf0a1eb780578a2d7e984a8752d46311dfdaf770 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Sun, 3 May 2026 21:46:02 +0100 Subject: [PATCH 5/5] docs: regenerate CLI reference for FWSS-pull flags go generate output for the new send-manual-pdp --payload-size flag and the deal-pusher --pdp-source-url-base / --pdp-pull-timeout / --pdp-record-keeper flag set. --- docs/en/cli-reference/deal/README.md | 2 +- docs/en/cli-reference/deal/send-manual-pdp.md | 26 +++++++++++-------- docs/en/cli-reference/run/deal-pusher.md | 9 ++++--- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/docs/en/cli-reference/deal/README.md b/docs/en/cli-reference/deal/README.md index ce63478bf..994556f15 100644 --- a/docs/en/cli-reference/deal/README.md +++ b/docs/en/cli-reference/deal/README.md @@ -11,7 +11,7 @@ USAGE: COMMANDS: schedule Schedule deals send-manual Send a manual deal proposal to boost or legacy market - send-manual-pdp Send a manual PDP deal on-chain + send-manual-pdp Send a manual PDP deal via the FWSS-pull flow list List all deals help, h Shows a list of commands or help for one command diff --git a/docs/en/cli-reference/deal/send-manual-pdp.md b/docs/en/cli-reference/deal/send-manual-pdp.md index d6fcce094..a7b557a67 100644 --- a/docs/en/cli-reference/deal/send-manual-pdp.md +++ b/docs/en/cli-reference/deal/send-manual-pdp.md @@ -1,24 +1,28 @@ -# Send a manual PDP deal on-chain +# Send a manual PDP deal via the FWSS-pull flow {% code fullWidth="true" %} ``` NAME: - singularity deal send-manual-pdp - Send a manual PDP deal on-chain + singularity deal send-manual-pdp - Send a manual PDP deal via the FWSS-pull flow USAGE: singularity deal send-manual-pdp [command options] DESCRIPTION: - Create/reuse a proof set and add a piece to it on-chain via PDPVerifier. - Example: singularity deal send-manual-pdp --client f1xxx --provider t410fxxx --piece-cid bagaxxxx --piece-size 1048576 --eth-rpc http://localhost:5700/rpc/v1 + Push a single piece to an SP via Curio's /pdp/piece/pull, then trigger the + SP's on-chain commit (createDataSet+addPieces if no assembling set yet, or addPieces + into the existing one). Useful for e2e/diagnostic testing of the FWSS pull path. + Example: singularity deal send-manual-pdp --client f1xxx --provider t410fxxx --piece-cid bagaxxxx --piece-size 1048576 --eth-rpc http://localhost:5700/rpc/v1 --source-url-base https://static.example.org OPTIONS: - --client value Client wallet address (must be imported) - --provider value Storage provider f4/t4 address - --piece-cid value Piece CID (commp) - --piece-size value Piece size in bytes (default: 0) - --eth-rpc value FEVM JSON-RPC endpoint [$ETH_RPC_URL] - --confirmation-depth value Blocks to wait for tx confirmation (default: 5) - --help, -h show help + --client value Client wallet address (must be imported) + --provider value Storage provider f4/t4 address + --piece-cid value Piece CID (commp v1) + --piece-size value Padded piece size in bytes (default: 0) + --eth-rpc value FEVM JSON-RPC endpoint [$ETH_RPC_URL] + --source-url-base value HTTPS base where Curio fetches the piece (sourceUrl = /piece/) [$PDP_SOURCE_URL_BASE] + --record-keeper value FWSS contract address. Defaults to network FWSS from go-synapse. [$PDP_RECORD_KEEPER] + --pull-timeout value How long to wait for Curio to finish each phase (default: 5m0s) + --help, -h show help ``` {% endcode %} diff --git a/docs/en/cli-reference/run/deal-pusher.md b/docs/en/cli-reference/run/deal-pusher.md index 05e03498c..88e4990ad 100644 --- a/docs/en/cli-reference/run/deal-pusher.md +++ b/docs/en/cli-reference/run/deal-pusher.md @@ -12,10 +12,11 @@ OPTIONS: --no-automigrate skip automatic database migration and correctness checks on startup; only use if you run 'admin init' on every upgrade or manually before starting daemons (default: false) --deal-attempts value, -d value Number of times to attempt a deal before giving up (default: 3) --max-replication-factor value, -M value Max number of replicas for each individual PieceCID across all clients and providers (default: Unlimited) - --pdp-batch-size value Number of roots to include in each PDP add-roots transaction (default: 128) - --pdp-max-pieces-per-proofset value Maximum pieces per proof set before handing off to the storage provider (default: 1024) - --pdp-confirmation-depth value Number of block confirmations required for PDP transactions (default: 5) - --pdp-poll-interval value Polling interval for PDP transaction confirmation checks (default: 30s) + --pdp-batch-size value Number of pieces to include in each /pdp/piece/pull request (default: 128) + --pdp-max-pieces-per-proofset value Maximum pieces per proof set before starting a new one (default: 1024) + --pdp-pull-timeout value How long to wait for Curio to finish pulling a batch (per request) (default: 5m0s) + --pdp-source-url-base value HTTPS base URL where Curio fetches pieces from; sourceUrl is built as /piece/ [$PDP_SOURCE_URL_BASE] + --pdp-record-keeper value FWSS contract address (recordKeeper). Defaults to the network default from go-synapse. [$PDP_RECORD_KEEPER] --eth-rpc value Ethereum RPC endpoint for FEVM (required to execute PDP and DDO schedules on-chain) [$ETH_RPC_URL] --ddo-contract value DDO Diamond proxy contract address [$DDO_CONTRACT_ADDRESS] --ddo-payments-contract value DDO Payments proxy contract address [$DDO_PAYMENTS_CONTRACT_ADDRESS]