diff --git a/cmd/deal/send-manual-pdp.go b/cmd/deal/send-manual-pdp.go
index 0530ba5c1..d6d95e3d8 100644
--- a/cmd/deal/send-manual-pdp.go
+++ b/cmd/deal/send-manual-pdp.go
@@ -18,9 +18,11 @@ import (
var SendManualPDPCmd = &cli.Command{
Name: "send-manual-pdp",
- Usage: "Send a manual PDP deal on-chain",
- Description: `Create/reuse a proof set and add a piece to it on-chain via PDPVerifier.
- Example: singularity deal send-manual-pdp --client f1xxx --provider t410fxxx --piece-cid bagaxxxx --piece-size 1048576 --eth-rpc http://localhost:5700/rpc/v1`,
+ Usage: "Send a manual PDP deal via the FWSS-pull flow",
+ Description: `Push a single piece to an SP via Curio's /pdp/piece/pull, then trigger the
+SP's on-chain commit (createDataSet+addPieces if no assembling set yet, or addPieces
+into the existing one). Useful for e2e/diagnostic testing of the FWSS pull path.
+ Example: singularity deal send-manual-pdp --client f1xxx --provider t410fxxx --piece-cid bagaxxxx --piece-size 1048576 --eth-rpc http://localhost:5700/rpc/v1 --source-url-base https://static.example.org`,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "client",
@@ -34,12 +36,12 @@ var SendManualPDPCmd = &cli.Command{
},
&cli.StringFlag{
Name: "piece-cid",
- Usage: "Piece CID (commp)",
+ Usage: "Piece CID (commp v1)",
Required: true,
},
&cli.Int64Flag{
Name: "piece-size",
- Usage: "Piece size in bytes",
+ Usage: "Padded piece size in bytes",
Required: true,
},
&cli.StringFlag{
@@ -48,10 +50,21 @@ var SendManualPDPCmd = &cli.Command{
EnvVars: []string{"ETH_RPC_URL"},
Required: true,
},
- &cli.Uint64Flag{
- Name: "confirmation-depth",
- Usage: "Blocks to wait for tx confirmation",
- Value: 5,
+ &cli.StringFlag{
+ Name: "source-url-base",
+ Usage: "HTTPS base where Curio fetches the piece (sourceUrl = /piece/)",
+ EnvVars: []string{"PDP_SOURCE_URL_BASE"},
+ Required: true,
+ },
+ &cli.StringFlag{
+ Name: "record-keeper",
+ Usage: "FWSS contract address. Defaults to network FWSS from go-synapse.",
+ EnvVars: []string{"PDP_RECORD_KEEPER"},
+ },
+ &cli.DurationFlag{
+ Name: "pull-timeout",
+ Usage: "How long to wait for Curio to finish each phase",
+ Value: 5 * time.Minute,
},
},
Action: func(c *cli.Context) error {
@@ -61,13 +74,17 @@ var SendManualPDPCmd = &cli.Command{
}
defer closer.Close()
- pdp, err := dealpusher.NewOnChainPDP(c.Context, db, c.String("eth-rpc"))
+ pdp, err := dealpusher.NewOnChainPDP(c.Context, dealpusher.OnChainPDPConfig{
+ DB: db,
+ RPCURL: c.String("eth-rpc"),
+ SourceURLBase: c.String("source-url-base"),
+ RecordKeeper: c.String("record-keeper"),
+ })
if err != nil {
return errors.WithStack(err)
}
defer pdp.Close()
- // load wallet
var walletObj model.Wallet
err = db.WithContext(c.Context).Where("address = ?", c.String("client")).First(&walletObj).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
@@ -86,56 +103,40 @@ var SendManualPDPCmd = &cli.Command{
return errors.WithStack(err)
}
- provider := c.String("provider")
-
- cfg := dealpusher.PDPSchedulingConfig{
- BatchSize: 1,
- MaxPiecesPerProofSet: 1024,
- ConfirmationDepth: c.Uint64("confirmation-depth"),
- PollingInterval: 5 * time.Second,
- }
-
- // ensure proof set exists (or create one)
- fmt.Println("ensuring proof set...")
- proofSetID, err := pdp.EnsureProofSet(c.Context, evmSigner, provider, cfg)
- if err != nil {
- return errors.Wrap(err, "failed to ensure proof set")
- }
- fmt.Printf("proof set ID: %d\n", proofSetID)
-
- // parse piece cid
pieceCID, err := cid.Parse(c.String("piece-cid"))
if err != nil {
return errors.Wrap(err, "invalid piece CID")
}
-
- // add piece to proof set
- fmt.Println("submitting add-roots tx...")
pieceSize := c.Int64("piece-size")
- queuedTx, err := pdp.QueueAddRoots(c.Context, evmSigner, proofSetID, []cid.Cid{pieceCID}, []int64{pieceSize}, cfg)
- if err != nil {
- return errors.Wrap(err, "failed to add roots")
+
+ cfg := dealpusher.PDPSchedulingConfig{
+ BatchSize: 1,
+ MaxPiecesPerProofSet: 1024,
+ PullTimeout: c.Duration("pull-timeout"),
}
- fmt.Printf("tx: %s\n", queuedTx.Hash)
- // wait for confirmation
- fmt.Println("waiting for confirmation...")
- receipt, err := pdp.WaitForConfirmations(c.Context, queuedTx.Hash, cfg.ConfirmationDepth, cfg.PollingInterval)
+ fmt.Println("pushing piece to SP via /pdp/piece/pull + on-chain commit...")
+ result, err := pdp.PullPiecesToFWSS(
+ c.Context,
+ evmSigner,
+ c.String("provider"),
+ []dealpusher.PDPPieceInput{{PieceCID: pieceCID, PieceSize: pieceSize}},
+ cfg,
+ )
if err != nil {
- return errors.Wrap(err, "tx failed")
+ return errors.Wrap(err, "FWSS pull push failed")
}
- fmt.Printf("confirmed at block %d (gas: %d)\n", receipt.BlockNumber, receipt.GasUsed)
+ fmt.Printf("data set ID: %d\n", result.DataSetID)
- // save deal record
- proofSetIDCopy := proofSetID
+ dataSetIDCopy := result.DataSetID
dealModel := &model.Deal{
State: model.DealProposed,
DealType: model.DealTypePDP,
- Provider: provider,
+ Provider: c.String("provider"),
PieceCID: model.CID(pieceCID),
PieceSize: pieceSize,
WalletID: &walletObj.ID,
- ProofSetID: &proofSetIDCopy,
+ ProofSetID: &dataSetIDCopy,
}
if err := db.WithContext(c.Context).Create(dealModel).Error; err != nil {
return errors.Wrap(err, "failed to save deal")
diff --git a/cmd/run/dealpusher.go b/cmd/run/dealpusher.go
index b78a25ff0..c36360b4a 100644
--- a/cmd/run/dealpusher.go
+++ b/cmd/run/dealpusher.go
@@ -29,23 +29,28 @@ var DealPusherCmd = &cli.Command{
},
&cli.IntFlag{
Name: "pdp-batch-size",
- Usage: "Number of roots to include in each PDP add-roots transaction",
+ Usage: "Number of pieces to include in each /pdp/piece/pull request",
Value: 128,
},
&cli.IntFlag{
Name: "pdp-max-pieces-per-proofset",
- Usage: "Maximum pieces per proof set before handing off to the storage provider",
+ Usage: "Maximum pieces per proof set before starting a new one",
Value: 1024,
},
- &cli.Uint64Flag{
- Name: "pdp-confirmation-depth",
- Usage: "Number of block confirmations required for PDP transactions",
- Value: 5,
- },
&cli.DurationFlag{
- Name: "pdp-poll-interval",
- Usage: "Polling interval for PDP transaction confirmation checks",
- Value: 30 * time.Second,
+ Name: "pdp-pull-timeout",
+ Usage: "How long to wait for Curio to finish pulling a batch (per request)",
+ Value: 5 * time.Minute,
+ },
+ &cli.StringFlag{
+ Name: "pdp-source-url-base",
+ Usage: "HTTPS base URL where Curio fetches pieces from; sourceUrl is built as /piece/",
+ EnvVars: []string{"PDP_SOURCE_URL_BASE"},
+ },
+ &cli.StringFlag{
+ Name: "pdp-record-keeper",
+ Usage: "FWSS contract address (recordKeeper). Defaults to the network default from go-synapse.",
+ EnvVars: []string{"PDP_RECORD_KEEPER"},
},
&cli.StringFlag{
Name: "eth-rpc",
@@ -114,8 +119,7 @@ var DealPusherCmd = &cli.Command{
pdpCfg := dealpusher.PDPSchedulingConfig{
BatchSize: c.Int("pdp-batch-size"),
MaxPiecesPerProofSet: c.Int("pdp-max-pieces-per-proofset"),
- ConfirmationDepth: c.Uint64("pdp-confirmation-depth"),
- PollingInterval: c.Duration("pdp-poll-interval"),
+ PullTimeout: c.Duration("pdp-pull-timeout"),
}
if err := pdpCfg.Validate(); err != nil {
return errors.WithStack(err)
@@ -125,16 +129,19 @@ var DealPusherCmd = &cli.Command{
dealpusher.WithPDPSchedulingConfig(pdpCfg),
}
if rpcURL := c.String("eth-rpc"); rpcURL != "" {
- pdpAdapter, err := dealpusher.NewOnChainPDP(c.Context, db, rpcURL)
+ adapterCfg := dealpusher.OnChainPDPConfig{
+ DB: db,
+ RPCURL: rpcURL,
+ SourceURLBase: c.String("pdp-source-url-base"),
+ RecordKeeper: c.String("pdp-record-keeper"),
+ }
+ pdpAdapter, err := dealpusher.NewOnChainPDP(c.Context, adapterCfg)
if err != nil {
return errors.Wrap(err, "failed to initialize PDP on-chain adapter")
}
defer pdpAdapter.Close()
- opts = append(opts,
- dealpusher.WithPDPProofSetManager(pdpAdapter),
- dealpusher.WithPDPTransactionConfirmer(pdpAdapter),
- )
+ opts = append(opts, dealpusher.WithPDPProofSetManager(pdpAdapter))
}
if ddoContract := c.String("ddo-contract"); ddoContract != "" {
diff --git a/docs/en/cli-reference/deal/README.md b/docs/en/cli-reference/deal/README.md
index ce63478bf..994556f15 100644
--- a/docs/en/cli-reference/deal/README.md
+++ b/docs/en/cli-reference/deal/README.md
@@ -11,7 +11,7 @@ USAGE:
COMMANDS:
schedule Schedule deals
send-manual Send a manual deal proposal to boost or legacy market
- send-manual-pdp Send a manual PDP deal on-chain
+ send-manual-pdp Send a manual PDP deal via the FWSS-pull flow
list List all deals
help, h Shows a list of commands or help for one command
diff --git a/docs/en/cli-reference/deal/send-manual-pdp.md b/docs/en/cli-reference/deal/send-manual-pdp.md
index d6fcce094..a7b557a67 100644
--- a/docs/en/cli-reference/deal/send-manual-pdp.md
+++ b/docs/en/cli-reference/deal/send-manual-pdp.md
@@ -1,24 +1,28 @@
-# Send a manual PDP deal on-chain
+# Send a manual PDP deal via the FWSS-pull flow
{% code fullWidth="true" %}
```
NAME:
- singularity deal send-manual-pdp - Send a manual PDP deal on-chain
+ singularity deal send-manual-pdp - Send a manual PDP deal via the FWSS-pull flow
USAGE:
singularity deal send-manual-pdp [command options]
DESCRIPTION:
- Create/reuse a proof set and add a piece to it on-chain via PDPVerifier.
- Example: singularity deal send-manual-pdp --client f1xxx --provider t410fxxx --piece-cid bagaxxxx --piece-size 1048576 --eth-rpc http://localhost:5700/rpc/v1
+ Push a single piece to an SP via Curio's /pdp/piece/pull, then trigger the
+ SP's on-chain commit (createDataSet+addPieces if no assembling set yet, or addPieces
+ into the existing one). Useful for e2e/diagnostic testing of the FWSS pull path.
+ Example: singularity deal send-manual-pdp --client f1xxx --provider t410fxxx --piece-cid bagaxxxx --piece-size 1048576 --eth-rpc http://localhost:5700/rpc/v1 --source-url-base https://static.example.org
OPTIONS:
- --client value Client wallet address (must be imported)
- --provider value Storage provider f4/t4 address
- --piece-cid value Piece CID (commp)
- --piece-size value Piece size in bytes (default: 0)
- --eth-rpc value FEVM JSON-RPC endpoint [$ETH_RPC_URL]
- --confirmation-depth value Blocks to wait for tx confirmation (default: 5)
- --help, -h show help
+ --client value Client wallet address (must be imported)
+ --provider value Storage provider f4/t4 address
+ --piece-cid value Piece CID (commp v1)
+ --piece-size value Padded piece size in bytes (default: 0)
+ --eth-rpc value FEVM JSON-RPC endpoint [$ETH_RPC_URL]
+ --source-url-base value HTTPS base where Curio fetches the piece (sourceUrl = /piece/) [$PDP_SOURCE_URL_BASE]
+ --record-keeper value FWSS contract address. Defaults to network FWSS from go-synapse. [$PDP_RECORD_KEEPER]
+ --pull-timeout value How long to wait for Curio to finish each phase (default: 5m0s)
+ --help, -h show help
```
{% endcode %}
diff --git a/docs/en/cli-reference/run/deal-pusher.md b/docs/en/cli-reference/run/deal-pusher.md
index 05e03498c..88e4990ad 100644
--- a/docs/en/cli-reference/run/deal-pusher.md
+++ b/docs/en/cli-reference/run/deal-pusher.md
@@ -12,10 +12,11 @@ OPTIONS:
--no-automigrate skip automatic database migration and correctness checks on startup; only use if you run 'admin init' on every upgrade or manually before starting daemons (default: false)
--deal-attempts value, -d value Number of times to attempt a deal before giving up (default: 3)
--max-replication-factor value, -M value Max number of replicas for each individual PieceCID across all clients and providers (default: Unlimited)
- --pdp-batch-size value Number of roots to include in each PDP add-roots transaction (default: 128)
- --pdp-max-pieces-per-proofset value Maximum pieces per proof set before handing off to the storage provider (default: 1024)
- --pdp-confirmation-depth value Number of block confirmations required for PDP transactions (default: 5)
- --pdp-poll-interval value Polling interval for PDP transaction confirmation checks (default: 30s)
+ --pdp-batch-size value Number of pieces to include in each /pdp/piece/pull request (default: 128)
+ --pdp-max-pieces-per-proofset value Maximum pieces per proof set before starting a new one (default: 1024)
+ --pdp-pull-timeout value How long to wait for Curio to finish pulling a batch (per request) (default: 5m0s)
+ --pdp-source-url-base value HTTPS base URL where Curio fetches pieces from; sourceUrl is built as /piece/ [$PDP_SOURCE_URL_BASE]
+ --pdp-record-keeper value FWSS contract address (recordKeeper). Defaults to the network default from go-synapse. [$PDP_RECORD_KEEPER]
--eth-rpc value Ethereum RPC endpoint for FEVM (required to execute PDP and DDO schedules on-chain) [$ETH_RPC_URL]
--ddo-contract value DDO Diamond proxy contract address [$DDO_CONTRACT_ADDRESS]
--ddo-payments-contract value DDO Payments proxy contract address [$DDO_PAYMENTS_CONTRACT_ADDRESS]
diff --git a/go.mod b/go.mod
index ed7045460..a509c2123 100644
--- a/go.mod
+++ b/go.mod
@@ -8,7 +8,7 @@ require (
github.com/bcicen/jstream v1.0.1
github.com/brianvoe/gofakeit/v6 v6.23.2
github.com/cockroachdb/errors v1.11.3
- github.com/data-preservation-programs/go-synapse v0.0.0-20260326143334-8c86c7fc3b17
+ github.com/data-preservation-programs/go-synapse v0.0.0-20260503115913-40ae25e74ba1
github.com/data-preservation-programs/table v0.0.3
github.com/dustin/go-humanize v1.0.1
github.com/ethereum/go-ethereum v1.14.12
diff --git a/go.sum b/go.sum
index e1b31d590..ec240559d 100644
--- a/go.sum
+++ b/go.sum
@@ -275,8 +275,8 @@ github.com/cronokirby/saferith v0.33.0 h1:TgoQlfsD4LIwx71+ChfRcIpjkw+RPOapDEVxa+
github.com/cronokirby/saferith v0.33.0/go.mod h1:QKJhjoqUtBsXCAVEjw38mFqoi7DebT7kthcD7UzbnoA=
github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0=
github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis=
-github.com/data-preservation-programs/go-synapse v0.0.0-20260326143334-8c86c7fc3b17 h1:pTOIdr5W7pHrxFaQdONxHWiCtIOgdc6zfKL82SS4xhI=
-github.com/data-preservation-programs/go-synapse v0.0.0-20260326143334-8c86c7fc3b17/go.mod h1:qgzPsiGWjTPT/oACA6Uj1+WsASwsYFW/iJ8AWacJdjc=
+github.com/data-preservation-programs/go-synapse v0.0.0-20260503115913-40ae25e74ba1 h1:SGQs5b7eyjycfxKDRHmzZW613BjjRkDT0XITafKK50A=
+github.com/data-preservation-programs/go-synapse v0.0.0-20260503115913-40ae25e74ba1/go.mod h1:qgzPsiGWjTPT/oACA6Uj1+WsASwsYFW/iJ8AWacJdjc=
github.com/data-preservation-programs/table v0.0.3 h1:hboeauxPXybE8KlMA+RjDXz/J4xaG5CAFCcxyOm8yWo=
github.com/data-preservation-programs/table v0.0.3/go.mod h1:sRGP/IuuqFc/y9QfmDyb5h6Q2wrnhhnBofEOj9aDRJg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/model/replication.go b/model/replication.go
index 0134194ba..ec6d63920 100644
--- a/model/replication.go
+++ b/model/replication.go
@@ -219,14 +219,22 @@ const (
// This is a materialized view built from Shovel-indexed events, replacing
// the per-cycle RPC scans of GetProofSets/GetProofSetsForClient.
type PDPProofSet struct {
- SetID uint64 `gorm:"primaryKey;autoIncrement:false" json:"setId"`
- ClientAddress string `gorm:"not null;index" json:"clientAddress"`
- Provider string `gorm:"not null" json:"provider"`
- IsLive bool `gorm:"default:false" json:"isLive"`
- ChallengeEpoch *int64 ` json:"challengeEpoch,omitempty"`
- CreatedBlock int64 `gorm:"not null" json:"createdBlock"`
- Deleted bool `gorm:"default:false" json:"deleted"`
- HandoffState ProofSetHandoffState `gorm:"default:'assembling'" json:"handoffState"`
- ProposedProviderEVM string ` json:"proposedProviderEVM,omitempty"`
- PieceCount int `gorm:"default:0" json:"pieceCount"`
+ SetID uint64 `gorm:"primaryKey;autoIncrement:false" json:"setId"`
+ ClientAddress string `gorm:"not null;index" json:"clientAddress"`
+ Provider string `gorm:"not null" json:"provider"`
+ IsLive bool `gorm:"default:false" json:"isLive"`
+ ChallengeEpoch *int64 ` json:"challengeEpoch,omitempty"`
+ CreatedBlock int64 `gorm:"not null" json:"createdBlock"`
+ Deleted bool `gorm:"default:false" json:"deleted"`
+ HandoffState ProofSetHandoffState `gorm:"default:'assembling'" json:"handoffState"`
+ PieceCount int `gorm:"default:0" json:"pieceCount"`
+ // ClientDataSetID is the per-(payer, set) nonce the client signs into
+ // CreateDataSet/AddPieces extraData. FWSS rejects reused IDs via its
+ // clientNonces[payer][id] check, so we persist before signing to make
+ // retries idempotent. Stored decimal because uint256 doesn't fit any
+ // native int.
+ ClientDataSetID string ` json:"clientDataSetId,omitempty"`
+ // ServiceURL caches the SP's PDP HTTP endpoint, fetched from
+ // ServiceProviderRegistry the first time we push to this provider.
+ ServiceURL string ` json:"serviceUrl,omitempty"`
}
diff --git a/service/dealpusher/dealpusher.go b/service/dealpusher/dealpusher.go
index 6aba9d0c0..f193febca 100644
--- a/service/dealpusher/dealpusher.go
+++ b/service/dealpusher/dealpusher.go
@@ -40,15 +40,14 @@ var waitPendingInterval = time.Minute
// DealPusher represents a struct that encapsulates the data and functionality related to pushing deals in a replication process.
type DealPusher struct {
- dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection.
- keyStore keystore.KeyStore // Keystore for loading private keys
- lotusClient jsonrpc.RPCClient // Lotus JSON-RPC client for chain queries
- dealMaker replication.DealMaker // Object responsible for making a deal in replication.
- pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager.
- pdpTxConfirmer PDPTransactionConfirmer // Optional PDP transaction confirmer.
- pdpSchedulingConfig PDPSchedulingConfig // PDP scheduling config for root batching and tx confirmation.
- ddoDealManager DDODealManager // Optional DDO deal lifecycle manager.
- ddoSchedulingConfig DDOSchedulingConfig // DDO scheduling config for allocation batching and tx confirmation.
+ dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection.
+ keyStore keystore.KeyStore // Keystore for loading private keys
+ lotusClient jsonrpc.RPCClient // Lotus JSON-RPC client for chain queries
+ dealMaker replication.DealMaker // Object responsible for making a deal in replication.
+ pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager.
+ pdpSchedulingConfig PDPSchedulingConfig // PDP scheduling config for batch sizing and pull timeouts.
+ ddoDealManager DDODealManager // Optional DDO deal lifecycle manager.
+ ddoSchedulingConfig DDOSchedulingConfig // DDO scheduling config for allocation batching and tx confirmation.
// Resolver is injected so tests and future wiring can switch deal type behavior without coupling DealPusher to config storage.
scheduleDealTypeResolver func(schedule *model.Schedule) model.DealType
workerID uuid.UUID // UUID identifying the associated worker.
diff --git a/service/dealpusher/options.go b/service/dealpusher/options.go
index 649dd2f61..ed9fb04c2 100644
--- a/service/dealpusher/options.go
+++ b/service/dealpusher/options.go
@@ -11,12 +11,6 @@ func WithPDPProofSetManager(manager PDPProofSetManager) Option {
}
}
-func WithPDPTransactionConfirmer(confirmer PDPTransactionConfirmer) Option {
- return func(d *DealPusher) {
- d.pdpTxConfirmer = confirmer
- }
-}
-
func WithPDPSchedulingConfig(cfg PDPSchedulingConfig) Option {
return func(d *DealPusher) {
d.pdpSchedulingConfig = cfg
diff --git a/service/dealpusher/pdp_api.go b/service/dealpusher/pdp_api.go
index 412388280..9ea001bd1 100644
--- a/service/dealpusher/pdp_api.go
+++ b/service/dealpusher/pdp_api.go
@@ -3,20 +3,23 @@ package dealpusher
import (
"context"
"errors"
- "math/big"
"time"
"github.com/data-preservation-programs/go-synapse/signer"
- "github.com/ethereum/go-ethereum/common"
"github.com/ipfs/go-cid"
)
-// PDPSchedulingConfig holds PDP-specific scheduling knobs for on-chain operations.
+// PDPSchedulingConfig holds PDP-specific scheduling knobs for the FWSS-pull
+// flow.
type PDPSchedulingConfig struct {
- BatchSize int // pieces per addPieces transaction (gas constraint)
- MaxPiecesPerProofSet int // pieces per proof set before handoff to SP
- ConfirmationDepth uint64
- PollingInterval time.Duration
+ // BatchSize bounds pieces per /pdp/piece/pull request.
+ BatchSize int
+ // MaxPiecesPerProofSet bounds pieces per data set; the scheduler starts
+ // a new set when the current one fills.
+ MaxPiecesPerProofSet int
+ // PullTimeout bounds the time we wait for Curio to finish pulling a
+ // batch (per request, not aggregate).
+ PullTimeout time.Duration
}
// Validate validates PDP scheduling configuration.
@@ -27,44 +30,51 @@ func (c PDPSchedulingConfig) Validate() error {
if c.MaxPiecesPerProofSet <= 0 {
return errors.New("pdp max pieces per proof set must be greater than 0")
}
- if c.ConfirmationDepth == 0 {
- return errors.New("pdp confirmation depth must be greater than 0")
- }
- if c.PollingInterval <= 0 {
- return errors.New("pdp polling interval must be greater than 0")
+ if c.PullTimeout <= 0 {
+ return errors.New("pdp pull timeout must be greater than 0")
}
return nil
}
-// PDPProofSetManager defines proof set lifecycle operations needed by scheduling.
-// All methods take an EVMSigner because they submit FEVM transactions.
-type PDPProofSetManager interface {
- // EnsureProofSet returns an existing proof set ID in assembling state or creates one.
- EnsureProofSet(ctx context.Context, evmSigner signer.EVMSigner, provider string, cfg PDPSchedulingConfig) (uint64, error)
- // QueueAddRoots submits root additions for a proof set and returns the queued tx reference.
- // pieceSizes are the padded piece sizes corresponding to each CID, needed for CommPv2 conversion.
- QueueAddRoots(ctx context.Context, evmSigner signer.EVMSigner, proofSetID uint64, pieceCIDs []cid.Cid, pieceSizes []int64, cfg PDPSchedulingConfig) (*PDPQueuedTx, error)
- // ProposeTransfer proposes the proof set for handoff to the given SP.
- ProposeTransfer(ctx context.Context, evmSigner signer.EVMSigner, proofSetID uint64, spEVMAddress common.Address) error
- // IncrementPieceCount atomically increments the durable piece count after confirmed on-chain add.
- IncrementPieceCount(ctx context.Context, proofSetID uint64, count int) error
+// PDPPieceInput names a piece the scheduler wants pushed to the SP. The
+// implementation constructs the SP-side source URL.
+type PDPPieceInput struct {
+ PieceCID cid.Cid
+ // PieceSize is the padded piece size, needed for CommPv2 conversion
+ // before signing.
+ PieceSize int64
}
-// PDPTransactionConfirmer defines confirmation checks for queued on-chain transactions.
-type PDPTransactionConfirmer interface {
- WaitForConfirmations(ctx context.Context, txHash string, depth uint64, pollInterval time.Duration) (*PDPTransactionReceipt, error)
+// PDPPullResult reports the outcome of a /pdp/piece/pull batch.
+type PDPPullResult struct {
+ // DataSetID is the FWSS-listened data set the pieces ended up in.
+ // For new sets, this is the SetID Curio returns after the
+ // createDataSet+addPieces tx confirms.
+ DataSetID uint64
}
-// PDPQueuedTx represents an on-chain transaction submitted by PDP scheduling.
-type PDPQueuedTx struct {
- Hash string
-}
-
-// PDPTransactionReceipt represents a confirmed on-chain transaction result.
-type PDPTransactionReceipt struct {
- Hash string
- BlockNumber uint64
- GasUsed uint64
- Status uint64
- CostAttoFIL *big.Int
+// PDPProofSetManager pushes pieces to an SP's Curio via /pdp/piece/pull,
+// then triggers the SP's on-chain commit via /pdp/data-sets/create-and-add
+// (new sets) or /pdp/data-sets/{id}/pieces (existing). It encapsulates SP
+// service-URL discovery, clientDataSetId persistence, EIP-712 signing,
+// and post-completion bookkeeping.
+type PDPProofSetManager interface {
+ // PullPiecesToFWSS pushes a batch of pieces to the SP via the FWSS-pull
+ // flow. If no assembling proof set has room, a new FWSS-listened set is
+ // created atomically with the first batch; otherwise pieces are added
+ // to the existing assembling set. Blocks until Curio reports the SP
+ // transfer is complete and the on-chain tx confirms (giving us the
+ // dataSetId), or the configured timeout elapses. The returned
+ // DataSetID is the set the pieces landed in.
+ //
+ // evmSigner is the client's secp256k1 wallet; its EVMAddress is the
+ // on-chain payer, and its raw key (via signer.EVMSigner.ECDSAKey)
+ // is used for the EIP-712 extraData signing.
+ PullPiecesToFWSS(
+ ctx context.Context,
+ evmSigner signer.EVMSigner,
+ provider string,
+ pieces []PDPPieceInput,
+ cfg PDPSchedulingConfig,
+ ) (PDPPullResult, error)
}
diff --git a/service/dealpusher/pdp_api_test.go b/service/dealpusher/pdp_api_test.go
index 515c84d48..2af81339c 100644
--- a/service/dealpusher/pdp_api_test.go
+++ b/service/dealpusher/pdp_api_test.go
@@ -12,25 +12,33 @@ func TestPDPSchedulingConfigValidate(t *testing.T) {
cfg := PDPSchedulingConfig{
BatchSize: 100,
MaxPiecesPerProofSet: 1024,
- ConfirmationDepth: 5,
- PollingInterval: 30 * time.Second,
+ PullTimeout: 5 * time.Minute,
}
require.NoError(t, cfg.Validate())
})
- t.Run("invalid", func(t *testing.T) {
+ t.Run("invalid (zero values)", func(t *testing.T) {
cfg := PDPSchedulingConfig{}
require.Error(t, cfg.Validate())
})
t.Run("invalid max pieces per proof set", func(t *testing.T) {
cfg := PDPSchedulingConfig{
- BatchSize: 100,
- ConfirmationDepth: 5,
- PollingInterval: 30 * time.Second,
+ BatchSize: 100,
+ PullTimeout: 5 * time.Minute,
}
err := cfg.Validate()
require.Error(t, err)
require.Contains(t, err.Error(), "pdp max pieces per proof set must be greater than 0")
})
+
+ t.Run("invalid pull timeout", func(t *testing.T) {
+ cfg := PDPSchedulingConfig{
+ BatchSize: 100,
+ MaxPiecesPerProofSet: 1024,
+ }
+ err := cfg.Validate()
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "pdp pull timeout must be greater than 0")
+ })
}
diff --git a/service/dealpusher/pdp_onchain.go b/service/dealpusher/pdp_onchain.go
deleted file mode 100644
index cafd48df8..000000000
--- a/service/dealpusher/pdp_onchain.go
+++ /dev/null
@@ -1,331 +0,0 @@
-package dealpusher
-
-import (
- "context"
- "fmt"
- "math/big"
- "time"
-
- "github.com/cockroachdb/errors"
- synapse "github.com/data-preservation-programs/go-synapse"
- "github.com/data-preservation-programs/go-synapse/constants"
- "github.com/data-preservation-programs/go-synapse/contracts"
- "github.com/data-preservation-programs/go-synapse/pdp"
- "github.com/data-preservation-programs/go-synapse/signer"
- commcid "github.com/filecoin-project/go-fil-commcid"
-
- "github.com/data-preservation-programs/singularity/model"
- "github.com/ethereum/go-ethereum"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/common/hexutil"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethclient"
- "github.com/filecoin-project/go-address"
- "github.com/ipfs/go-cid"
- "gorm.io/gorm"
-)
-
-// defaultGasLimit is a fixed gas limit for FEVM transactions.
-// FEVM gas estimation (NoSend=true) is unreliable, so we use a fixed value.
-// EVM traces show ~200K gas but FEVM execution needs ~17M due to actor overhead.
-const defaultGasLimit = 30_000_000
-
-type confirmationClient interface {
- TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
- BlockNumber(ctx context.Context) (uint64, error)
-}
-
-// OnChainPDP implements PDP scheduling interfaces using FEVM RPC + go-synapse contracts.
-type OnChainPDP struct {
- dbNoContext *gorm.DB
- ethClient *ethclient.Client
- confirmClient confirmationClient
- network constants.Network
- chainID *big.Int
- contractAddr common.Address
- contract *contracts.PDPVerifier
-}
-
-func NewOnChainPDP(ctx context.Context, db *gorm.DB, rpcURL string) (*OnChainPDP, error) {
- if rpcURL == "" {
- return nil, errors.New("eth rpc URL is required")
- }
-
- ethClient, err := ethclient.DialContext(ctx, rpcURL)
- if err != nil {
- return nil, errors.Wrap(err, "failed to connect to FEVM RPC")
- }
-
- network, chainIDInt64, err := synapse.DetectNetwork(ctx, ethClient)
- if err != nil {
- ethClient.Close()
- return nil, errors.Wrap(err, "failed to detect FEVM network")
- }
-
- contractAddr := constants.GetPDPVerifierAddress(network)
- if contractAddr == (common.Address{}) {
- ethClient.Close()
- return nil, fmt.Errorf("no PDPVerifier contract for network %s", network)
- }
-
- contract, err := contracts.NewPDPVerifier(contractAddr, ethClient)
- if err != nil {
- ethClient.Close()
- return nil, errors.Wrap(err, "failed to bind PDPVerifier contract")
- }
-
- Logger.Infow("initialized PDP on-chain adapter",
- "network", network,
- "chainId", chainIDInt64,
- "contract", contractAddr.Hex(),
- )
-
- return &OnChainPDP{
- dbNoContext: db,
- ethClient: ethClient,
- confirmClient: ethClient,
- network: network,
- chainID: big.NewInt(chainIDInt64),
- contractAddr: contractAddr,
- contract: contract,
- }, nil
-}
-
-func (o *OnChainPDP) Close() error {
- if o.ethClient != nil {
- o.ethClient.Close()
- }
- return nil
-}
-
-func (o *OnChainPDP) newManager(ctx context.Context, evmSigner signer.EVMSigner) (*pdp.Manager, error) {
- cfg := pdp.DefaultManagerConfig()
- cfg.ContractAddress = o.contractAddr
- cfg.DefaultGasLimit = defaultGasLimit
- mgr, err := pdp.NewManagerWithConfig(ctx, o.ethClient, evmSigner, o.network, &cfg)
- if err != nil {
- return nil, errors.Wrap(err, "failed to initialize PDP proof set manager")
- }
- return mgr, nil
-}
-
-// findProofSetWithRoom finds an assembling proof set with room for more pieces.
-// uses the durable PieceCount field rather than inferring occupancy from deal states.
-func (o *OnChainPDP) findProofSetWithRoom(ctx context.Context, clientAddress string, provider string, maxPieces int) (uint64, bool, error) {
- if maxPieces <= 0 {
- return 0, false, errors.New("max pieces per proof set must be greater than 0")
- }
-
- var ps model.PDPProofSet
- err := o.dbNoContext.WithContext(ctx).
- Where("client_address = ? AND provider = ? AND deleted = FALSE AND handoff_state = ?",
- clientAddress, provider, model.ProofSetAssembling).
- Where("piece_count < ?", maxPieces).
- Order("set_id").
- First(&ps).Error
- if err == nil {
- return ps.SetID, true, nil
- }
- if errors.Is(err, gorm.ErrRecordNotFound) {
- return 0, false, nil
- }
- return 0, false, errors.Wrap(err, "failed to query assembling PDP proof sets")
-}
-
-func (o *OnChainPDP) EnsureProofSet(ctx context.Context, evmSigner signer.EVMSigner, provider string, cfg PDPSchedulingConfig) (uint64, error) {
- clientAddr, err := commonToDelegatedAddress(evmSigner.EVMAddress())
- if err != nil {
- return 0, errors.Wrap(err, "failed to derive delegated client address from signer")
- }
- existingSetID, found, err := o.findProofSetWithRoom(ctx, clientAddr.String(), provider, cfg.MaxPiecesPerProofSet)
- if err != nil {
- return 0, err
- }
- if found {
- return existingSetID, nil
- }
-
- manager, err := o.newManager(ctx, evmSigner)
- if err != nil {
- return 0, err
- }
-
- result, err := manager.CreateProofSet(ctx, pdp.CreateProofSetOptions{
- Value: pdp.SybilFee,
- })
- if err != nil {
- return 0, errors.Wrap(err, "failed to create PDP proof set")
- }
-
- setID := result.ProofSetID.Uint64()
- row := model.PDPProofSet{
- SetID: setID,
- ClientAddress: clientAddr.String(),
- Provider: provider,
- HandoffState: model.ProofSetAssembling,
- }
- if result.Receipt != nil && result.Receipt.BlockNumber != nil {
- row.CreatedBlock = int64(result.Receipt.BlockNumber.Uint64())
- }
-
- if err := o.dbNoContext.WithContext(ctx).Where("set_id = ?", setID).Assign(row).FirstOrCreate(&model.PDPProofSet{}).Error; err != nil {
- return 0, errors.Wrap(err, "failed to persist created PDP proof set")
- }
- return setID, nil
-}
-
-func (o *OnChainPDP) ProposeTransfer(ctx context.Context, evmSigner signer.EVMSigner, proofSetID uint64, spEVMAddress common.Address) error {
- txOpts, err := evmSigner.Transactor(o.chainID)
- if err != nil {
- return errors.Wrap(err, "failed to create transactor for propose transfer")
- }
- txOpts.GasLimit = defaultGasLimit
-
- tx, err := o.contract.ProposeDataSetStorageProvider(txOpts, new(big.Int).SetUint64(proofSetID), spEVMAddress)
- if err != nil {
- return errors.Wrap(err, "failed to submit propose-transfer transaction")
- }
-
- Logger.Infow("proposed proof set transfer",
- "proofSetID", proofSetID,
- "spEVMAddress", spEVMAddress.Hex(),
- "txHash", tx.Hash().Hex(),
- )
-
- // update local state -- the SP claim is tracked via StorageProviderChanged event
- return o.dbNoContext.WithContext(ctx).
- Model(&model.PDPProofSet{}).
- Where("set_id = ?", proofSetID).
- Updates(map[string]any{
- "handoff_state": model.ProofSetProposed,
- "proposed_provider_evm": spEVMAddress.Hex(),
- }).Error
-}
-
-func (o *OnChainPDP) IncrementPieceCount(ctx context.Context, proofSetID uint64, count int) error {
- return o.dbNoContext.WithContext(ctx).
- Model(&model.PDPProofSet{}).
- Where("set_id = ?", proofSetID).
- Update("piece_count", gorm.Expr("piece_count + ?", count)).Error
-}
-
-func (o *OnChainPDP) QueueAddRoots(
- ctx context.Context,
- evmSigner signer.EVMSigner,
- proofSetID uint64,
- pieceCIDs []cid.Cid,
- pieceSizes []int64,
- cfg PDPSchedulingConfig,
-) (*PDPQueuedTx, error) {
- if len(pieceCIDs) == 0 {
- return nil, errors.New("no piece CIDs provided")
- }
- if len(pieceCIDs) != len(pieceSizes) {
- return nil, fmt.Errorf("pieceCIDs length %d != pieceSizes length %d", len(pieceCIDs), len(pieceSizes))
- }
- if cfg.BatchSize > 0 && len(pieceCIDs) > cfg.BatchSize {
- return nil, fmt.Errorf("piece CID count %d exceeds configured PDP batch size %d", len(pieceCIDs), cfg.BatchSize)
- }
-
- // convert commP v1 CIDs to CommPv2 format expected by PDPVerifier contract
- roots := make([]pdp.Root, len(pieceCIDs))
- for i, pieceCID := range pieceCIDs {
- payloadSize := uint64(pieceSizes[i]) * 127 / 128
- v2CID, err := commcid.PieceCidV2FromV1(pieceCID, payloadSize)
- if err != nil {
- return nil, errors.Wrapf(err, "failed to convert piece CID %s to CommPv2", pieceCID)
- }
- roots[i] = pdp.Root{PieceCID: v2CID}
- }
-
- manager, err := o.newManager(ctx, evmSigner)
- if err != nil {
- return nil, err
- }
-
- setIDBig := new(big.Int).SetUint64(proofSetID)
- result, err := manager.AddRoots(ctx, setIDBig, roots)
- if err != nil {
- return nil, errors.Wrap(err, "failed to submit PDP add-roots transaction")
- }
-
- return &PDPQueuedTx{Hash: result.TransactionHash.Hex()}, nil
-}
-
-func (o *OnChainPDP) WaitForConfirmations(
- ctx context.Context,
- txHash string,
- depth uint64,
- pollInterval time.Duration,
-) (*PDPTransactionReceipt, error) {
- rawHash, err := hexutil.Decode(txHash)
- if err != nil || len(rawHash) != common.HashLength {
- return nil, fmt.Errorf("invalid tx hash %q", txHash)
- }
- if pollInterval <= 0 {
- pollInterval = 2 * time.Second
- }
-
- hash := common.HexToHash(txHash)
- ticker := time.NewTicker(pollInterval)
- defer ticker.Stop()
-
- for {
- select {
- case <-ctx.Done():
- return nil, errors.Wrap(ctx.Err(), "context canceled while waiting for PDP transaction confirmation")
- case <-ticker.C:
- receipt, err := o.confirmClient.TransactionReceipt(ctx, hash)
- if err != nil {
- if errors.Is(err, ethereum.NotFound) {
- continue
- }
- return nil, errors.Wrap(err, "failed to fetch transaction receipt")
- }
-
- out := toPDPReceipt(txHash, receipt)
- if receipt.Status != types.ReceiptStatusSuccessful {
- return out, fmt.Errorf("transaction %s failed with status %d", txHash, receipt.Status)
- }
- if depth == 0 {
- return out, nil
- }
-
- currentBlock, err := o.confirmClient.BlockNumber(ctx)
- if err != nil {
- return nil, errors.Wrap(err, "failed to fetch latest block number")
- }
- if receipt.BlockNumber != nil && receipt.BlockNumber.Uint64()+depth <= currentBlock {
- return out, nil
- }
- }
- }
-}
-
-func toPDPReceipt(txHash string, receipt *types.Receipt) *PDPTransactionReceipt {
- out := &PDPTransactionReceipt{
- Hash: txHash,
- }
- if receipt == nil {
- return out
- }
- out.Status = receipt.Status
- out.GasUsed = receipt.GasUsed
- if receipt.BlockNumber != nil {
- out.BlockNumber = receipt.BlockNumber.Uint64()
- }
- if receipt.EffectiveGasPrice != nil {
- out.CostAttoFIL = new(big.Int).Mul(new(big.Int).SetUint64(receipt.GasUsed), receipt.EffectiveGasPrice)
- } else {
- out.CostAttoFIL = big.NewInt(0)
- }
- return out
-}
-
-func commonToDelegatedAddress(subaddr common.Address) (address.Address, error) {
- addr, err := address.NewDelegatedAddress(10, subaddr.Bytes())
- if err != nil {
- return address.Undef, errors.Wrap(err, "failed to encode delegated address")
- }
- return addr, nil
-}
diff --git a/service/dealpusher/pdp_onchain_test.go b/service/dealpusher/pdp_onchain_test.go
deleted file mode 100644
index 0f56145ab..000000000
--- a/service/dealpusher/pdp_onchain_test.go
+++ /dev/null
@@ -1,186 +0,0 @@
-package dealpusher
-
-import (
- "context"
- "math/big"
- "testing"
- "time"
-
- "github.com/data-preservation-programs/singularity/model"
- "github.com/data-preservation-programs/singularity/util/testutil"
- "github.com/ethereum/go-ethereum"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/stretchr/testify/require"
- "gorm.io/gorm"
-)
-
-type fakeConfirmationClient struct {
- txReceiptFn func(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
- blockNumFn func(ctx context.Context) (uint64, error)
-}
-
-func (f *fakeConfirmationClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
- return f.txReceiptFn(ctx, txHash)
-}
-
-func (f *fakeConfirmationClient) BlockNumber(ctx context.Context) (uint64, error) {
- return f.blockNumFn(ctx)
-}
-
-func TestOnChainPDP_WaitForConfirmations(t *testing.T) {
- var receiptCalls int
- var blockCalls int
-
- c := &fakeConfirmationClient{
- txReceiptFn: func(_ context.Context, _ common.Hash) (*types.Receipt, error) {
- receiptCalls++
- if receiptCalls == 1 {
- return nil, ethereum.NotFound
- }
- return &types.Receipt{
- Status: types.ReceiptStatusSuccessful,
- GasUsed: 10,
- EffectiveGasPrice: big.NewInt(3),
- BlockNumber: big.NewInt(100),
- }, nil
- },
- blockNumFn: func(_ context.Context) (uint64, error) {
- blockCalls++
- if blockCalls == 1 {
- return 101, nil
- }
- return 102, nil
- },
- }
-
- adapter := &OnChainPDP{confirmClient: c}
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- defer cancel()
-
- receipt, err := adapter.WaitForConfirmations(ctx, "0x1111111111111111111111111111111111111111111111111111111111111111", 2, time.Millisecond)
- require.NoError(t, err)
- require.NotNil(t, receipt)
- require.Equal(t, uint64(100), receipt.BlockNumber)
- require.Equal(t, uint64(10), receipt.GasUsed)
- require.Equal(t, uint64(types.ReceiptStatusSuccessful), receipt.Status)
- require.Equal(t, "30", receipt.CostAttoFIL.String())
-}
-
-func TestOnChainPDP_WaitForConfirmationsFailedTx(t *testing.T) {
- c := &fakeConfirmationClient{
- txReceiptFn: func(_ context.Context, _ common.Hash) (*types.Receipt, error) {
- return &types.Receipt{
- Status: types.ReceiptStatusFailed,
- GasUsed: 5,
- EffectiveGasPrice: big.NewInt(2),
- BlockNumber: big.NewInt(10),
- }, nil
- },
- blockNumFn: func(_ context.Context) (uint64, error) {
- return 10, nil
- },
- }
-
- adapter := &OnChainPDP{confirmClient: c}
- ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
- defer cancel()
-
- receipt, err := adapter.WaitForConfirmations(ctx, "0x2222222222222222222222222222222222222222222222222222222222222222", 1, time.Millisecond)
- require.Error(t, err)
- require.NotNil(t, receipt)
- require.Equal(t, uint64(types.ReceiptStatusFailed), receipt.Status)
-}
-
-func TestOnChainPDP_WaitForConfirmationsInvalidHash(t *testing.T) {
- adapter := &OnChainPDP{}
- _, err := adapter.WaitForConfirmations(context.Background(), "not-a-hash", 1, time.Millisecond)
- require.Error(t, err)
-}
-
-func TestOnChainPDP_FindProofSetWithRoom_NoProofSets(t *testing.T) {
- testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
- adapter := &OnChainPDP{dbNoContext: db}
- setID, found, err := adapter.findProofSetWithRoom(ctx, "f410foo", "f410provider", 2)
- require.NoError(t, err)
- require.False(t, found)
- require.Zero(t, setID)
- })
-}
-
-func TestOnChainPDP_FindProofSetWithRoom_PicksAssemblingWithCapacity(t *testing.T) {
- testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
- client := "f410foo"
- provider := "f410provider"
- // set 1: assembling but full
- require.NoError(t, db.Create(&model.PDPProofSet{
- SetID: 1,
- ClientAddress: client,
- Provider: provider,
- HandoffState: model.ProofSetAssembling,
- PieceCount: 5,
- }).Error)
- // set 2: assembling with room
- require.NoError(t, db.Create(&model.PDPProofSet{
- SetID: 2,
- ClientAddress: client,
- Provider: provider,
- HandoffState: model.ProofSetAssembling,
- PieceCount: 3,
- }).Error)
-
- adapter := &OnChainPDP{dbNoContext: db}
- setID, found, err := adapter.findProofSetWithRoom(ctx, client, provider, 5)
- require.NoError(t, err)
- require.True(t, found)
- require.EqualValues(t, 2, setID)
- })
-}
-
-func TestOnChainPDP_FindProofSetWithRoom_SkipsProposedAndTransferred(t *testing.T) {
- testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
- client := "f410foo"
- provider := "f410provider"
- // set 1: proposed -- should be skipped even though it has room
- require.NoError(t, db.Create(&model.PDPProofSet{
- SetID: 1,
- ClientAddress: client,
- Provider: provider,
- HandoffState: model.ProofSetProposed,
- PieceCount: 1,
- }).Error)
- // set 2: transferred -- should be skipped
- require.NoError(t, db.Create(&model.PDPProofSet{
- SetID: 2,
- ClientAddress: client,
- Provider: provider,
- HandoffState: model.ProofSetTransferred,
- PieceCount: 1,
- }).Error)
-
- adapter := &OnChainPDP{dbNoContext: db}
- setID, found, err := adapter.findProofSetWithRoom(ctx, client, provider, 10)
- require.NoError(t, err)
- require.False(t, found)
- require.Zero(t, setID)
- })
-}
-
-func TestOnChainPDP_IncrementPieceCount(t *testing.T) {
- testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
- require.NoError(t, db.Create(&model.PDPProofSet{
- SetID: 1,
- ClientAddress: "f410foo",
- Provider: "f410provider",
- HandoffState: model.ProofSetAssembling,
- PieceCount: 5,
- }).Error)
-
- adapter := &OnChainPDP{dbNoContext: db}
- require.NoError(t, adapter.IncrementPieceCount(ctx, 1, 3))
-
- var ps model.PDPProofSet
- require.NoError(t, db.First(&ps, "set_id = ?", 1).Error)
- require.Equal(t, 8, ps.PieceCount)
- })
-}
diff --git a/service/dealpusher/pdp_pull.go b/service/dealpusher/pdp_pull.go
new file mode 100644
index 000000000..9db96f14f
--- /dev/null
+++ b/service/dealpusher/pdp_pull.go
@@ -0,0 +1,461 @@
+package dealpusher
+
+import (
+ "context"
+ "crypto/rand"
+ "fmt"
+ "math/big"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/cockroachdb/errors"
+ synapse "github.com/data-preservation-programs/go-synapse"
+ "github.com/data-preservation-programs/go-synapse/constants"
+ "github.com/data-preservation-programs/go-synapse/pdp"
+ "github.com/data-preservation-programs/go-synapse/signer"
+ "github.com/data-preservation-programs/go-synapse/spregistry"
+ "github.com/data-preservation-programs/singularity/model"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/ethclient"
+ "github.com/filecoin-project/go-address"
+ commcid "github.com/filecoin-project/go-fil-commcid"
+ "github.com/ipfs/go-cid"
+ "gorm.io/gorm"
+)
+
+// OnChainPDPConfig configures the FWSS-pull adapter.
+type OnChainPDPConfig struct {
+ // DB is required.
+ DB *gorm.DB
+ // RPCURL is the FEVM JSON-RPC endpoint (read-only; we never submit txs).
+ RPCURL string
+ // SourceURLBase is the HTTPS base singularity serves pieces from. The
+ // per-piece URL is constructed as /piece/. Required.
+ SourceURLBase string
+ // RecordKeeper is the FWSS contract address (hex). Empty defaults to
+ // the network FWSS from go-synapse constants.
+ RecordKeeper string
+}
+
+// OnChainPDP drives the FWSS-mediated pull flow. We never submit any
+// PDPVerifier tx ourselves: the SP downloads pieces from our content
+// provider via /pdp/piece/pull, then commits on-chain from its own wallet
+// via /pdp/data-sets/create-and-add (new sets) or /pdp/data-sets/{id}/pieces
+// (existing). The only EVM RPC use is the ServiceProviderRegistry view
+// call that resolves an SP's PDP service URL.
+type OnChainPDP struct {
+ db *gorm.DB
+ ethClient *ethclient.Client
+ network constants.Network
+ chainID *big.Int
+ recordKeeper common.Address
+ sourceURLBase string
+ spRegistry *spregistry.Service
+
+ spURLCacheMu sync.Mutex
+ spURLCache map[string]spInfo // delegated f4 provider string -> info
+}
+
+type spInfo struct {
+ serviceURL string
+ payee common.Address
+}
+
+func NewOnChainPDP(ctx context.Context, cfg OnChainPDPConfig) (*OnChainPDP, error) {
+ if cfg.DB == nil {
+ return nil, errors.New("db is required")
+ }
+ if cfg.RPCURL == "" {
+ return nil, errors.New("eth rpc URL is required")
+ }
+ if cfg.SourceURLBase == "" {
+ return nil, errors.New("source URL base is required (--pdp-source-url-base / PDP_SOURCE_URL_BASE)")
+ }
+
+ ethClient, err := ethclient.DialContext(ctx, cfg.RPCURL)
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to connect to FEVM RPC")
+ }
+
+ network, chainIDInt64, err := synapse.DetectNetwork(ctx, ethClient)
+ if err != nil {
+ ethClient.Close()
+ return nil, errors.Wrap(err, "failed to detect FEVM network")
+ }
+ chainID := big.NewInt(chainIDInt64)
+
+ recordKeeper := common.HexToAddress(cfg.RecordKeeper)
+ if recordKeeper == (common.Address{}) {
+ recordKeeper = constants.WarmStorageAddresses[network]
+ if recordKeeper == (common.Address{}) {
+ ethClient.Close()
+ return nil, fmt.Errorf("no FWSS recordKeeper address for network %s; set --pdp-record-keeper", network)
+ }
+ }
+
+ spRegAddr := constants.SPRegistryAddresses[network]
+ if spRegAddr == (common.Address{}) {
+ ethClient.Close()
+ return nil, fmt.Errorf("no ServiceProviderRegistry address for network %s", network)
+ }
+ spReg, err := spregistry.NewService(ethClient, spRegAddr, nil, chainID)
+ if err != nil {
+ ethClient.Close()
+ return nil, errors.Wrap(err, "failed to bind ServiceProviderRegistry")
+ }
+
+ Logger.Infow("initialized FWSS-pull adapter",
+ "network", network,
+ "chainId", chainIDInt64,
+ "recordKeeper", recordKeeper.Hex(),
+ "spRegistry", spRegAddr.Hex(),
+ "sourceURLBase", cfg.SourceURLBase,
+ )
+
+ return &OnChainPDP{
+ db: cfg.DB,
+ ethClient: ethClient,
+ network: network,
+ chainID: chainID,
+ recordKeeper: recordKeeper,
+ sourceURLBase: strings.TrimSuffix(cfg.SourceURLBase, "/"),
+ spRegistry: spReg,
+ spURLCache: map[string]spInfo{},
+ }, nil
+}
+
+func (o *OnChainPDP) Close() error {
+ if o.ethClient != nil {
+ o.ethClient.Close()
+ }
+ return nil
+}
+
+// PullPiecesToFWSS implements PDPProofSetManager.
+func (o *OnChainPDP) PullPiecesToFWSS(
+ ctx context.Context,
+ evmSigner signer.EVMSigner,
+ provider string,
+ pieces []PDPPieceInput,
+ cfg PDPSchedulingConfig,
+) (PDPPullResult, error) {
+ if evmSigner == nil {
+ return PDPPullResult{}, errors.New("evm signer is required")
+ }
+ if len(pieces) == 0 {
+ return PDPPullResult{}, errors.New("no pieces provided")
+ }
+ if cfg.BatchSize > 0 && len(pieces) > cfg.BatchSize {
+ return PDPPullResult{}, fmt.Errorf("piece count %d exceeds configured batch size %d", len(pieces), cfg.BatchSize)
+ }
+
+ payerAddr := evmSigner.EVMAddress()
+ clientDelegated, err := commonToDelegatedAddress(payerAddr)
+ if err != nil {
+ return PDPPullResult{}, errors.Wrap(err, "derive delegated payer address")
+ }
+ clientAddrStr := clientDelegated.String()
+
+ info, err := o.lookupSPInfo(ctx, provider)
+ if err != nil {
+ return PDPPullResult{}, errors.Wrap(err, "SP service-URL lookup")
+ }
+
+ // Convert v1 piece CIDs to CommPv2 (what FWSS / Curio expect).
+ pieceCIDsV2 := make([]cid.Cid, len(pieces))
+ pullInputs := make([]pdp.PullPieceInput, len(pieces))
+ for i, p := range pieces {
+ // FR32 padding: padded = raw * 128/127, raw = padded * 127/128.
+ payloadSize := uint64(p.PieceSize) * 127 / 128
+ v2, err := commcid.PieceCidV2FromV1(p.PieceCID, payloadSize)
+ if err != nil {
+ return PDPPullResult{}, errors.Wrapf(err, "convert piece %s to CommPv2", p.PieceCID)
+ }
+ pieceCIDsV2[i] = v2
+ pullInputs[i] = pdp.PullPieceInput{
+ PieceCID: v2.String(),
+ SourceURL: o.sourceURLBase + "/piece/" + v2.String(),
+ }
+ }
+
+ existing, err := o.findProofSetWithRoom(ctx, clientAddrStr, provider, cfg.MaxPiecesPerProofSet)
+ if err != nil {
+ return PDPPullResult{}, err
+ }
+
+ authHelper := pdp.NewAuthHelper(evmSigner.SignDigest, payerAddr, o.recordKeeper, o.chainID)
+ pdpServer := pdp.NewServer(info.serviceURL)
+
+ if existing != nil {
+ return o.addToExisting(ctx, pdpServer, authHelper, existing, pullInputs, pieceCIDsV2, cfg)
+ }
+
+ return o.createNewSet(ctx, pdpServer, authHelper, payerAddr, info.payee, provider, clientAddrStr, info.serviceURL, pullInputs, pieceCIDsV2, cfg)
+}
+
+// addToExisting adds pieces to an assembling proof set we've already
+// created. extraData is the AddPieces blob alone.
+func (o *OnChainPDP) addToExisting(
+ ctx context.Context,
+ pdpServer *pdp.Server,
+ authHelper *pdp.AuthHelper,
+ existing *model.PDPProofSet,
+ pullInputs []pdp.PullPieceInput,
+ pieceCIDsV2 []cid.Cid,
+ cfg PDPSchedulingConfig,
+) (PDPPullResult, error) {
+ clientDataSetID, ok := new(big.Int).SetString(existing.ClientDataSetID, 10)
+ if !ok || clientDataSetID == nil {
+ return PDPPullResult{}, fmt.Errorf("proof set %d has invalid clientDataSetID %q", existing.SetID, existing.ClientDataSetID)
+ }
+
+ addExtra, err := signAddPiecesExtra(authHelper, clientDataSetID, pieceCIDsV2)
+ if err != nil {
+ return PDPPullResult{}, err
+ }
+
+ if err := waitForPullComplete(ctx, pdpServer, pdp.PullPiecesOptions{
+ RecordKeeper: o.recordKeeper.Hex(),
+ Pieces: pullInputs,
+ ExtraData: addExtra,
+ DataSetID: existing.SetID,
+ }, cfg.PullTimeout); err != nil {
+ return PDPPullResult{}, err
+ }
+
+ addResp, err := pdpServer.AddPieces(ctx, int(existing.SetID), pieceCIDsV2, addExtra)
+ if err != nil {
+ return PDPPullResult{}, errors.Wrap(err, "POST /pdp/data-sets/{id}/pieces")
+ }
+ status, err := pdpServer.WaitForPieceAddition(ctx, int(existing.SetID), addResp.TxHash, cfg.PullTimeout)
+ if err != nil {
+ return PDPPullResult{}, errors.Wrap(err, "wait for add-pieces tx")
+ }
+ if status.AddMessageOK == nil || !*status.AddMessageOK {
+ return PDPPullResult{}, fmt.Errorf("add-pieces tx %s did not confirm successfully", addResp.TxHash)
+ }
+
+ if err := o.db.WithContext(ctx).
+ Model(&model.PDPProofSet{}).
+ Where("set_id = ?", existing.SetID).
+ UpdateColumn("piece_count", gorm.Expr("piece_count + ?", len(pieceCIDsV2))).Error; err != nil {
+ return PDPPullResult{}, errors.Wrap(err, "increment piece_count")
+ }
+
+ return PDPPullResult{DataSetID: existing.SetID}, nil
+}
+
+// createNewSet atomically creates a FWSS-listened proof set with the first
+// batch of pieces. extraData is the abi.encode(bytes,bytes) of the
+// CreateDataSet and AddPieces extras.
+func (o *OnChainPDP) createNewSet(
+ ctx context.Context,
+ pdpServer *pdp.Server,
+ authHelper *pdp.AuthHelper,
+ payerAddr common.Address,
+ payee common.Address,
+ provider string,
+ clientAddrStr string,
+ serviceURL string,
+ pullInputs []pdp.PullPieceInput,
+ pieceCIDsV2 []cid.Cid,
+ cfg PDPSchedulingConfig,
+) (PDPPullResult, error) {
+ clientDataSetID := randomClientDataSetID()
+
+ createExtra, err := signCreateDataSetExtra(authHelper, payerAddr, payee, clientDataSetID)
+ if err != nil {
+ return PDPPullResult{}, err
+ }
+ addExtra, err := signAddPiecesExtra(authHelper, clientDataSetID, pieceCIDsV2)
+ if err != nil {
+ return PDPPullResult{}, err
+ }
+ combined, err := pdp.EncodeCreateDataSetAndAddPiecesExtraData(createExtra, addExtra)
+ if err != nil {
+ return PDPPullResult{}, errors.Wrap(err, "wrap create+add extra data")
+ }
+
+ if err := waitForPullComplete(ctx, pdpServer, pdp.PullPiecesOptions{
+ RecordKeeper: o.recordKeeper.Hex(),
+ Pieces: pullInputs,
+ ExtraData: combined,
+ DataSetID: 0,
+ }, cfg.PullTimeout); err != nil {
+ return PDPPullResult{}, err
+ }
+
+ createResp, err := pdpServer.CreateDataSetAndAddPieces(ctx, o.recordKeeper.Hex(), pieceCIDsV2, combined)
+ if err != nil {
+ return PDPPullResult{}, errors.Wrap(err, "POST /pdp/data-sets/create-and-add")
+ }
+
+ status, err := pdpServer.WaitForDataSetCreation(ctx, createResp.TxHash, cfg.PullTimeout)
+ if err != nil {
+ return PDPPullResult{}, errors.Wrap(err, "wait for create-and-add tx")
+ }
+ if status.DataSetID == nil {
+ return PDPPullResult{}, fmt.Errorf("create-and-add tx %s confirmed but no dataSetId returned", createResp.TxHash)
+ }
+ dataSetID := uint64(*status.DataSetID)
+
+ row := model.PDPProofSet{
+ SetID: dataSetID,
+ ClientAddress: clientAddrStr,
+ Provider: provider,
+ HandoffState: model.ProofSetAssembling,
+ ClientDataSetID: clientDataSetID.String(),
+ ServiceURL: serviceURL,
+ PieceCount: len(pieceCIDsV2),
+ }
+ // Upsert so we win the race against pdptracker materializing the same
+ // SetID from the DataSetCreated event with listener-as-client_address.
+ if err := o.db.WithContext(ctx).
+ Where("set_id = ?", dataSetID).
+ Assign(row).
+ FirstOrCreate(&model.PDPProofSet{}).Error; err != nil {
+ return PDPPullResult{}, errors.Wrap(err, "persist new proof set")
+ }
+
+ return PDPPullResult{DataSetID: dataSetID}, nil
+}
+
+// waitForPullComplete blocks until the SP-side piece transfer reports
+// complete, fails permanently, or times out.
+func waitForPullComplete(ctx context.Context, pdpServer *pdp.Server, opts pdp.PullPiecesOptions, timeout time.Duration) error {
+ resp, err := pdpServer.WaitForPullPieces(ctx, opts, timeout)
+ if err != nil {
+ return errors.Wrap(err, "wait for /pdp/piece/pull")
+ }
+ switch resp.Status {
+ case pdp.PullStatusComplete:
+ return nil
+ case pdp.PullStatusFailed:
+ return fmt.Errorf("/pdp/piece/pull reported failed for %d pieces", len(opts.Pieces))
+ default:
+ return fmt.Errorf("/pdp/piece/pull timed out at status %q", resp.Status)
+ }
+}
+
+func signCreateDataSetExtra(authHelper *pdp.AuthHelper, payer, payee common.Address, clientDataSetID *big.Int) (string, error) {
+ sig, err := authHelper.SignCreateDataSet(clientDataSetID, payee, nil)
+ if err != nil {
+ return "", errors.Wrap(err, "sign CreateDataSet")
+ }
+ return pdp.EncodeDataSetCreateData(payer, clientDataSetID, nil, sig.Signature)
+}
+
+func signAddPiecesExtra(authHelper *pdp.AuthHelper, clientDataSetID *big.Int, pieceCIDsV2 []cid.Cid) (string, error) {
+ // FWSS uses (payer, clientDataSetId) as the cross-tx replay key, not
+ // the addPieces in-extraData nonce; zero is fine here.
+ nonce := big.NewInt(0)
+ sig, err := authHelper.SignAddPieces(clientDataSetID, nonce, pieceCIDsV2, nil)
+ if err != nil {
+ return "", errors.Wrap(err, "sign AddPieces")
+ }
+ return pdp.EncodeAddPiecesExtraData(nonce, nil, sig.Signature)
+}
+
+func (o *OnChainPDP) findProofSetWithRoom(ctx context.Context, clientAddress, provider string, maxPieces int) (*model.PDPProofSet, error) {
+ if maxPieces <= 0 {
+ return nil, errors.New("max pieces per proof set must be > 0")
+ }
+ var ps model.PDPProofSet
+ err := o.db.WithContext(ctx).
+ Where("client_address = ? AND provider = ? AND deleted = FALSE AND handoff_state = ?",
+ clientAddress, provider, model.ProofSetAssembling).
+ Where("piece_count < ?", maxPieces).
+ Order("set_id").
+ First(&ps).Error
+ if err == nil {
+ return &ps, nil
+ }
+ if errors.Is(err, gorm.ErrRecordNotFound) {
+ return nil, nil
+ }
+ return nil, errors.Wrap(err, "query assembling proof sets")
+}
+
+func (o *OnChainPDP) lookupSPInfo(ctx context.Context, provider string) (spInfo, error) {
+ o.spURLCacheMu.Lock()
+ if cached, ok := o.spURLCache[provider]; ok {
+ o.spURLCacheMu.Unlock()
+ return cached, nil
+ }
+ o.spURLCacheMu.Unlock()
+
+ // schedule.Provider is the t0 form after StateLookupID; resolve to
+ // the delegated (f410) form via lotus if it's not already.
+ delegated := provider
+ if addr, perr := address.NewFromString(provider); perr != nil || addr.Protocol() != address.Delegated {
+ var robust string
+ if rerr := o.ethClient.Client().CallContext(ctx, &robust, "Filecoin.StateLookupRobustAddress", provider, nil); rerr != nil {
+ return spInfo{}, errors.Wrapf(rerr, "resolve robust address for provider %s", provider)
+ }
+ delegated = robust
+ }
+
+ evmAddr, err := delegatedToCommonAddress(delegated)
+ if err != nil {
+ return spInfo{}, errors.Wrapf(err, "decode provider address %s", delegated)
+ }
+
+ pi, err := o.spRegistry.GetProviderByAddress(ctx, evmAddr)
+ if err != nil {
+ return spInfo{}, errors.Wrapf(err, "ServiceProviderRegistry.getProviderByAddress(%s)", evmAddr.Hex())
+ }
+ if pi == nil {
+ return spInfo{}, fmt.Errorf("provider %s not registered in ServiceProviderRegistry", provider)
+ }
+ pdpProduct, ok := pi.Products["PDP"]
+ if !ok || pdpProduct == nil || pdpProduct.Data == nil {
+ return spInfo{}, fmt.Errorf("provider %s has no PDP product registered", provider)
+ }
+ if !pdpProduct.IsActive {
+ return spInfo{}, fmt.Errorf("provider %s PDP product is inactive", provider)
+ }
+ if pdpProduct.Data.ServiceURL == "" {
+ return spInfo{}, fmt.Errorf("provider %s has no PDP serviceURL registered", provider)
+ }
+
+ info := spInfo{
+ serviceURL: pdpProduct.Data.ServiceURL,
+ payee: pi.Payee,
+ }
+ o.spURLCacheMu.Lock()
+ o.spURLCache[provider] = info
+ o.spURLCacheMu.Unlock()
+ return info, nil
+}
+
+func delegatedToCommonAddress(s string) (common.Address, error) {
+ addr, err := address.NewFromString(s)
+ if err != nil {
+ return common.Address{}, err
+ }
+ if addr.Protocol() != address.Delegated {
+ return common.Address{}, fmt.Errorf("address %s is not delegated (f410)", s)
+ }
+ payload := addr.Payload()
+ if len(payload) < 21 {
+ return common.Address{}, fmt.Errorf("delegated payload too short: %d", len(payload))
+ }
+ return common.BytesToAddress(payload[1:21]), nil
+}
+
+func commonToDelegatedAddress(subaddr common.Address) (address.Address, error) {
+ addr, err := address.NewDelegatedAddress(10, subaddr.Bytes())
+ if err != nil {
+ return address.Undef, errors.Wrap(err, "encode delegated address")
+ }
+ return addr, nil
+}
+
+func randomClientDataSetID() *big.Int {
+ b := make([]byte, 32)
+ if _, err := rand.Read(b); err != nil {
+ panic(fmt.Sprintf("crypto/rand failure: %v", err))
+ }
+ return new(big.Int).SetBytes(b)
+}
diff --git a/service/dealpusher/pdp_schedule.go b/service/dealpusher/pdp_schedule.go
index d58af3a6e..62c92443d 100644
--- a/service/dealpusher/pdp_schedule.go
+++ b/service/dealpusher/pdp_schedule.go
@@ -11,8 +11,6 @@ import (
"github.com/data-preservation-programs/singularity/model"
"github.com/data-preservation-programs/singularity/util"
"github.com/data-preservation-programs/singularity/util/keystore"
- "github.com/ethereum/go-ethereum/common"
- "github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
"github.com/rjNemo/underscore"
"gorm.io/gorm"
@@ -24,8 +22,7 @@ func defaultPDPSchedulingConfig() PDPSchedulingConfig {
return PDPSchedulingConfig{
BatchSize: 128,
MaxPiecesPerProofSet: 1024,
- ConfirmationDepth: 5,
- PollingInterval: 30 * time.Second,
+ PullTimeout: 5 * time.Minute,
}
}
@@ -65,37 +62,8 @@ func (d *DealPusher) validatePDPPreparationPieceSizes(ctx context.Context, sched
return errors.Wrap(err, "failed to validate preparation piece sizes for PDP")
}
-// resolveProviderEVMAddress looks up the provider's Actor record and derives
-// the EVM address from its delegated (f410) filecoin address.
-func (d *DealPusher) resolveProviderEVMAddress(ctx context.Context, provider string) (common.Address, error) {
- db := d.dbNoContext.WithContext(ctx)
-
- var actor model.Actor
- err := db.Where("address = ? OR id = ?", provider, provider).First(&actor).Error
- if err != nil {
- return common.Address{}, errors.Wrapf(err, "failed to resolve actor for provider %s", provider)
- }
-
- addr, err := address.NewFromString(actor.Address)
- if err != nil {
- return common.Address{}, errors.Wrapf(err, "failed to parse actor address %s", actor.Address)
- }
- if addr.Protocol() != address.Delegated {
- return common.Address{}, fmt.Errorf("provider actor address %s is not a delegated (f410) address", actor.Address)
- }
-
- payload := addr.Payload()
- // delegated address payload: first varint byte(s) for namespace, then 20 bytes for EVM address
- // for f410 (namespace 10), payload[0] is the namespace varint, rest is the subaddress
- if len(payload) < 21 {
- return common.Address{}, fmt.Errorf("provider delegated address payload too short: %d bytes", len(payload))
- }
- // skip namespace varint (1 byte for namespace 10)
- return common.BytesToAddress(payload[1:21]), nil
-}
-
func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedule) (model.ScheduleState, error) {
- if d.pdpProofSetManager == nil || d.pdpTxConfirmer == nil {
+ if d.pdpProofSetManager == nil {
return model.ScheduleError, errors.New("pdp scheduling dependencies are not configured")
}
cfg := d.pdpSchedulingConfig
@@ -106,12 +74,6 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul
return model.ScheduleError, err
}
- // resolve SP EVM address upfront -- needed for handoff after filling
- spEVMAddr, err := d.resolveProviderEVMAddress(ctx, schedule.Provider)
- if err != nil {
- return model.ScheduleError, errors.Wrap(err, "failed to resolve provider EVM address for PDP handoff")
- }
-
db := d.dbNoContext.WithContext(ctx)
var attachments []model.SourceAttachment
if err := db.Model(&model.SourceAttachment{}).
@@ -151,10 +113,6 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul
clientID = *walletObj.ActorID
}
- // track the current proof set so we can propose transfer when it fills
- var currentProofSetID uint64
- var currentProofSetPieceCount int
-
var timer *time.Timer
current := sumResult{}
for {
@@ -204,45 +162,15 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul
}
if scheduleComplete {
- // propose transfer for any partially-filled proof set before exiting
- if currentProofSetID != 0 && currentProofSetPieceCount > 0 {
- if err := d.pdpProofSetManager.ProposeTransfer(ctx, evmSigner, currentProofSetID, spEVMAddr); err != nil {
- return model.ScheduleError, errors.Wrap(err, "failed to propose transfer for final proof set")
- }
- }
if schedule.ScheduleCron != "" {
return "", nil
}
return model.ScheduleCompleted, nil
}
- // cap batch to remaining room in current proof set
- batchLimit := cfg.BatchSize
- if currentProofSetID != 0 {
- remaining := cfg.MaxPiecesPerProofSet - currentProofSetPieceCount
- if remaining <= 0 {
- // current proof set is full -- propose transfer and reset
- if err := d.pdpProofSetManager.ProposeTransfer(ctx, evmSigner, currentProofSetID, spEVMAddr); err != nil {
- return model.ScheduleError, errors.Wrap(err, "failed to propose transfer for full proof set")
- }
- currentProofSetID = 0
- currentProofSetPieceCount = 0
- continue
- }
- if remaining < batchLimit {
- batchLimit = remaining
- }
- }
-
- cars, err := d.findPDPCars(ctx, schedule, attachments, allowedPieceCIDs, overReplicatedCIDs, batchLimit)
+ cars, err := d.findPDPCars(ctx, schedule, attachments, allowedPieceCIDs, overReplicatedCIDs, cfg.BatchSize)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
- // no more cars -- propose transfer for any partial proof set
- if currentProofSetID != 0 && currentProofSetPieceCount > 0 {
- if err := d.pdpProofSetManager.ProposeTransfer(ctx, evmSigner, currentProofSetID, spEVMAddr); err != nil {
- return model.ScheduleError, errors.Wrap(err, "failed to propose transfer for final proof set")
- }
- }
if schedule.ScheduleCron != "" && schedule.ScheduleCronPerpetual {
return "", nil
}
@@ -251,11 +179,6 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul
return model.ScheduleError, err
}
if len(cars) == 0 {
- if currentProofSetID != 0 && currentProofSetPieceCount > 0 {
- if err := d.pdpProofSetManager.ProposeTransfer(ctx, evmSigner, currentProofSetID, spEVMAddr); err != nil {
- return model.ScheduleError, errors.Wrap(err, "failed to propose transfer for final proof set")
- }
- }
if schedule.ScheduleCron != "" && schedule.ScheduleCronPerpetual {
return "", nil
}
@@ -267,44 +190,22 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul
}
}
- proofSetID, err := d.pdpProofSetManager.EnsureProofSet(ctx, evmSigner, schedule.Provider, cfg)
- if err != nil {
- return model.ScheduleError, errors.Wrap(err, "failed to ensure PDP proof set")
- }
- if currentProofSetID == 0 {
- currentProofSetID = proofSetID
- // load existing piece count from DB in case we're resuming
- var ps model.PDPProofSet
- if err := db.Where("set_id = ?", proofSetID).First(&ps).Error; err == nil {
- currentProofSetPieceCount = ps.PieceCount
+ pieceInputs := make([]PDPPieceInput, len(cars))
+ for i, car := range cars {
+ pieceInputs[i] = PDPPieceInput{
+ PieceCID: cid.Cid(car.PieceCID),
+ PieceSize: car.PieceSize,
}
}
- pieceCIDs := make([]cid.Cid, 0, len(cars))
- pieceSizes := make([]int64, 0, len(cars))
- for _, car := range cars {
- pieceCIDs = append(pieceCIDs, cid.Cid(car.PieceCID))
- pieceSizes = append(pieceSizes, car.PieceSize)
- }
- queuedTx, err := d.pdpProofSetManager.QueueAddRoots(ctx, evmSigner, proofSetID, pieceCIDs, pieceSizes, cfg)
- if err != nil {
- return model.ScheduleError, errors.Wrap(err, "failed to queue PDP root addition transaction")
- }
-
- _, err = d.pdpTxConfirmer.WaitForConfirmations(ctx, queuedTx.Hash, cfg.ConfirmationDepth, cfg.PollingInterval)
+ result, err := d.pdpProofSetManager.PullPiecesToFWSS(ctx, evmSigner, schedule.Provider, pieceInputs, cfg)
if err != nil {
- return model.ScheduleError, errors.Wrap(err, "failed waiting for PDP transaction confirmation")
+ return model.ScheduleError, errors.Wrap(err, "failed to push pieces via FWSS pull")
}
- // update durable piece count after confirmed on-chain add
- if err := d.pdpProofSetManager.IncrementPieceCount(ctx, proofSetID, len(cars)); err != nil {
- return model.ScheduleError, errors.Wrap(err, "failed to update proof set piece count")
- }
- currentProofSetPieceCount += len(cars)
-
deals := make([]model.Deal, len(cars))
for i, car := range cars {
- proofSetIDCopy := proofSetID
+ proofSetIDCopy := result.DataSetID
deals[i] = model.Deal{
State: model.DealProposed,
DealType: model.DealTypePDP,
@@ -327,15 +228,6 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul
current.DealNumber++
current.DealSize += car.PieceSize
}
-
- // check if proof set is now full
- if currentProofSetPieceCount >= cfg.MaxPiecesPerProofSet {
- if err := d.pdpProofSetManager.ProposeTransfer(ctx, evmSigner, currentProofSetID, spEVMAddr); err != nil {
- return model.ScheduleError, errors.Wrap(err, "failed to propose transfer for full proof set")
- }
- currentProofSetID = 0
- currentProofSetPieceCount = 0
- }
}
}
diff --git a/service/dealpusher/pdp_wiring_test.go b/service/dealpusher/pdp_wiring_test.go
index 8a20423df..e67249471 100644
--- a/service/dealpusher/pdp_wiring_test.go
+++ b/service/dealpusher/pdp_wiring_test.go
@@ -3,55 +3,40 @@ package dealpusher
import (
"context"
"testing"
- "time"
"github.com/data-preservation-programs/go-synapse/signer"
"github.com/data-preservation-programs/singularity/model"
"github.com/data-preservation-programs/singularity/util/keystore"
"github.com/data-preservation-programs/singularity/util/testutil"
- "github.com/ethereum/go-ethereum/common"
"github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
"gorm.io/gorm"
)
+// proofSetManagerMock satisfies PDPProofSetManager. It records every batch
+// it receives so tests can assert on the pieces and provider that were
+// pushed.
type proofSetManagerMock struct {
- proofSetID uint64
- pieceCIDs []cid.Cid
- proposedTransfers []proposedTransfer
+ dataSetID uint64
+ calls []proofSetManagerCall
+ err error
}
-type proposedTransfer struct {
- proofSetID uint64
- spEVMAddress common.Address
+type proofSetManagerCall struct {
+ provider string
+ pieces []PDPPieceInput
}
-func (m *proofSetManagerMock) EnsureProofSet(_ context.Context, _ signer.EVMSigner, _ string, _ PDPSchedulingConfig) (uint64, error) {
- return m.proofSetID, nil
-}
-
-func (m *proofSetManagerMock) QueueAddRoots(_ context.Context, _ signer.EVMSigner, _ uint64, pieceCIDs []cid.Cid, _ []int64, _ PDPSchedulingConfig) (*PDPQueuedTx, error) {
- m.pieceCIDs = append([]cid.Cid(nil), pieceCIDs...)
- return &PDPQueuedTx{Hash: "0xabc"}, nil
-}
-
-func (m *proofSetManagerMock) ProposeTransfer(_ context.Context, _ signer.EVMSigner, proofSetID uint64, spEVMAddress common.Address) error {
- m.proposedTransfers = append(m.proposedTransfers, proposedTransfer{proofSetID, spEVMAddress})
- return nil
-}
-
-func (m *proofSetManagerMock) IncrementPieceCount(_ context.Context, _ uint64, _ int) error {
- return nil
-}
-
-type txConfirmerMock struct {
- txHash string
-}
-
-func (m *txConfirmerMock) WaitForConfirmations(_ context.Context, txHash string, _ uint64, _ time.Duration) (*PDPTransactionReceipt, error) {
- m.txHash = txHash
- return &PDPTransactionReceipt{Hash: txHash}, nil
+func (m *proofSetManagerMock) PullPiecesToFWSS(_ context.Context, _ signer.EVMSigner, provider string, pieces []PDPPieceInput, _ PDPSchedulingConfig) (PDPPullResult, error) {
+ if m.err != nil {
+ return PDPPullResult{}, m.err
+ }
+ m.calls = append(m.calls, proofSetManagerCall{
+ provider: provider,
+ pieces: append([]PDPPieceInput(nil), pieces...),
+ })
+ return PDPPullResult{DataSetID: m.dataSetID}, nil
}
func TestDealPusher_ResolveScheduleDealType_EmptyReturnsEmpty(t *testing.T) {
@@ -85,7 +70,7 @@ func TestDealPusher_RunSchedule_PDPWithoutDependenciesReturnsConfiguredError(t *
require.Contains(t, err.Error(), "pdp scheduling dependencies are not configured")
}
-func TestDealPusher_RunSchedule_PDPWithDependenciesCreatesDealsAfterConfirmation(t *testing.T) {
+func TestDealPusher_RunSchedule_PDPPushesBatchAndCreatesDeals(t *testing.T) {
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
clientSubaddr := make([]byte, 20)
clientSubaddr[19] = 10
@@ -107,22 +92,20 @@ func TestDealPusher_RunSchedule_PDPWithDependenciesCreatesDealsAfterConfirmation
actorID := "f01001"
require.NoError(t, db.Create(&model.Actor{ID: actorID, Address: clientAddr.String()}).Error)
- // provider needs an actor record for EVM address resolution
- providerActorID := "f01002"
- require.NoError(t, db.Create(&model.Actor{ID: providerActorID, Address: providerAddr.String()}).Error)
+ require.NoError(t, db.Create(&model.Actor{ID: "f01002", Address: providerAddr.String()}).Error)
- wallet := model.Wallet{
+ walletObj := model.Wallet{
Address: clientAddr.String(),
KeyPath: keyPath,
KeyStore: "local",
ActorID: &actorID,
}
- require.NoError(t, db.Create(&wallet).Error)
- require.NoError(t, db.Model(&prep).Update("wallet_id", wallet.ID).Error)
- storage := model.Storage{Name: "src-storage"}
- require.NoError(t, db.Create(&storage).Error)
- require.NotZero(t, storage.ID)
- attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storage.ID}
+ require.NoError(t, db.Create(&walletObj).Error)
+ require.NoError(t, db.Model(&prep).Update("wallet_id", walletObj.ID).Error)
+
+ storageRow := model.Storage{Name: "src-storage"}
+ require.NoError(t, db.Create(&storageRow).Error)
+ attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storageRow.ID}
require.NoError(t, db.Create(&attachment).Error)
require.NotZero(t, attachment.ID)
@@ -143,42 +126,26 @@ func TestDealPusher_RunSchedule_PDPWithDependenciesCreatesDealsAfterConfirmation
TotalDealNumber: 1,
}
require.NoError(t, db.Create(&schedule).Error)
- schedule.Preparation = &model.Preparation{Wallet: &wallet}
+ schedule.Preparation = &model.Preparation{Wallet: &walletObj}
- psm := &proofSetManagerMock{proofSetID: 42}
- conf := &txConfirmerMock{}
+ psm := &proofSetManagerMock{dataSetID: 42}
d := &DealPusher{
dbNoContext: db,
keyStore: ks,
pdpProofSetManager: psm,
- pdpTxConfirmer: conf,
pdpSchedulingConfig: defaultPDPSchedulingConfig(),
scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType { return model.DealTypePDP },
}
- var attachments []model.SourceAttachment
- require.NoError(t, db.Where("preparation_id = ?", schedule.PreparationID).Find(&attachments).Error)
- require.Len(t, attachments, 1)
- overReplicatedCIDs := db.
- Table("deals").
- Select("piece_cid").
- Where("state in ?", []model.DealState{model.DealProposed, model.DealPublished, model.DealActive}).
- Group("piece_cid").
- Having("count(*) >= ?", d.maxReplicas)
- cars, err := d.findPDPCars(ctx, &schedule, attachments, nil, overReplicatedCIDs, d.pdpSchedulingConfig.BatchSize)
- require.NoError(t, err)
- require.Len(t, cars, 1)
state, err := d.runSchedule(ctx, &schedule)
require.NoError(t, err)
require.Equal(t, model.ScheduleCompleted, state)
- require.Equal(t, "0xabc", conf.txHash)
- require.Len(t, psm.pieceCIDs, 1)
- require.Equal(t, cid.Cid(pieceCID), psm.pieceCIDs[0])
- // schedule completion should propose transfer for the partial proof set
- require.Len(t, psm.proposedTransfers, 1)
- require.Equal(t, uint64(42), psm.proposedTransfers[0].proofSetID)
- require.Equal(t, common.BytesToAddress(providerSubaddr), psm.proposedTransfers[0].spEVMAddress)
+ require.Len(t, psm.calls, 1, "expected one batch push")
+ require.Equal(t, providerAddr.String(), psm.calls[0].provider)
+ require.Len(t, psm.calls[0].pieces, 1)
+ require.Equal(t, cid.Cid(pieceCID), psm.calls[0].pieces[0].PieceCID)
+ require.Equal(t, int64(1024), psm.calls[0].pieces[0].PieceSize)
var deals []model.Deal
require.NoError(t, db.Where("schedule_id = ?", schedule.ID).Find(&deals).Error)
@@ -190,7 +157,7 @@ func TestDealPusher_RunSchedule_PDPWithDependenciesCreatesDealsAfterConfirmation
require.NotNil(t, deals[0].ProofSetID)
require.Equal(t, uint64(42), *deals[0].ProofSetID)
require.NotNil(t, deals[0].WalletID)
- require.Equal(t, wallet.ID, *deals[0].WalletID)
+ require.Equal(t, walletObj.ID, *deals[0].WalletID)
})
}
@@ -216,18 +183,18 @@ func TestDealPusher_RunSchedule_PDPRejectsInvalidPieceSize(t *testing.T) {
actorID := "f01001"
require.NoError(t, db.Create(&model.Actor{ID: actorID, Address: clientAddr.String()}).Error)
require.NoError(t, db.Create(&model.Actor{ID: "f01002", Address: providerAddr.String()}).Error)
- wallet := model.Wallet{
+ walletObj := model.Wallet{
Address: clientAddr.String(),
KeyPath: keyPath,
KeyStore: "local",
ActorID: &actorID,
}
- require.NoError(t, db.Create(&wallet).Error)
- require.NoError(t, db.Model(&prep).Update("wallet_id", wallet.ID).Error)
+ require.NoError(t, db.Create(&walletObj).Error)
+ require.NoError(t, db.Model(&prep).Update("wallet_id", walletObj.ID).Error)
- storage := model.Storage{Name: "src-storage"}
- require.NoError(t, db.Create(&storage).Error)
- attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storage.ID}
+ storageRow := model.Storage{Name: "src-storage"}
+ require.NoError(t, db.Create(&storageRow).Error)
+ attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storageRow.ID}
require.NoError(t, db.Create(&attachment).Error)
pieceCID := model.CID(calculateCommp(t, generateRandomBytes(1000), 1024))
@@ -235,7 +202,7 @@ func TestDealPusher_RunSchedule_PDPRejectsInvalidPieceSize(t *testing.T) {
AttachmentID: &attachment.ID,
PreparationID: &prep.ID,
PieceCID: pieceCID,
- PieceSize: 1536, // Not a power of two.
+ PieceSize: 1536, // not a power of two
StoragePath: "car-1",
}
require.NoError(t, db.Create(&car).Error)
@@ -247,15 +214,13 @@ func TestDealPusher_RunSchedule_PDPRejectsInvalidPieceSize(t *testing.T) {
TotalDealNumber: 1,
}
require.NoError(t, db.Create(&schedule).Error)
- schedule.Preparation = &model.Preparation{Wallet: &wallet}
+ schedule.Preparation = &model.Preparation{Wallet: &walletObj}
- psm := &proofSetManagerMock{proofSetID: 42}
- conf := &txConfirmerMock{}
+ psm := &proofSetManagerMock{dataSetID: 42}
d := &DealPusher{
dbNoContext: db,
keyStore: ks,
pdpProofSetManager: psm,
- pdpTxConfirmer: conf,
pdpSchedulingConfig: defaultPDPSchedulingConfig(),
scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType { return model.DealTypePDP },
}
@@ -265,8 +230,7 @@ func TestDealPusher_RunSchedule_PDPRejectsInvalidPieceSize(t *testing.T) {
require.Equal(t, model.ScheduleError, state)
require.ErrorContains(t, err, "invalid piece size for piece")
require.ErrorContains(t, err, "must be a power of two")
- require.Empty(t, conf.txHash)
- require.Empty(t, psm.pieceCIDs)
+ require.Empty(t, psm.calls)
var deals []model.Deal
require.NoError(t, db.Where("schedule_id = ?", schedule.ID).Find(&deals).Error)
@@ -296,18 +260,18 @@ func TestDealPusher_RunSchedule_PDPRejectsPreparationWithOversizedPiece(t *testi
actorID := "f01001"
require.NoError(t, db.Create(&model.Actor{ID: actorID, Address: clientAddr.String()}).Error)
require.NoError(t, db.Create(&model.Actor{ID: "f01002", Address: providerAddr.String()}).Error)
- wallet := model.Wallet{
+ walletObj := model.Wallet{
Address: clientAddr.String(),
KeyPath: keyPath,
KeyStore: "local",
ActorID: &actorID,
}
- require.NoError(t, db.Create(&wallet).Error)
- require.NoError(t, db.Model(&prep).Update("wallet_id", wallet.ID).Error)
+ require.NoError(t, db.Create(&walletObj).Error)
+ require.NoError(t, db.Model(&prep).Update("wallet_id", walletObj.ID).Error)
- storage := model.Storage{Name: "src-storage"}
- require.NoError(t, db.Create(&storage).Error)
- attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storage.ID}
+ storageRow := model.Storage{Name: "src-storage"}
+ require.NoError(t, db.Create(&storageRow).Error)
+ attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storageRow.ID}
require.NoError(t, db.Create(&attachment).Error)
pieceCID := model.CID(calculateCommp(t, generateRandomBytes(1000), 1024))
@@ -315,7 +279,7 @@ func TestDealPusher_RunSchedule_PDPRejectsPreparationWithOversizedPiece(t *testi
AttachmentID: &attachment.ID,
PreparationID: &prep.ID,
PieceCID: pieceCID,
- PieceSize: 1 << 31, // 2 GiB, above current 1 GiB minus FR32 overhead PDP limit.
+ PieceSize: 1 << 31, // 2 GiB, above current 1 GiB minus FR32 overhead PDP limit
StoragePath: "car-1",
}
require.NoError(t, db.Create(&car).Error)
@@ -327,15 +291,13 @@ func TestDealPusher_RunSchedule_PDPRejectsPreparationWithOversizedPiece(t *testi
TotalDealNumber: 1,
}
require.NoError(t, db.Create(&schedule).Error)
- schedule.Preparation = &model.Preparation{Wallet: &wallet}
+ schedule.Preparation = &model.Preparation{Wallet: &walletObj}
- psm := &proofSetManagerMock{proofSetID: 42}
- conf := &txConfirmerMock{}
+ psm := &proofSetManagerMock{dataSetID: 42}
d := &DealPusher{
dbNoContext: db,
keyStore: ks,
pdpProofSetManager: psm,
- pdpTxConfirmer: conf,
pdpSchedulingConfig: defaultPDPSchedulingConfig(),
scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType { return model.DealTypePDP },
}
@@ -344,8 +306,7 @@ func TestDealPusher_RunSchedule_PDPRejectsPreparationWithOversizedPiece(t *testi
require.Error(t, err)
require.Equal(t, model.ScheduleError, state)
require.ErrorContains(t, err, "piece limit is 1 GiB minus FR32 overhead")
- require.Empty(t, conf.txHash)
- require.Empty(t, psm.pieceCIDs)
+ require.Empty(t, psm.calls)
var deals []model.Deal
require.NoError(t, db.Where("schedule_id = ?", schedule.ID).Find(&deals).Error)