Skip to content

Commit e7f01bb

Browse files
committed
[edgefunc] Many small fixes. Finally starts.
1 parent ad9a550 commit e7f01bb

File tree

8 files changed

+239
-117
lines changed

8 files changed

+239
-117
lines changed

cmd/backplane/main.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/apoxy-dev/apoxy-cli/pkg/backplane/wasm/ext_proc"
3939
"github.com/apoxy-dev/apoxy-cli/pkg/backplane/wasm/manifest"
4040
"github.com/apoxy-dev/apoxy-cli/pkg/cmd/utils"
41+
"github.com/apoxy-dev/apoxy-cli/pkg/edgefunc/runc"
4142
"github.com/apoxy-dev/apoxy-cli/pkg/log"
4243

4344
ctrlv1alpha1 "github.com/apoxy-dev/apoxy-cli/api/controllers/v1alpha1"
@@ -252,7 +253,7 @@ func main() {
252253
log.Fatalf("Failed to add readyz check: %v", err)
253254
}
254255

255-
log.Infof("Setting up controllers")
256+
log.Infof("Setting up controllers...")
256257
proxyOpts := []bpctrl.Option{
257258
bpctrl.WithGoPluginDir(*goPluginDir),
258259
}
@@ -277,6 +278,8 @@ func main() {
277278
apiServerHost = *apiServerAddr
278279
}
279280

