Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 39 additions & 17 deletions src/proxy/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,17 @@ impl<S: ProxyStream> Controller<S> {
partition_finder: Arc<impl PartitionFinder<S> + Sync + Send + 'static>,
status_reporter: StatusReporter,
cw_publisher: Option<Arc<dyn CloudWatchClient>>,
) -> Self {
let Ok(listener) = TcpListener::bind(listen_addr).await else {
panic!("Failed to bind {}", listen_addr);
};
) -> Result<Self, std::io::Error> {
let listener = TcpListener::bind(listen_addr).await.map_err(|e| {
error!(
"Failed to bind to {}: {}. \
Ensure the address is reachable and the port is not already in use.",
listen_addr, e
);
e
})?;

Self {
Ok(Self {
listener,
partition_finder,
proxy_id: ProxyIdentifier::new(),
Expand All @@ -125,7 +130,7 @@ impl<S: ProxyStream> Controller<S> {
status_reporter,
proxy_config,
cw_publisher,
}
})
}

pub async fn run<T: RpcClient, V: S3ClientBuilder>(
Expand Down Expand Up @@ -218,7 +223,12 @@ impl<S: ProxyStream> Controller<S> {
}

// Skip channel init if read bypass is not requested
let channel_init_config = if !self.proxy_config.nested_config.read_bypass_config.requested {
let channel_init_config = if !self
.proxy_config
.nested_config
.read_bypass_config
.requested
{
ChannelInitConfig::default()
} else {
let configs = vec![ChannelConfigArgs::AWSFILE_READ_BYPASS_V2(
Expand Down Expand Up @@ -503,10 +513,8 @@ impl<S: ProxyStream> Controller<S> {
mod tests {
use super::*;
use crate::{
aws::cw_publisher::LogLevel,
config::channel_init_config::ChannelInitConfig,
proxy_builder::ProxyBuilder,
status_reporter::create_status_channel,
aws::cw_publisher::LogLevel, config::channel_init_config::ChannelInitConfig,
proxy_builder::ProxyBuilder, status_reporter::create_status_channel,
};
use std::sync::atomic::AtomicU64;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -599,12 +607,7 @@ mod tests {
)
.await;

let mut state = IncarnationState::new(
ProxyIdentifier::new(),
None,
events_tx,
1,
);
let mut state = IncarnationState::new(ProxyIdentifier::new(), None, events_tx, 1);

let period_secs = 10;
let publisher_clone = mock_publisher.clone();
Expand Down Expand Up @@ -658,4 +661,23 @@ mod tests {
let proxy = handle.await.unwrap();
let _ = proxy.shutdown().await;
}

#[tokio::test]
async fn test_controller_new_bind_failure_returns_err() {
// Occupy a port so the second bind attempt fails.
let occupied = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = occupied.local_addr().unwrap();

let (_status_requester, status_reporter) = create_status_channel();
let result = Controller::new(
&addr.to_string(),
ProxyConfig::default(),
Arc::new(MockPartitionFinder),
status_reporter,
None,
)
.await;

assert!(result.is_err(), "expected Err when port is already bound");
}
}
18 changes: 16 additions & 2 deletions src/proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,14 @@ async fn main() {
status_reporter,
cw_publisher.clone(),
)
.await;
.await
.unwrap_or_else(|_| {
let p = std::path::Path::new(&proxy_config.pid_file_path);
if p.exists() {
let _ = std::fs::remove_file(p);
}
std::process::exit(1);
});
tokio::spawn(controller.run(
sigterm_cancellation_token.clone(),
AwsFileRpcClient,
Expand All @@ -116,7 +123,14 @@ async fn main() {
status_reporter,
cw_publisher.clone(),
)
.await;
.await
.unwrap_or_else(|_| {
let p = std::path::Path::new(&proxy_config.pid_file_path);
if p.exists() {
let _ = std::fs::remove_file(p);
}
std::process::exit(1);
});
tokio::spawn(controller.run(
sigterm_cancellation_token.clone(),
AwsFileRpcClient,
Expand Down
Loading