Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions services/api-rs/crates/centaur-api-server/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,22 @@ struct SandboxArgs {
value_delimiter = ','
)]
image_pull_secrets: Vec<String>,
/// `key=value` pairs set as the `nodeSelector` on every sandbox pod, e.g.
/// to pin sandboxes to a dedicated (spot) node pool.
#[arg(
long = "session-sandbox-node-selector",
env = "SESSION_SANDBOX_NODE_SELECTOR",
value_delimiter = ','
)]
node_selector: Vec<String>,
/// JSON array of toleration objects applied to every sandbox pod, so they
/// can schedule onto a tainted node pool, e.g.
/// `[{"key":"centaur","operator":"Equal","value":"true","effect":"NoSchedule"}]`.
#[arg(
long = "session-sandbox-tolerations",
env = "SESSION_SANDBOX_TOLERATIONS"
)]
tolerations: Option<String>,
#[arg(
long = "session-sandbox-ready-timeout-secs",
alias = "kubernetes-sandbox-ready-timeout-s",
Expand Down Expand Up @@ -1326,8 +1342,32 @@ impl TryFrom<&SandboxArgs> for AgentSandboxConfig {
.map(str::to_owned)
.collect();
config.ready_timeout = Duration::from_secs(args.ready_timeout_secs);
config.node_selector = args
.node_selector
.iter()
.filter_map(|entry| entry.split_once('='))
.map(|(key, value)| (key.trim().to_owned(), value.trim().to_owned()))
.filter(|(key, _)| !key.is_empty())
.collect();
if let Some(raw) = args
.tolerations
.as_deref()
.map(str::trim)
.filter(|raw| !raw.is_empty())
{
config.tolerations = serde_json::from_str(raw).map_err(|err| {
ServerError::UnsupportedConfig(format!(
"SESSION_SANDBOX_TOLERATIONS must be a JSON array of toleration objects: {err}"
))
})?;
}
config.iron_proxy = args.iron_proxy.to_config()?;
// The per-sandbox proxy pod follows the sandbox onto the same node pool.
let sandbox_node_selector = config.node_selector.clone();
let sandbox_tolerations = config.tolerations.clone();
if let Some(proxy) = config.iron_proxy.as_mut() {
proxy.node_selector = sandbox_node_selector;
proxy.tolerations = sandbox_tolerations;
// `to_config` only ships the harness fragment, so add infra and
// discovered tool fragments for any static proxy placeholder
// metadata the backend needs.
Expand Down Expand Up @@ -2067,6 +2107,10 @@ mod tests {
"github-access-token-read-packages, extra-secret ",
"--session-sandbox-ready-timeout-secs",
"42",
"--session-sandbox-node-selector",
"centaur-pool=true, cloud.google.com/gke-spot=true ",
"--session-sandbox-tolerations",
r#"[{"key":"centaur","operator":"Equal","value":"true","effect":"NoSchedule"}]"#,
"--kubernetes-sandbox-iron-proxy-mode",
"disabled",
])
Expand All @@ -2080,6 +2124,20 @@ mod tests {
vec!["github-access-token-read-packages", "extra-secret"]
);
assert_eq!(config.ready_timeout, Duration::from_secs(42));
assert_eq!(
config.node_selector.get("centaur-pool").map(String::as_str),
Some("true")
);
assert_eq!(
config
.node_selector
.get("cloud.google.com/gke-spot")
.map(String::as_str),
Some("true")
);
assert_eq!(config.tolerations.len(), 1);
assert_eq!(config.tolerations[0]["key"], "centaur");
assert_eq!(config.tolerations[0]["effect"], "NoSchedule");
assert!(config.iron_proxy.is_none());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use centaur_sandbox_core::{SandboxError, SandboxId, SandboxResult, SandboxSpec};
use k8s_openapi::api::core::v1::{
Capabilities, Container, ContainerPort, EmptyDirVolumeSource, EnvFromSource,
EnvVar as K8sEnvVar, HTTPGetAction, Pod, PodSpec, Probe, SecretEnvSource, SecretVolumeSource,
SecurityContext, Service, ServicePort, ServiceSpec, Volume, VolumeMount,
SecurityContext, Service, ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
};
use k8s_openapi::api::networking::v1::{
NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule, NetworkPolicyPeer,
Expand Down Expand Up @@ -83,6 +83,11 @@ pub struct IronProxyConfig {
pub op_connect_app_name: String,
pub op_connect_port: u16,
pub api_pod_labels: BTreeMap<String, String>,
/// `nodeSelector` for the per-sandbox proxy pod. Mirrors the sandbox's so the
/// proxy lands on the same (e.g. spot) pool. Empty = none.
pub node_selector: BTreeMap<String, String>,
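/// Tolerations for the per-sandbox proxy pod, stored as raw JSON objects. Each
/// entry is deserialized into a Kubernetes `Toleration` when the proxy pod is
/// built; entries that fail to deserialize are skipped. Empty = none.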
pub tolerations: Vec<Value>,
}

impl IronProxyConfig {
Expand All @@ -106,6 +111,8 @@ impl IronProxyConfig {
"app.kubernetes.io/component".to_owned(),
"api".to_owned(),
)]),
node_selector: BTreeMap::new(),
tolerations: Vec::new(),
}
}
}
Expand Down Expand Up @@ -1091,6 +1098,15 @@ fn build_iron_proxy_pod(
restart_policy: Some("Never".to_owned()),
containers: vec![iron_proxy_container(iron_proxy, resolved, sync)],
volumes: Some(iron_proxy_volumes(iron_proxy)),
node_selector: (!iron_proxy.node_selector.is_empty())
.then(|| iron_proxy.node_selector.clone()),
tolerations: (!iron_proxy.tolerations.is_empty()).then(|| {
iron_proxy
.tolerations
.iter()
.filter_map(|toleration| serde_json::from_value::<Toleration>(toleration.clone()).ok())
.collect()
}),
..Default::default()
}),
..Default::default()
Expand Down Expand Up @@ -1761,6 +1777,51 @@ mod tests {
}
}

/// The proxy pod built from an `IronProxyConfig` must carry the configured
/// `nodeSelector` and tolerations, and must carry neither when the config
/// leaves them empty.
#[test]
fn iron_proxy_pod_carries_node_selector_and_tolerations() {
    let sandbox_id = SandboxId::new("asbx-test");
    let sync_env = ProxySyncEnv {
        proxy_id: "proxy-1".to_owned(),
        control_url: "http://console:3000".to_owned(),
        token: "iprx-token".to_owned(),
    };

    // Case 1: a proxy config with one selector label and one toleration.
    let mut pinned = IronProxyConfig::new("proxy:test", "ca-cert", "ca-key");
    pinned
        .node_selector
        .insert("centaur-pool".to_owned(), "true".to_owned());
    pinned.tolerations.push(json!({
        "key": "centaur",
        "operator": "Equal",
        "value": "true",
        "effect": "NoSchedule",
    }));

    let pinned_pod = build_iron_proxy_pod(&sandbox_id, &pinned, &resolved(), &sync_env);
    let pinned_spec = pinned_pod.spec.as_ref().unwrap();

    let pool_label = pinned_spec
        .node_selector
        .as_ref()
        .and_then(|labels| labels.get("centaur-pool"))
        .map(String::as_str);
    assert_eq!(pool_label, Some("true"));

    let pod_tolerations = pinned_spec.tolerations.as_ref().unwrap();
    assert_eq!(pod_tolerations.len(), 1);
    let toleration = &pod_tolerations[0];
    assert_eq!(toleration.key.as_deref(), Some("centaur"));
    assert_eq!(toleration.effect.as_deref(), Some("NoSchedule"));

    // Case 2: a freshly constructed config sets neither field on the pod spec.
    let default_config = IronProxyConfig::new("proxy:test", "ca-cert", "ca-key");
    let default_pod = build_iron_proxy_pod(&sandbox_id, &default_config, &resolved(), &sync_env);
    let default_spec = default_pod.spec.as_ref().unwrap();
    assert!(default_spec.node_selector.is_none());
    assert!(default_spec.tolerations.is_none());
}

fn rule_allows_namespace_port(
rule: &NetworkPolicyEgressRule,
namespace: &str,
Expand Down
59 changes: 59 additions & 0 deletions services/api-rs/crates/centaur-sandbox-agent-k8s/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ pub struct AgentSandboxConfig {
/// harness's usage/cost spans never leave the pod.
pub otlp_egress: Option<OtlpEgressTarget>,
pub ready_timeout: Duration,
/// `nodeSelector` applied to every sandbox pod. Empty = none. Lets the
/// control plane confine sandboxes to a dedicated node pool (e.g. spot).
pub node_selector: BTreeMap<String, String>,
/// Toleration objects applied verbatim to every sandbox pod, so sandboxes
/// can schedule onto a tainted node pool. Empty = none.
pub tolerations: Vec<Value>,
}

/// Destination of the sandbox's direct OTLP export, expressed as the target
Expand Down Expand Up @@ -113,6 +119,8 @@ impl AgentSandboxConfig {
tools: None,
otlp_egress: None,
ready_timeout: Duration::from_secs(60),
node_selector: BTreeMap::new(),
tolerations: Vec::new(),
}
}

Expand Down Expand Up @@ -711,6 +719,16 @@ fn build_agent_sandbox(
.collect::<Vec<_>>()
}),
);
insert_optional(
&mut pod_spec,
"nodeSelector",
(!config.node_selector.is_empty()).then(|| config.node_selector.clone()),
);
insert_optional(
&mut pod_spec,
"tolerations",
(!config.tolerations.is_empty()).then(|| config.tolerations.clone()),
);

let mut agent_spec = json!({
"replicas": 1,
Expand Down Expand Up @@ -905,6 +923,47 @@ mod tests {
assert!(container.resources.as_ref().unwrap().limits.is_some());
}

/// The sandbox built from an `AgentSandboxConfig` must expose the configured
/// `nodeSelector` and tolerations on its pod template, and must expose neither
/// when the config leaves them empty.
#[test]
fn node_selector_and_tolerations_land_on_the_pod() {
    let sandbox_spec = SandboxSpec::new("centaur-agent:latest");

    // Case 1: a config with one selector label and one toleration.
    let mut pinned = AgentSandboxConfig::new("centaur");
    pinned
        .node_selector
        .insert("centaur-pool".to_owned(), "true".to_owned());
    pinned.tolerations.push(json!({
        "key": "centaur",
        "operator": "Equal",
        "value": "true",
        "effect": "NoSchedule",
    }));

    let pinned_sandbox =
        build_agent_sandbox(&SandboxId::new("asbx-test"), &sandbox_spec, &pinned).unwrap();
    let pinned_pod = &pinned_sandbox.spec.pod_template.spec;

    let pool_label = pinned_pod
        .node_selector
        .as_ref()
        .and_then(|labels| labels.get("centaur-pool"))
        .map(String::as_str);
    assert_eq!(pool_label, Some("true"));

    let pod_tolerations = pinned_pod.tolerations.as_ref().unwrap();
    assert_eq!(pod_tolerations.len(), 1);
    let toleration = &pod_tolerations[0];
    assert_eq!(toleration.key.as_deref(), Some("centaur"));
    assert_eq!(toleration.effect.as_deref(), Some("NoSchedule"));

    // Case 2: a freshly constructed config yields no nodeSelector/tolerations.
    let default_config = AgentSandboxConfig::new("centaur");
    let bare_sandbox =
        build_agent_sandbox(&SandboxId::new("asbx-bare"), &sandbox_spec, &default_config).unwrap();
    let bare_pod = &bare_sandbox.spec.pod_template.spec;
    assert!(bare_pod.node_selector.is_none());
    assert!(bare_pod.tolerations.is_none());
}

#[test]
fn tools_clone_rides_iron_proxy_when_enabled() {
// apply_proxy_env runs before build_agent_sandbox in create(), so the
Expand Down
Loading