Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
revm-interpreter = { version = "29.0.1", default-features = false }

# rbuilder
rbuilder-primitives = { git = "https://github.com/flashbots/rbuilder", rev = "e3b49692d5b4353c62abe828245a44c390f7bec2" }
rbuilder-utils = { git = "https://github.com/flashbots/rbuilder", rev = "e3b49692d5b4353c62abe828245a44c390f7bec2", features = [
rbuilder-primitives = { git = "https://github.com/flashbots/rbuilder", rev = "4132e6d7077de51b5f314c687ebaa760f52471c0" }
rbuilder-utils = { git = "https://github.com/flashbots/rbuilder", rev = "4132e6d7077de51b5f314c687ebaa760f52471c0", features = [
"test-utils"
] }

Expand Down Expand Up @@ -190,4 +190,4 @@
rustdoc.all = "warn"

[package.metadata.cargo-shear]
ignored = ["strum"]

Check warning on line 193 in Cargo.toml

View workflow job for this annotation

GitHub Actions / cargo-shear

shear/redundant_ignore

redundant ignore `strum` (remove from ignored list)
1 change: 1 addition & 0 deletions fixtures/create_bundles_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ CREATE TABLE bundles (
`refund_percent` Nullable(UInt8),
`refund_recipient` Nullable(FixedString(20)),
`delayed_refund` Nullable(Bool),
`disable_cross_region_sharing` Bool,
`refund_identity` Nullable(FixedString(20)),

`signer_address` Nullable(FixedString(20)),
Expand Down
8 changes: 6 additions & 2 deletions src/builderhub/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ impl Client {
}

/// Register the given signer address with the BuilderHub peer store.
pub async fn register(&self, signer_address: Address) -> Result<(), ClientRegisterError> {
pub async fn register(
&self,
signer_address: Address,
region: String,
) -> Result<(), ClientRegisterError> {
let endpoint =
format!("{}/api/l1-builder/v1/register_credentials/orderflow_proxy", self.url);
let body = PeerCredentials { tls_cert: None, ecdsa_pubkey_address: signer_address };
let body = PeerCredentials { tls_cert: None, ecdsa_pubkey_address: signer_address, region };
let response = self.inner.post(endpoint).json(&body).send().await?;
let status = response.status();
if !status.is_success() {
Expand Down
11 changes: 10 additions & 1 deletion src/builderhub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ pub struct PeerCredentials {
pub tls_cert: Option<String>,
/// Orderflow signer public key.
pub ecdsa_pubkey_address: Address,
/// Region of the builder.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub region: String,
}

/// A [`Peer`] is a builder inside Builderhub. This holds informations about a builder peer, as
Expand Down Expand Up @@ -135,7 +138,12 @@ impl LocalPeerStore {
Self { builders: Arc::new(DashMap::new()) }
}

pub fn register(&self, signer_address: Address, port: Option<u16>) -> LocalPeerStore {
pub fn register(
&self,
signer_address: Address,
port: Option<u16>,
region: String,
) -> LocalPeerStore {
self.builders.insert(
signer_address.to_string(),
Peer {
Expand All @@ -147,6 +155,7 @@ impl LocalPeerStore {
orderflow_proxy: PeerCredentials {
tls_cert: None,
ecdsa_pubkey_address: signer_address,
region,
},
instance: InstanceData { tls_cert: "".to_string() },
},
Expand Down
37 changes: 36 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{convert::Infallible, net::SocketAddr, num::NonZero, path::PathBuf, str::FromStr};
use std::{
convert::Infallible, fmt::Display, net::SocketAddr, num::NonZero, path::PathBuf, str::FromStr,
};

use alloy_primitives::Address;
use alloy_signer_local::PrivateKeySigner;
Expand All @@ -16,6 +18,24 @@ use crate::{
/// The maximum request size in bytes (10 MiB).
const MAX_REQUEST_SIZE_BYTES: usize = 10 * 1024 * 1024;

/// Possible config regions
#[derive(Debug, Clone, clap::ValueEnum)]
pub enum Region {
US,
EU,
AP,
}

impl Display for Region {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::US => write!(f, "us"),
Self::EU => write!(f, "eu"),
Self::AP => write!(f, "ap"),
}
}
}

/// Arguments required to create a clickhouse client.
#[derive(PartialEq, Eq, Clone, Debug, Args)]
#[group(id = "clickhouse", requires_all = ["CLICKHOUSE_HOST", "CLICKHOUSE_USERNAME", "CLICKHOUSE_PASSWORD", "CLICKHOUSE_DATABASE"])]
Expand Down Expand Up @@ -210,6 +230,10 @@ pub struct OrderflowIngressArgs {
#[clap(long, env = "BUILDERNET_NODE_NAME", id = "BUILDERNET_NODE_NAME", value_parser = replace_dashes_with_underscores)]
pub builder_name: String,

/// Region of the builder.
#[clap(long, value_enum, env = "BUILDER_REGION", id = "BUILDER_REGION")]
pub builder_region: Region,

/// The URL of BuilderHub.
#[clap(long, value_hint = ValueHint::Url, env = "BUILDERHUB_ENDPOINT", id = "BUILDERHUB_ENDPOINT")]
pub builder_hub_url: Option<String>,
Expand Down Expand Up @@ -350,6 +374,7 @@ impl Default for OrderflowIngressArgs {
builder_url: None,
builder_ready_endpoint: None,
builder_name: String::from("buildernet"),
builder_region: Region::US,
builder_hub_url: None,
flashbots_signer: None,
max_txs_per_bundle: 100,
Expand Down Expand Up @@ -506,6 +531,8 @@ mod tests {
"http://localhost:3000",
"--builder-name",
"buildernet",
"--builder-region",
"us",
];

let args = OrderflowIngressArgs::try_parse_from(args)
Expand Down Expand Up @@ -535,6 +562,8 @@ mod tests {
"http://localhost:3000",
"--builder-name",
"buildernet",
"--builder-region",
"us",
"--indexer.clickhouse.host",
"http://127.0.0.1:12345",
];
Expand Down Expand Up @@ -567,6 +596,8 @@ mod tests {
"http://localhost:3000",
"--builder-name",
"buildernet",
"--builder-region",
"us",
"--indexer.clickhouse.host",
"http://127.0.0.1:12345",
"--indexer.clickhouse.database",
Expand Down Expand Up @@ -613,6 +644,8 @@ mod tests {
"http://localhost:3000",
"--builder-name",
"buildernet",
"--builder-region",
"us",
"--indexer.parquet.bundle-receipts-file-path",
"pronto.parquet",
];
Expand Down Expand Up @@ -647,6 +680,8 @@ mod tests {
"http://localhost:3000",
"--builder-name",
"buildernet",
"--builder-region",
"us",
"--indexer.parquet.bundle-receipts-file-path",
"pronto.parquet",
"--indexer.clickhouse.host",
Expand Down
118 changes: 114 additions & 4 deletions src/forwarder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub struct IngressForwarders {
peers: Arc<DashMap<String, PeerHandle>>,
/// The priority workers for signing requests.
workers: PriorityWorkers,
/// The region of the local builder. Used to filter peers when bundles disable
/// cross-region sharing.
region: String,
}

impl IngressForwarders {
Expand All @@ -58,8 +61,9 @@ impl IngressForwarders {
peers: Arc<DashMap<String, PeerHandle>>,
signer: PrivateKeySigner,
workers: PriorityWorkers,
region: String,
) -> Self {
Self { local, peers, signer, workers }
Self { local, peers, signer, workers, region }
}

/// Find peer name by address.
Expand All @@ -78,6 +82,11 @@ impl IngressForwarders {

let priority = order.priority();
let method_name = order.method_name().to_string();
let restrict_to_local_region = if let SystemOrder::Bundle(ref bundle) = order {
bundle.raw_bundle.metadata.disable_cross_region_sharing
} else {
false
};

// Start with JSON-RPC encoding, that's needed for the local builder anyway.
let mut encoded_order = order.clone().encode();
Expand Down Expand Up @@ -120,12 +129,16 @@ impl IngressForwarders {
let forward = Arc::new(ForwardingRequest::user_to_system(encoded_order.into(), headers));

debug!(peers = %self.peers.len(), "sending order to peers");
self.broadcast_inner(forward);
self.broadcast_inner(forward, restrict_to_local_region);
}

/// Broadcast request to all peers.
fn broadcast_inner(&self, forward: Arc<ForwardingRequest>) {
/// Broadcast request to all peers. When `restrict_to_local_region` is true, peers in a
/// different region than the local builder are skipped (but kept in the peer map).
fn broadcast_inner(&self, forward: Arc<ForwardingRequest>, restrict_to_local_region: bool) {
self.peers.retain(|peer, handle| {
if restrict_to_local_region && handle.info.orderflow_proxy.region != self.region {
return true;
}
if let Err(e) = handle.sender.send(forward.priority(), forward.clone()) {
error!(?e, %peer, "peer channel closed, removing peer");

Expand Down Expand Up @@ -364,3 +377,100 @@ impl Default for LogRateLimiter {
Self::new(Duration::from_millis(100))
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{
builderhub::{InstanceData, Peer, PeerCredentials},
primitives::{EncodedOrder, RawOrderMetadata, UtcInstant, WithEncoding},
priority::Priority,
};
use alloy_primitives::{Address, B256};

fn make_peer_with_region(
region: &str,
) -> (Peer, PeerHandle, priority::channel::UnboundedReceiver<Arc<ForwardingRequest>>) {
let info = Peer {
name: region.to_string(),
ip: "127.0.0.1".to_string(),
dns_name: String::new(),
orderflow_proxy: PeerCredentials {
tls_cert: None,
ecdsa_pubkey_address: Address::ZERO,
region: region.to_string(),
},
instance: InstanceData { tls_cert: String::new() },
};
let (tx, rx) = priority::channel::unbounded_channel();
(info.clone(), PeerHandle { info, sender: tx }, rx)
}

fn dummy_forward() -> Arc<ForwardingRequest> {
let order = EncodedOrder::Raw(WithEncoding {
inner: RawOrderMetadata {
priority: Priority::Medium,
received_at: UtcInstant::now(),
hash: B256::ZERO,
},
encoding: Arc::new(Vec::new()),
encoding_tcp_forwarder: None,
});
Arc::new(ForwardingRequest::user_to_local(order))
}

fn forwarders_with(
local_region: &str,
peers: Arc<DashMap<String, PeerHandle>>,
) -> IngressForwarders {
let (local_tx, _) = priority::channel::unbounded_channel();
IngressForwarders::new(
local_tx,
peers,
alloy_signer_local::PrivateKeySigner::random(),
PriorityWorkers::new_with_threads(1),
local_region.to_string(),
)
}

async fn try_recv(
rx: &mut priority::channel::UnboundedReceiver<Arc<ForwardingRequest>>,
) -> bool {
tokio::time::timeout(Duration::from_millis(50), rx.recv()).await.is_ok()
}

#[tokio::test]
async fn broadcast_inner_skips_cross_region_when_restricted() {
let peers: Arc<DashMap<String, PeerHandle>> = Arc::new(DashMap::new());
let (_, us_handle, mut us_rx) = make_peer_with_region("us");
let (_, eu_handle, mut eu_rx) = make_peer_with_region("eu");
peers.insert("us".to_string(), us_handle);
peers.insert("eu".to_string(), eu_handle);

let forwarders = forwarders_with("us", peers.clone());

forwarders.broadcast_inner(dummy_forward(), true);

assert!(try_recv(&mut us_rx).await, "same-region peer should receive");
assert!(!try_recv(&mut eu_rx).await, "cross-region peer should be skipped");

// Both peers must remain in the map (skip != evict).
assert_eq!(peers.len(), 2);
}

#[tokio::test]
async fn broadcast_inner_sends_to_all_when_unrestricted() {
let peers: Arc<DashMap<String, PeerHandle>> = Arc::new(DashMap::new());
let (_, us_handle, mut us_rx) = make_peer_with_region("us");
let (_, eu_handle, mut eu_rx) = make_peer_with_region("eu");
peers.insert("us".to_string(), us_handle);
peers.insert("eu".to_string(), eu_handle);

let forwarders = forwarders_with("us", peers.clone());

forwarders.broadcast_inner(dummy_forward(), false);

assert!(try_recv(&mut us_rx).await);
assert!(try_recv(&mut eu_rx).await);
}
}
12 changes: 12 additions & 0 deletions src/indexer/click/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ pub struct BundleRow {
pub refund_recipient: Option<Address>,
/// Whether the bundle has a delayed refund.
pub delayed_refund: Option<bool>,
/// If bundle disallows sending to other regions
pub disable_cross_region_sharing: bool,
/// For 2nd price refunds done by buildernet
#[serde(with = "address::option")]
pub refund_identity: Option<Address>,
Expand Down Expand Up @@ -251,6 +253,10 @@ impl From<(SystemBundle, String)> for BundleRow {
reverting_tx_hashes: bundle.raw_bundle.metadata.reverting_tx_hashes.clone(),
dropping_tx_hashes: bundle.raw_bundle.metadata.dropping_tx_hashes.clone(),
delayed_refund: bundle.raw_bundle.metadata.delayed_refund,
disable_cross_region_sharing: bundle
.raw_bundle
.metadata
.disable_cross_region_sharing,
refund_tx_hashes: bundle
.raw_bundle
.metadata
Expand Down Expand Up @@ -318,6 +324,10 @@ impl From<(SystemBundle, String)> for BundleRow {
signer_address: Some(bundle.metadata.signer),
builder_name,
delayed_refund: bundle.raw_bundle.metadata.delayed_refund,
disable_cross_region_sharing: bundle
.raw_bundle
.metadata
.disable_cross_region_sharing,
refund_percent: bundle.raw_bundle.metadata.refund_percent,
refund_recipient: bundle.raw_bundle.metadata.refund_recipient,
refund_identity: bundle.raw_bundle.metadata.refund_identity,
Expand Down Expand Up @@ -568,7 +578,9 @@ pub(crate) mod tests {
Some("v2".to_string())
},
signing_address: value.signer_address,
// TODO: looks like a bug
delayed_refund: None,
disable_cross_region_sharing: value.disable_cross_region_sharing,
},
}
}
Expand Down
Loading