
Commit ae25f18

Update in-flight request counter, switch to FastAPI + Uvicorn (#838)
1 parent 8e56648 commit ae25f18

28 files changed (+451, -220 lines)

cli/cmd/get.go (1 addition & 1 deletion)

@@ -181,7 +181,7 @@ func apiTable(apis []spec.API, statuses []status.Status, allMetrics []metrics.Me
 {Title: "requested"},
 {Title: "failed", Hidden: totalFailed == 0},
 {Title: "last update"},
-{Title: "avg inference"},
+{Title: "avg request"},
 {Title: "2XX"},
 {Title: "4XX", Hidden: total4XX == 0},
 {Title: "5XX", Hidden: total5XX == 0},

docs/deployments/autoscaling.md (40 additions & 2 deletions)

@@ -4,10 +4,48 @@ _WARNING: you are on the master branch, please refer to the docs on the branch t
 
 Cortex autoscales your web services based on your configuration.
 
+## Replica Parallelism
+
+* `workers_per_replica` (default: 1): Each replica runs a web server with `workers_per_replica` workers, each of which runs in its own process. For APIs running with multiple CPUs per replica, using 1-3 workers per unit of CPU generally leads to optimal throughput. For example, if `cpu` is 2, a value between 2 and 6 for `workers_per_replica` is reasonable. The optimal number will vary based on the workload and the CPU request for the API.
+
+* `threads_per_worker` (default: 1): Each worker uses a thread pool of size `threads_per_worker` to process requests. For applications that are not CPU intensive, such as high I/O (e.g. downloading files) or GPU-based inference, increasing the number of threads per worker can increase throughput. For CPU-bound applications such as running your model inference on a CPU, using 1 thread per worker is recommended to avoid unnecessary context switching. Some applications are not thread-safe, and therefore must be run with 1 thread per worker.
+
+`workers_per_replica` * `threads_per_worker` represents the number of requests that your replica can process in parallel. For example, if `workers_per_replica` is 2 and `threads_per_worker` is 2, and the replica is hit with 5 concurrent requests, 4 would immediately begin to be processed, 1 would wait for a thread to become available, and the concurrency for the replica would be 5. If the replica is hit with 3 concurrent requests, all three would begin processing immediately, and the replica concurrency would be 3.
+
 ## Autoscaling Replicas
 
-Cortex adjusts the number of replicas that are serving predictions by monitoring the compute resource usage of each API. The number of replicas will be at least `min_replicas` and no more than `max_replicas`.
+* `min_replicas`: The lower bound on how many replicas can be running for an API.
+
+* `max_replicas`: The upper bound on how many replicas can be running for an API.
+
+* `target_replica_concurrency` (default: `workers_per_replica` * `threads_per_worker`): This is the desired number of in-flight requests per replica, and is the metric which the autoscaler uses to make scaling decisions.
+
+Replica concurrency is simply how many requests have been sent to a replica and have not yet been responded to (also referred to as in-flight requests). Therefore, it includes requests which are currently being processed and requests which are waiting in the replica's queue.
+
+The autoscaler uses this formula to determine the number of desired replicas:
+
+`desired replicas = sum(in-flight requests in each replica) / target_replica_concurrency`
+
+For example, setting `target_replica_concurrency` to `workers_per_replica` * `threads_per_worker` (the default) causes the cluster to adjust the number of replicas so that on average, requests are processed immediately without waiting in a queue, and workers/threads are never idle.
+
+* `max_replica_concurrency` (default: 1024): This is the maximum number of in-flight requests per replica before requests are rejected with HTTP error code 503. `max_replica_concurrency` includes requests that are currently being processed as well as requests that are waiting in the replica's queue (a replica can actively process `workers_per_replica` * `threads_per_worker` requests concurrently, and will hold any additional requests in a local queue). Decreasing `max_replica_concurrency` and configuring the client to retry when it receives 503 responses will improve queue fairness by preventing requests from sitting in long queues.
+
+Note (if `workers_per_replica` > 1): Because requests are randomly assigned to workers within a replica (which leads to unbalanced worker queues), clients may receive 503 responses before `max_replica_concurrency` is reached. For example, if you set `workers_per_replica: 2` and `max_replica_concurrency: 100`, each worker will have a maximum queue length of 50 requests. If your replica receives 90 requests, it is possible that more than 50 of them are routed to one worker, in which case each additional request routed to that worker beyond its 50-request limit will receive a 503 response.
+
+* `window` (default: 60s): The time over which to average the API-wide in-flight requests (which is the sum of in-flight requests in each replica). The longer the window, the slower the autoscaler will react to changes in API-wide in-flight requests, since the value is averaged over the `window`. API-wide in-flight requests are calculated every 10 seconds, so `window` must be a multiple of 10 seconds.
+
+* `downscale_stabilization_period` (default: 5m): The API will not scale below the highest recommendation made during this period. Every 10 seconds, the autoscaler makes a recommendation based on all of the other configuration parameters described here. It will then take the max of the current recommendation and all recommendations made during the `downscale_stabilization_period`, and use that to determine the final number of replicas to scale to. Increasing this value will cause the cluster to react more slowly to decreased traffic, and will reduce thrashing.
+
+* `upscale_stabilization_period` (default: 0m): The API will not scale above the lowest recommendation made during this period. Every 10 seconds, the autoscaler makes a recommendation based on all of the other configuration parameters described here. It will then take the min of the current recommendation and all recommendations made during the `upscale_stabilization_period`, and use that to determine the final number of replicas to scale to. Increasing this value will cause the cluster to react more slowly to increased traffic, and will reduce thrashing. The default is 0 minutes, which means that the cluster will react quickly to increased traffic.
+
+* `max_downscale_factor` (default: 0.5): The maximum factor by which to scale down the API on a single scaling event. For example, if `max_downscale_factor` is 0.5 and there are 10 running replicas, the autoscaler will not recommend fewer than 5 replicas. Increasing this number will allow the cluster to shrink more quickly in response to dramatic dips in traffic.
+
+* `max_upscale_factor` (default: 10): The maximum factor by which to scale up the API on a single scaling event. For example, if `max_upscale_factor` is 10 and there are 5 running replicas, the autoscaler will not recommend more than 50 replicas. Increasing this number will allow the cluster to grow more quickly in response to dramatic spikes in traffic.
+
+* `downscale_tolerance` (default: 0.1): Any recommendation falling within this factor below the current number of replicas will not trigger a scale-down event. For example, if `downscale_tolerance` is 0.1 and there are 20 running replicas, a recommendation of 18 or 19 replicas will not be acted on, and the API will remain at 20 replicas. Increasing this value will prevent thrashing, but setting it too high will prevent the cluster from maintaining its optimal size.
+
+* `upscale_tolerance` (default: 0.1): Any recommendation falling within this factor above the current number of replicas will not trigger a scale-up event. For example, if `upscale_tolerance` is 0.1 and there are 20 running replicas, a recommendation of 21 or 22 replicas will not be acted on, and the API will remain at 20 replicas. Increasing this value will prevent thrashing, but setting it too high will prevent the cluster from maintaining its optimal size.
 
 ## Autoscaling Nodes
 
-Cortex spins up and down nodes based on the aggregate resource requests of all APIs. The number of nodes will be at least `min_instances` and no more than `max_instances` (configured during installation and modifiable via `cortex cluster update` or the [AWS console](https://docs.aws.amazon.com/autoscaling/ec2/userguide/as-manual-scaling.html)).
+Cortex spins up and down nodes based on the aggregate resource requests of all APIs. The number of nodes will be at least `min_instances` and no more than `max_instances` ([configured during installation](../cluster-management/config.md) and modifiable via `cortex cluster update`).
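
To make the interaction of `target_replica_concurrency`, the tolerances, and the scaling factors documented above concrete, here is a minimal Go sketch of a single autoscaling step. The struct and function names are illustrative assumptions, the `window` averaging, the stabilization periods, and the 10-second evaluation loop are omitted, and this is not Cortex's actual autoscaler code.

```go
package main

import (
	"fmt"
	"math"
)

// autoscalingConfig mirrors the fields documented above; the struct itself is
// an assumption for illustration, not a type from the Cortex codebase.
type autoscalingConfig struct {
	minReplicas              int
	maxReplicas              int
	targetReplicaConcurrency float64 // default: workers_per_replica * threads_per_worker
	maxDownscaleFactor       float64 // default: 0.5
	maxUpscaleFactor         float64 // default: 10
	downscaleTolerance       float64 // default: 0.1
	upscaleTolerance         float64 // default: 0.1
}

// recommend computes one scaling recommendation from the API-wide in-flight
// request count (assumed to already be averaged over `window`) and the
// current number of replicas.
func recommend(cfg autoscalingConfig, avgInFlight float64, current int) int {
	// desired replicas = sum(in-flight requests in each replica) / target_replica_concurrency
	raw := avgInFlight / cfg.targetReplicaConcurrency
	cur := float64(current)

	// Recommendations within the tolerance bands do not trigger a scaling event.
	if raw < cur && raw >= cur*(1-cfg.downscaleTolerance) {
		raw = cur
	} else if raw > cur && raw <= cur*(1+cfg.upscaleTolerance) {
		raw = cur
	}

	// Limit how far a single scaling event can move the replica count.
	raw = math.Max(raw, cur*cfg.maxDownscaleFactor)
	raw = math.Min(raw, cur*cfg.maxUpscaleFactor)

	// Clamp to [min_replicas, max_replicas].
	desired := int(math.Ceil(raw))
	if desired < cfg.minReplicas {
		desired = cfg.minReplicas
	}
	if desired > cfg.maxReplicas {
		desired = cfg.maxReplicas
	}
	return desired
}

func main() {
	cfg := autoscalingConfig{
		minReplicas:              1,
		maxReplicas:              100,
		targetReplicaConcurrency: 4, // e.g. workers_per_replica: 2, threads_per_worker: 2
		maxDownscaleFactor:       0.5,
		maxUpscaleFactor:         10,
		downscaleTolerance:       0.1,
		upscaleTolerance:         0.1,
	}
	fmt.Println(recommend(cfg, 100, 20)) // prints 25
}
```

For instance, with a `target_replica_concurrency` of 4, 20 current replicas, and an average of 100 API-wide in-flight requests, the raw recommendation is 25; that is outside the upscale tolerance band (which extends to 22) and well below the upscale factor cap (200), so the API would scale to 25 replicas.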

docs/deployments/onnx.md (4 additions & 2 deletions)

@@ -29,8 +29,9 @@ You can deploy ONNX models as web services by defining a class that implements C
 init_replicas: <int> # initial number of replicas (default: <min_replicas>)
 workers_per_replica: <int> # the number of parallel serving workers to run on each replica (default: 1)
 threads_per_worker: <int> # the number of threads per worker (default: 1)
-target_queue_length: <float> # the desired queue length per replica (default: 0)
-window: <duration> # the time over which to average the API's queue length (default: 60s)
+target_replica_concurrency: <float> # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker)
+max_replica_concurrency: <int> # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024)
+window: <duration> # the time over which to average the API's concurrency (default: 60s)
 downscale_stabilization_period: <duration> # the API will not scale below the highest recommendation made during this period (default: 5m)
 upscale_stabilization_period: <duration> # the API will not scale above the lowest recommendation made during this period (default: 0m)
 max_downscale_factor: <float> # the maximum factor by which to scale down the API on a single scaling event (default: 0.5)
@@ -129,6 +130,7 @@ dill==0.3.1.1
 msgpack==0.6.2
 numpy==1.18.0
 onnxruntime==1.1.0
+pyyaml==5.3
 requests==2.22.0
 ```

docs/deployments/python.md (4 additions & 2 deletions)

@@ -33,8 +33,9 @@ In addition to supporting Python models via the Python Predictor interface, Cort
 init_replicas: <int> # initial number of replicas (default: <min_replicas>)
 workers_per_replica: <int> # the number of parallel serving workers to run on each replica (default: 1)
 threads_per_worker: <int> # the number of threads per worker (default: 1)
-target_queue_length: <float> # the desired queue length per replica (default: 0)
-window: <duration> # the time over which to average the API's queue length (default: 60s)
+target_replica_concurrency: <float> # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker)
+max_replica_concurrency: <int> # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024)
+window: <duration> # the time over which to average the API's concurrency (default: 60s)
 downscale_stabilization_period: <duration> # the API will not scale below the highest recommendation made during this period (default: 5m)
 upscale_stabilization_period: <duration> # the API will not scale above the lowest recommendation made during this period (default: 0m)
 max_downscale_factor: <float> # the maximum factor by which to scale down the API on a single scaling event (default: 0.5)
@@ -155,6 +156,7 @@ numpy==1.18.0
 pandas==0.25.3
 opencv-python==4.1.2.30
 Pillow==6.2.1
+pyyaml==5.3
 requests==2.22.0
 scikit-image==0.16.2
 scikit-learn==0.22

docs/deployments/tensorflow.md (4 additions & 2 deletions)

@@ -30,8 +30,9 @@ You can deploy TensorFlow models as web services by defining a class that implem
 init_replicas: <int> # initial number of replicas (default: <min_replicas>)
 workers_per_replica: <int> # the number of parallel serving workers to run on each replica (default: 1)
 threads_per_worker: <int> # the number of threads per worker (default: 1)
-target_queue_length: <float> # the desired queue length per replica (default: 0)
-window: <duration> # the time over which to average the API's queue length (default: 60s)
+target_replica_concurrency: <float> # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker)
+max_replica_concurrency: <int> # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024)
+window: <duration> # the time over which to average the API's concurrency (default: 60s)
 downscale_stabilization_period: <duration> # the API will not scale below the highest recommendation made during this period (default: 5m)
 upscale_stabilization_period: <duration> # the API will not scale above the lowest recommendation made during this period (default: 0m)
 max_downscale_factor: <float> # the maximum factor by which to scale down the API on a single scaling event (default: 0.5)
@@ -122,6 +123,7 @@ msgpack==0.6.2
 numpy==1.18.0
 requests==2.22.0
 opencv-python==4.1.2.30
+pyyaml==5.3
 tensor2tensor==1.15.4
 tensorflow-hub==0.7.0
 tensorflow==2.1.0

examples/pytorch/sentiment-analyzer/cortex.yaml (1 addition & 1 deletion)

@@ -1,6 +1,6 @@
 # WARNING: you are on the master branch, please refer to the examples on the branch that matches your `cortex version`
 
-- name: analyzer
+- name: sentiment-analyzer
   predictor:
     type: python
     path: predictor.py

images/request-monitor/Dockerfile (1 addition & 1 deletion)

@@ -10,7 +10,7 @@ RUN GO111MODULE=on CGO_ENABLED=0 GOOS=linux go build -installsuffix cgo -o reque
 
 FROM alpine:3.11
 
-RUN apk --no-cache add ca-certificates bash iproute2
+RUN apk --no-cache add ca-certificates bash
 
 COPY --from=builder /go/src/github.com/cortexlabs/cortex/images/request-monitor/request-monitor /root/
 RUN chmod +x /root/request-monitor

images/request-monitor/request-monitor.go (17 additions & 19 deletions)

@@ -17,12 +17,9 @@ limitations under the License.
 package main
 
 import (
-	"bytes"
 	"fmt"
 	"log"
 	"os"
-	"os/exec"
-	"strings"
 	"sync"
 	"time"
 
@@ -79,14 +76,16 @@ func main() {
 	client = cloudwatch.New(sess)
 	requestCounter := Counter{}
 
+	os.OpenFile("/request_monitor_ready.txt", os.O_RDONLY|os.O_CREATE, 0666)
+
 	for {
-		if _, err := os.Stat("/mnt/health_check.txt"); err == nil {
+		if _, err := os.Stat("/mnt/api_readiness.txt"); err == nil {
 			break
 		} else if os.IsNotExist(err) {
-			fmt.Println("waiting...")
+			fmt.Println("waiting for replica to be ready...")
 			time.Sleep(_tickInterval)
 		} else {
-			log.Printf("error encountered while looking for /mnt/health_check.txt") // unexpected
+			log.Printf("error encountered while looking for /mnt/api_readiness.txt") // unexpected
 			time.Sleep(_tickInterval)
 		}
 	}
@@ -164,22 +163,21 @@ func publishStats(apiName string, counter *Counter, client *cloudwatch.CloudWatc
 	}
 }
 
-func updateOpenConnections(requestCounter *Counter, timer *time.Timer) {
-	cmd := exec.Command("ss")
-	var out bytes.Buffer
-	cmd.Stdout = &out
-	err := cmd.Run()
+func getFileCount() int {
+	dir, err := os.Open("/mnt/requests")
 	if err != nil {
-		log.Fatal(err)
+		panic(err)
 	}
-
-	output := out.String()
-	count := 0
-	for _, str := range strings.Split(output, "\n") {
-		if strings.Contains(str, ":8888 ") && strings.Contains(str, "ESTAB") {
-			count++
-		}
+	defer dir.Close()
+	fileNames, err := dir.Readdirnames(0)
+	if err != nil {
+		panic(err)
 	}
+	return len(fileNames)
+}
+
+func updateOpenConnections(requestCounter *Counter, timer *time.Timer) {
+	count := getFileCount()
 	requestCounter.Append(count)
 	timer.Reset(_requestSampleInterval)
 }
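
The diff above shows only the monitoring side: `getFileCount` now counts the entries of `/mnt/requests` instead of parsing `ss` output for established connections on port 8888. As a rough sketch of the other half of the contract (an assumption for illustration; the actual FastAPI/Uvicorn server introduced by this commit is not shown in this diff), a serving process could create one uniquely named file per in-flight request and delete it when the response is sent:

```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"os"
)

const _requestDir = "/mnt/requests" // the directory that getFileCount() counts

// trackInFlight wraps a handler so that each in-flight request is represented
// by exactly one file in _requestDir. This is a sketch of the pattern, not the
// server code shipped with Cortex.
func trackInFlight(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		f, err := os.CreateTemp(_requestDir, "request-") // unique marker file
		if err == nil {
			name := f.Name()
			f.Close()
			defer os.Remove(name) // request is no longer in flight
		}
		next.ServeHTTP(w, r)
	})
}

func main() {
	_ = os.MkdirAll(_requestDir, 0o755)
	http.Handle("/predict", trackInFlight(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok") // placeholder for the real prediction handler
	})))
	log.Fatal(http.ListenAndServe(":8888", nil))
}
```

Counting directory entries this way also removes the need for the `ss` utility, which is why the Dockerfile change above drops the `iproute2` package.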

pkg/lib/k8s/pod.go (21 additions & 12 deletions)

@@ -19,6 +19,7 @@ package k8s
 import (
 	"bytes"
 	"regexp"
+	"strings"
 	"time"
 
 	"github.com/cortexlabs/cortex/pkg/lib/errors"
@@ -157,18 +158,20 @@ func GetPodStatus(pod *kcore.Pod) PodStatus {
 	for _, containerStatus := range pod.Status.ContainerStatuses {
 		if containerStatus.LastTerminationState.Terminated != nil {
 			exitCode := containerStatus.LastTerminationState.Terminated.ExitCode
-			if exitCode == 137 {
-				return PodStatusKilledOOM
-			}
+			reason := strings.ToLower(containerStatus.LastTerminationState.Terminated.Reason)
 			if _killStatuses[exitCode] {
+				if strings.Contains(reason, "oom") {
+					return PodStatusKilledOOM
+				}
 				return PodStatusKilled
 			}
 		} else if containerStatus.State.Terminated != nil {
 			exitCode := containerStatus.State.Terminated.ExitCode
-			if exitCode == 137 {
-				return PodStatusKilledOOM
-			}
+			reason := strings.ToLower(containerStatus.State.Terminated.Reason)
 			if _killStatuses[exitCode] {
+				if strings.Contains(reason, "oom") {
+					return PodStatusKilledOOM
+				}
 				return PodStatusKilled
 			}
 		}
@@ -200,23 +203,29 @@ func PodStatusFromContainerStatuses(containerStatuses []kcore.ContainerStatus) P
 			numRunning++
 		} else if containerStatus.State.Terminated != nil {
 			exitCode := containerStatus.State.Terminated.ExitCode
+			reason := strings.ToLower(containerStatus.State.Terminated.Reason)
 			if exitCode == 0 {
 				numSucceeded++
-			} else if exitCode == 137 {
-				numKilledOOM++
 			} else if _killStatuses[exitCode] {
-				numKilled++
+				if strings.Contains(reason, "oom") {
+					numKilledOOM++
+				} else {
+					numKilled++
+				}
 			} else {
 				numFailed++
 			}
 		} else if containerStatus.LastTerminationState.Terminated != nil {
 			exitCode := containerStatus.LastTerminationState.Terminated.ExitCode
+			reason := strings.ToLower(containerStatus.LastTerminationState.Terminated.Reason)
 			if exitCode == 0 {
 				numSucceeded++
-			} else if exitCode == 137 {
-				numKilledOOM++
 			} else if _killStatuses[exitCode] {
-				numKilled++
+				if strings.Contains(reason, "oom") {
+					numKilledOOM++
+				} else {
+					numKilled++
+				}
 			} else {
 				numFailed++
 			}
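
Both hunks above make the same change: an OOM kill is now detected from the container's termination reason rather than by assuming that exit code 137 always means out-of-memory. Restated as a standalone helper (a sketch with assumed names and an assumed `_killStatuses` set; the authoritative logic lives in `pkg/lib/k8s/pod.go`), the classification looks like this:

```go
package main

import (
	"fmt"
	"strings"

	kcore "k8s.io/api/core/v1"
)

// _killStatuses approximates the exit codes treated as "killed"
// (137 = SIGKILL, 143 = SIGTERM); the real set is defined in pkg/lib/k8s.
var _killStatuses = map[int32]bool{137: true, 143: true}

type terminationClass string

const (
	terminationSucceeded terminationClass = "succeeded"
	terminationKilledOOM terminationClass = "killed_oom"
	terminationKilled    terminationClass = "killed"
	terminationFailed    terminationClass = "failed"
)

// classifyTermination mirrors the pattern in this commit: OOM kills are
// identified by the termination reason, not by the exit code alone.
func classifyTermination(t *kcore.ContainerStateTerminated) terminationClass {
	if t.ExitCode == 0 {
		return terminationSucceeded
	}
	if _killStatuses[t.ExitCode] {
		if strings.Contains(strings.ToLower(t.Reason), "oom") {
			return terminationKilledOOM
		}
		return terminationKilled
	}
	return terminationFailed
}

func main() {
	oom := &kcore.ContainerStateTerminated{ExitCode: 137, Reason: "OOMKilled"}
	other := &kcore.ContainerStateTerminated{ExitCode: 137, Reason: "Error"}
	fmt.Println(classifyTermination(oom))   // killed_oom
	fmt.Println(classifyTermination(other)) // killed
}
```

Keying on the reason string (e.g. "OOMKilled") distinguishes an out-of-memory kill from other SIGKILL terminations that share exit code 137.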

0 commit comments
