Skip to content
30 changes: 18 additions & 12 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"context"
"flag"
"fmt"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -48,6 +49,7 @@ func main() {
userContainerPort int
maxConcurrency int
maxQueueLength int
hasTCPProbe bool
clusterConfigPath string
)

Expand All @@ -56,6 +58,7 @@ func main() {
flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy will redirect to the traffic to")
flag.IntVar(&maxConcurrency, "max-concurrency", 0, "max concurrency allowed for user container")
flag.IntVar(&maxQueueLength, "max-queue-length", 0, "max request queue length for user container")
flag.BoolVar(&hasTCPProbe, "has-tcp-probe", false, "tcp probe to the user-provided container port")
flag.StringVar(&clusterConfigPath, "cluster-config", "", "cluster config path")
flag.Parse()

Expand Down Expand Up @@ -142,7 +145,7 @@ func main() {

adminHandler := http.NewServeMux()
adminHandler.Handle("/metrics", promStats)
adminHandler.Handle("/healthz", readinessTCPHandler(userContainerPort, log))
adminHandler.Handle("/healthz", readinessTCPHandler(userContainerPort, hasTCPProbe, log))

servers := map[string]*http.Server{
"proxy": {
Expand Down Expand Up @@ -201,19 +204,22 @@ func exit(log *zap.SugaredLogger, err error, wrapStrs ...string) {
os.Exit(1)
}

func readinessTCPHandler(port int, logger *zap.SugaredLogger) http.HandlerFunc {
func readinessTCPHandler(port int, enableTCPProbe bool, logger *zap.SugaredLogger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
timeout := time.Duration(1) * time.Second
address := net.JoinHostPort("localhost", strconv.FormatInt(int64(port), 10))

conn, err := net.DialTimeout("tcp", address, timeout)
if err != nil {
logger.Warn(errors.Wrap(err, "TCP probe to user-provided container port failed"))
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("unhealthy"))
return
if enableTCPProbe {
ctx := r.Context()
address := net.JoinHostPort("localhost", fmt.Sprintf("%d", port))

var d net.Dialer
conn, err := d.DialContext(ctx, "tcp", address)
if err != nil {
logger.Warn(errors.Wrap(err, "TCP probe to user-provided container port failed"))
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("unhealthy"))
return
}
_ = conn.Close()
}
_ = conn.Close()

w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("healthy"))
Expand Down
16 changes: 15 additions & 1 deletion pkg/workloads/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func GetReadinessProbesFromContainers(containers []*userconfig.Container) map[st
if container == nil {
continue
}

if container.ReadinessProbe != nil {
probes[container.Name] = *GetProbeSpec(container.ReadinessProbe)
}
Expand All @@ -92,6 +91,21 @@ func GetReadinessProbesFromContainers(containers []*userconfig.Container) map[st
return probes
}

// HasReadinessProbesTargetingPort reports whether at least one of the given
// containers declares a readiness probe whose TCP-socket or HTTP-GET port
// equals targetPort. Nil entries and containers without a readiness probe
// are ignored.
func HasReadinessProbesTargetingPort(containers []*userconfig.Container, targetPort int32) bool {
	for i := range containers {
		c := containers[i]
		if c == nil {
			continue
		}

		rp := c.ReadinessProbe
		if rp == nil {
			continue
		}

		// Either probe flavour pointing at the target port is a match.
		switch {
		case rp.TCPSocket != nil && rp.TCPSocket.Port == targetPort:
			return true
		case rp.HTTPGet != nil && rp.HTTPGet.Port == targetPort:
			return true
		}
	}
	return false
}

func BaseClusterEnvVars() []kcore.EnvFromSource {
envVars := []kcore.EnvFromSource{
{
Expand Down
8 changes: 6 additions & 2 deletions pkg/workloads/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ func batchDequeuerProxyContainer(api spec.API, jobID, queueURL string) (kcore.Co
}

func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) {
proxyHasTCPProbe := !HasReadinessProbesTargetingPort(api.Pod.Containers, *api.Pod.Port)

return kcore.Container{
Name: ProxyContainerName,
Image: config.ClusterConfig.ImageProxy,
Expand All @@ -189,6 +191,8 @@ func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) {
s.Int32(int32(api.Pod.MaxConcurrency)),
"--max-queue-length",
s.Int32(int32(api.Pod.MaxQueueLength)),
"--has-tcp-probe",
s.Bool(proxyHasTCPProbe),
},
Ports: []kcore.ContainerPort{
{Name: consts.AdminPortName, ContainerPort: consts.AdminPortInt32},
Expand All @@ -213,10 +217,10 @@ func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) {
},
},
InitialDelaySeconds: 1,
TimeoutSeconds: 1,
TimeoutSeconds: 3,
PeriodSeconds: 10,
SuccessThreshold: 1,
FailureThreshold: 1,
FailureThreshold: 3,
},
}, ClusterConfigVolume()
}
Expand Down