281+
log.Infof("Starting Backplane controller")
282+
280283
pctrl := bpctrl.NewProxyReconciler(
281284
mgr.GetClient(),
282285
*proxyName,
@@ -285,19 +288,24 @@ func main() {
285288
proxyOpts...,
286289
)
287290
if err := pctrl.SetupWithManager(ctx, mgr); err != nil {
288-
log.Errorf("failed to set up Backplane controller: %v", err)
289-
return
291+
log.Fatalf("failed to set up Backplane controller: %v", err)
290292
}
291293

294+
log.Infof("Starting EdgeFunction controller")
295+
296+
edgeRuntime, err := runc.NewRuntime(ctx)
297+
if err != nil {
298+
log.Fatalf("failed to set up EdgeFunction controller: %v", err)
299+
}
292300
if err := bpctrl.NewEdgeFunctionRevisionReconciler(
293301
mgr.GetClient(),
294302
net.JoinHostPort(apiServerHost, strconv.Itoa(*wasmStorePort)),
295303
ms,
296304
*goPluginDir,
297305
*esZipDir,
306+
edgeRuntime,
298307
).SetupWithManager(ctx, mgr, *proxyName); err != nil {
299-
log.Errorf("failed to set up EdgeFunction controller: %v", err)
300-
return
308+
log.Fatalf("failed to set up EdgeFunction controller: %v", err)
301309
}
302310

303311
// Setup SIGTERM handler.

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ require (
121121
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
122122
github.com/cespare/xxhash/v2 v2.3.0 // indirect
123123
github.com/checkpoint-restore/go-criu/v6 v6.3.0 // indirect
124+
github.com/cilium/ebpf v0.16.0 // indirect
124125
github.com/containerd/console v1.0.4 // indirect
125126
github.com/containerd/stargz-snapshotter/estargz v0.14.3 // indirect
126127
github.com/coreos/go-oidc/v3 v3.1.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,8 @@ github.com/checkpoint-restore/go-criu/v6 v6.3.0/go.mod h1:rrRTN/uSwY2X+BPRl/gkul
774774
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
775775
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
776776
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
777+
github.com/cilium/ebpf v0.16.0 h1:+BiEnHL6Z7lXnlGUsXQPPAE7+kenAd4ES8MQ5min0Ok=
778+
github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEngfwE=
777779
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
778780
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg=
779781
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80=

pkg/apiserver/ingest/edgefunction.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,9 @@ func NewWorker(kc *rest.Config, c versioned.Interface, baseDir string) *worker {
199199
a3y: c,
200200
baseDir: baseDir,
201201
}
202-
w.k8s = kubernetes.NewForConfigOrDie(kc)
202+
if kc != nil {
203+
w.k8s = kubernetes.NewForConfigOrDie(kc)
204+
}
203205
return w
204206
}
205207

@@ -841,6 +843,7 @@ func (w *worker) ListenAndServeEdgeFuncs(host string, port int) error {
841843
mux := http.NewServeMux()
842844
mux.Handle("/wasm/", w)
843845
mux.Handle("/go/", w)
846+
mux.Handle("/js/", w)
844847
addr := fmt.Sprintf("%s:%d", host, port)
845848
log.Infof("Listening on %s", addr)
846849
return http.ListenAndServe(addr, mux)

pkg/backplane/controllers/edgefunction.go

Lines changed: 63 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,21 @@ package controllers
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"net/http"
89
"os"
910
"path/filepath"
10-
"sync"
1111
"time"
1212

1313
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1414
"k8s.io/apimachinery/pkg/types"
15+
"k8s.io/utils/ptr"
1516
ctrl "sigs.k8s.io/controller-runtime"
1617
"sigs.k8s.io/controller-runtime/pkg/builder"
1718
"sigs.k8s.io/controller-runtime/pkg/client"
19+
"sigs.k8s.io/controller-runtime/pkg/controller"
1820
clog "sigs.k8s.io/controller-runtime/pkg/log"
1921
"sigs.k8s.io/controller-runtime/pkg/predicate"
2022
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -41,9 +43,6 @@ type EdgeFunctionRevisionReconciler struct {
4143
goStoreDir string
4244
jsStoreDir string
4345
edgeRuntime edgefunc.Runtime
44-
45-
mu sync.Mutex
46-
edgeFuncs map[string]*v1alpha1.EdgeFunctionRevision
4746
}
4847

4948
// NewEdgeFunctionRevisionReconciler returns a new reconcile.Reconciler.
@@ -53,15 +52,15 @@ func NewEdgeFunctionRevisionReconciler(
5352
wasmStore manifest.Store,
5453
goStoreDir string,
5554
jsStoreDir string,
55+
edgeRuntime edgefunc.Runtime,
5656
) *EdgeFunctionRevisionReconciler {
5757
return &EdgeFunctionRevisionReconciler{
5858
Client: c,
5959
apiserverHost: apiserverHost,
6060
wasmStore: wasmStore,
6161
goStoreDir: goStoreDir,
6262
jsStoreDir: jsStoreDir,
63-
64-
edgeFuncs: make(map[string]*v1alpha1.EdgeFunctionRevision),
63+
edgeRuntime: edgeRuntime,
6564
}
6665
}
6766

@@ -74,17 +73,16 @@ func (r *EdgeFunctionRevisionReconciler) downloadFuncData(
7473

7574
resp, err := http.Get(fmt.Sprintf("http://%s/%s/%s", r.apiserverHost, fnType, ref))
7675
if err != nil {
77-
return nil, fmt.Errorf("failed to download Wasm module: %w", err)
76+
return nil, fmt.Errorf("failed to download edge function: %w", err)
7877
}
79-
8078
if resp.StatusCode != http.StatusOK {
81-
return nil, fmt.Errorf("failed to download Wasm module: %s", resp.Status)
79+
return nil, fmt.Errorf("failed to download edge function: %s", resp.Status)
8280
}
8381
defer resp.Body.Close()
8482

8583
data, err := io.ReadAll(resp.Body)
8684
if err != nil {
87-
return nil, fmt.Errorf("failed to read Wasm module: %w", err)
85+
return nil, fmt.Errorf("failed to read edge function data: %w", err)
8886
}
8987

9088
log.Info("Downloaded EdgeFunction module", "size", len(data), "type", fnType, "ref", ref)
@@ -101,6 +99,55 @@ func hasReadyCondition(conditions []metav1.Condition) bool {
10199
return false
102100
}
103101

102+
func (r *EdgeFunctionRevisionReconciler) reconileEdgeRuntime(ctx context.Context, ref string) error {
103+
log := clog.FromContext(ctx)
104+
105+
esZipPath := filepath.Join(r.jsStoreDir, ref, "bin.eszip")
106+
if _, err := os.Stat(esZipPath); err != nil && !os.IsNotExist(err) {
107+
return fmt.Errorf("Js bundle already exists for ref")
108+
} else if os.IsNotExist(err) {
109+
jsBundle, err := r.downloadFuncData(clog.IntoContext(ctx, log), "js", ref)
110+
if err != nil {
111+
return fmt.Errorf("failed to download Js data: %w", err)
112+
}
113+
114+
if err := os.MkdirAll(filepath.Join(r.jsStoreDir, ref), 0755); err != nil {
115+
return fmt.Errorf("failed to create Js directory: %w", err)
116+
}
117+
118+
if err := os.WriteFile(filepath.Join(r.jsStoreDir, ref, "data"), jsBundle, 0644); err != nil {
119+
return fmt.Errorf("failed to write Js data: %w", err)
120+
}
121+
// Use symlink to prevent Envoy from loading the plugin while it's being written to.
122+
if err := os.Symlink("data", esZipPath); err != nil {
123+
return fmt.Errorf("failed to create Js symlink: %w", err)
124+
}
125+
}
126+
127+
s, err := r.edgeRuntime.Status(ctx, ref)
128+
if err != nil && !errors.Is(err, edgefunc.ErrNotFound) {
129+
return fmt.Errorf("failed to get Edge Runtime status: %w", err)
130+
}
131+
132+
log.Info("Edge Runtime status", "state", s.State)
133+
134+
switch s.State {
135+
case edgefunc.StateRunning, edgefunc.StateCreated:
136+
log.Info("Edge Runtime is already running or created", "state", s.State)
137+
return nil
138+
case edgefunc.StateStopped:
139+
log.Info("Edge Runtime is stopped, starting it")
140+
}
141+
142+
log.Info("Starting Edge Runtime")
143+
144+
if err := r.edgeRuntime.Start(ctx, ref, esZipPath); err != nil {
145+
return fmt.Errorf("failed to start Edge Runtime: %w", err)
146+
}
147+
148+
return nil
149+
}
150+
104151
// Reconcile implements reconcile.Reconciler.
105152
func (r *EdgeFunctionRevisionReconciler) Reconcile(ctx context.Context, request reconcile.Request) (ctrl.Result, error) {
106153
log := clog.FromContext(ctx)
@@ -208,27 +255,9 @@ func (r *EdgeFunctionRevisionReconciler) Reconcile(ctx context.Context, request
208255
} else if rev.Spec.Code.JsSource != nil {
209256
log.Info("Js source detected")
210257

211-
jsBundle, err := r.downloadFuncData(clog.IntoContext(ctx, log), "js", ref)
212-
if err != nil {
213-
return ctrl.Result{}, fmt.Errorf("failed to download Js data: %w", err)
214-
}
215-
216-
if err := os.MkdirAll(filepath.Join(r.jsStoreDir, ref), 0755); err != nil {
217-
return ctrl.Result{}, fmt.Errorf("failed to create Js directory: %w", err)
218-
}
219-
220-
if err := os.WriteFile(filepath.Join(r.jsStoreDir, ref, "data"), jsBundle, 0644); err != nil {
221-
return ctrl.Result{}, fmt.Errorf("failed to write Js data: %w", err)
222-
}
223-
// Use symlink to prevent Envoy from loading the plugin while it's being written to.
224-
esZipPath := filepath.Join(r.jsStoreDir, ref, "bin.eszip")
225-
if err := os.Symlink("data", esZipPath); err != nil {
226-
return ctrl.Result{}, fmt.Errorf("failed to create Js symlink: %w", err)
258+
if err := r.reconileEdgeRuntime(ctx, ref); err != nil {
259+
return ctrl.Result{}, fmt.Errorf("failed to reconile Edge Runtime: %w", err)
227260
}
228-
229-
r.mu.Lock()
230-
r.edgeFuncs[ref] = rev
231-
r.mu.Unlock()
232261
} else {
233262
log.Info("No source detected")
234263
return ctrl.Result{}, nil
@@ -237,33 +266,6 @@ func (r *EdgeFunctionRevisionReconciler) Reconcile(ctx context.Context, request
237266
return ctrl.Result{}, nil
238267
}
239268

240-
func (r *EdgeFunctionRevisionReconciler) reconileEdgeRuntime(ctx context.Context, ref string) error {
241-
log := clog.FromContext(ctx)
242-
243-
s, err := r.edgeRuntime.Status(ctx, ref)
244-
if err != nil {
245-
return fmt.Errorf("failed to get Edge Runtime status: %w", err)
246-
}
247-
248-
switch s.State {
249-
case edgefunc.StateRunning:
250-
log.Info("Edge Runtime is already running")
251-
return nil
252-
case edgefunc.StateCreated:
253-
log.Info("Edge Runtime is already created")
254-
return nil
255-
case edgefunc.StateStopped:
256-
log.Info("Edge Runtime is stopped, starting it")
257-
default:
258-
return fmt.Errorf("Edge Runtime is in an unknown state: %s", s.State)
259-
}
260-
261-
if err := r.edgeRuntime.Start(ctx, ref, filepath.Join(r.jsStoreDir, ref, "bin.eszip")); err != nil {
262-
return fmt.Errorf("failed to start Edge Runtime: %w", err)
263-
}
264-
return nil
265-
}
266-
267269
func targetRefPredicate(proxyName string) predicate.Funcs {
268270
return predicate.NewPredicateFuncs(func(obj client.Object) bool {
269271
if obj == nil {
@@ -309,5 +311,9 @@ func (r *EdgeFunctionRevisionReconciler) SetupWithManager(
309311
targetRefPredicate(proxyName),
310312
),
311313
).
314+
WithOptions(controller.Options{
315+
MaxConcurrentReconciles: 1,
316+
RecoverPanic: ptr.To(true),
317+
}).
312318
Complete(r)
313319
}

0 commit comments

Comments
 (0)