Skip to content

Commit 4e69fb6

Browse files
Pipelines CLI Improvements (#8780)
* Rename `wrangler pipelines show` to `wrangler pipelines get` * Remove `--enable-http|worker-binding` in favor of `--source <sources...>` format Replace `--enable-worker-binding` and `--enable-http` with `--source worker` and `--source http` (or `--source http worker` for both). This also refactors how these bindings get added to the pipeline configuration, opting for a handler pattern instead of multiple if statements. * Remove `--file-template` and `--partition-template` flags from `wrangler pipelines create|update` * Prettify `wrangler pipelines get <pipeline>` output ✨ * Clarify defaults, minimums, and maximums for pipelines commands We specifically do not use yargs `default` field as these values are purely informational and the API is the ultimate decider of what the default value will be at a given time. * Show pipelines command in wrangler help * update snapshots * Fix updating CORS origins for pipelines * Fix upper limit on batch-max-rows --------- Co-authored-by: Peter Bacon Darwin <pbacondarwin@cloudflare.com>
1 parent d4c1171 commit 4e69fb6

File tree

9 files changed

+402
-244
lines changed

9 files changed

+402
-244
lines changed

.changeset/busy-chairs-happen.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
"wrangler": patch
3+
---
4+
5+
- Rename `wrangler pipelines show` to `wrangler pipelines get`
6+
- Replace `--enable-worker-binding` and `--enable-http` with `--source worker` and `--source http` (or
7+
`--source http worker` for both)
8+
- Remove `--file-template` and `--partition-template` flags from `wrangler pipelines create|update`
9+
- Add pretty output for `wrangler pipelines get <pipeline>`. Existing output is available using `--format=json`.
10+
- Clarify the minimums, maximums, and defaults (if unset) for `wrangler pipelines create` commands.

packages/wrangler/src/__tests__/index.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ describe("wrangler", () => {
6363
wrangler dispatch-namespace 🏗️ Manage dispatch namespaces
6464
wrangler ai 🤖 Manage AI models
6565
wrangler workflows 🔁 Manage Workflows [open-beta]
66+
wrangler pipelines 🚰 Manage Worker Pipelines [open beta]
6667
wrangler login 🔓 Login to Cloudflare
6768
wrangler logout 🚪 Logout from Cloudflare
6869
wrangler whoami 🕵️ Retrieve your user information
@@ -122,6 +123,7 @@ describe("wrangler", () => {
122123
wrangler dispatch-namespace 🏗️ Manage dispatch namespaces
123124
wrangler ai 🤖 Manage AI models
124125
wrangler workflows 🔁 Manage Workflows [open-beta]
126+
wrangler pipelines 🚰 Manage Worker Pipelines [open beta]
125127
wrangler login 🔓 Login to Cloudflare
126128
wrangler logout 🚪 Logout from Cloudflare
127129
wrangler whoami 🕵️ Retrieve your user information

packages/wrangler/src/__tests__/pipelines.test.ts

Lines changed: 119 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,20 @@ describe("pipelines", () => {
3030
type: "http",
3131
format: "json",
3232
authentication: false,
33+
cors: {
34+
origins: ["*"],
35+
},
3336
},
3437
],
3538
transforms: [],
3639
destination: {
3740
type: "r2",
3841
format: "json",
39-
batch: {},
42+
batch: {
43+
max_bytes: 100000000,
44+
max_duration_s: 300,
45+
max_rows: 100000,
46+
},
4047
compression: {
4148
type: "none",
4249
},
@@ -101,6 +108,18 @@ describe("pipelines", () => {
101108
name: name,
102109
endpoint: "foo",
103110
};
111+
112+
// API will set defaults if not provided
113+
if (!pipeline.destination.batch.max_rows) {
114+
pipeline.destination.batch.max_rows = 10_000_000;
115+
}
116+
if (!pipeline.destination.batch.max_bytes) {
117+
pipeline.destination.batch.max_bytes = 100_000_000;
118+
}
119+
if (!pipeline.destination.batch.max_duration_s) {
120+
pipeline.destination.batch.max_duration_s = 300;
121+
}
122+
104123
return HttpResponse.json(
105124
{
106125
success: !error,
@@ -142,7 +161,7 @@ describe("pipelines", () => {
142161
return requests;
143162
}
144163

145-
function mockShowRequest(
164+
function mockGetRequest(
146165
name: string,
147166
pipeline: Pipeline | null,
148167
status: number = 200,
@@ -255,10 +274,12 @@ describe("pipelines", () => {
255274
expect(std.out).toMatchInlineSnapshot(`
256275
"wrangler pipelines
257276
277+
🚰 Manage Worker Pipelines [open beta]
278+
258279
COMMANDS
259280
wrangler pipelines create <pipeline> Create a new Pipeline
260281
wrangler pipelines list List current Pipelines
261-
wrangler pipelines show <pipeline> Show a Pipeline configuration
282+
wrangler pipelines get <pipeline> Get a Pipeline configuration
262283
wrangler pipelines update <pipeline> Update a Pipeline
263284
wrangler pipelines delete <pipeline> Delete a Pipeline
264285
@@ -286,27 +307,24 @@ describe("pipelines", () => {
286307
pipeline The name of the new pipeline [string] [required]
287308
288309
Source settings
289-
--enable-worker-binding Send data from a Worker to a Pipeline using a Binding [boolean] [default: true]
290-
--enable-http Generate an endpoint to ingest data via HTTP [boolean] [default: true]
291-
--require-http-auth Require Cloudflare API Token for HTTPS endpoint authentication [boolean] [default: false]
292-
--cors-origins CORS origin allowlist for HTTP endpoint (use * for any origin) [array]
310+
--source Space separated list of allowed sources. Options are 'http' or 'worker' [array] [default: [\\"http\\",\\"worker\\"]]
311+
--require-http-auth Require Cloudflare API Token for HTTPS endpoint authentication [boolean] [default: false]
312+
--cors-origins CORS origin allowlist for HTTP endpoint (use * for any origin). Defaults to an empty array [array]
293313
294314
Batch hints
295-
--batch-max-mb Maximum batch size in megabytes before flushing [number]
296-
--batch-max-rows Maximum number of rows per batch before flushing [number]
297-
--batch-max-seconds Maximum age of batch in seconds before flushing [number]
315+
--batch-max-mb Maximum batch size in megabytes before flushing. Defaults to 100 MB if unset. Minimum: 1, Maximum: 100 [number]
316+
--batch-max-rows Maximum number of rows per batch before flushing. Defaults to 10,000,000 if unset. Minimum: 100, Maximum: 10,000,000 [number]
317+
--batch-max-seconds Maximum age of batch in seconds before flushing. Defaults to 300 if unset. Minimum: 1, Maximum: 300 [number]
298318
299319
Destination settings
300320
--r2-bucket Destination R2 bucket name [string] [required]
301321
--r2-access-key-id R2 service Access Key ID for authentication. Leave empty for OAuth confirmation. [string]
302322
--r2-secret-access-key R2 service Secret Access Key for authentication. Leave empty for OAuth confirmation. [string]
303-
--r2-prefix Prefix for storing files in the destination bucket [string] [default: \\"\\"]
323+
--r2-prefix Prefix for storing files in the destination bucket. Default is no prefix [string] [default: \\"\\"]
304324
--compression Compression format for output files [string] [choices: \\"none\\", \\"gzip\\", \\"deflate\\"] [default: \\"gzip\\"]
305-
--file-template Template for individual file names (must include \${slug}). For example: \\"\${slug}.log.gz\\" [string]
306-
--partition-template Path template for partitioned files in the bucket. If not specified, the default will be used [string]
307325
308326
Pipeline settings
309-
--shard-count Number of pipeline shards. More shards handle higher request volume; fewer shards produce larger output files [number]
327+
--shard-count Number of pipeline shards. More shards handle higher request volume; fewer shards produce larger output files. Defaults to 2 if unset. Minimum: 1, Maximum: 15 [number]
310328
311329
GLOBAL FLAGS
312330
-c, --config Path to Wrangler configuration file [string]
@@ -325,10 +343,30 @@ describe("pipelines", () => {
325343
expect(requests.count).toEqual(1);
326344
expect(std.out).toMatchInlineSnapshot(`
327345
"🌀 Creating Pipeline named \\"my-pipeline\\"
328-
✅ Successfully created Pipeline \\"my-pipeline\\" with id 0001
346+
✅ Successfully created Pipeline \\"my-pipeline\\" with ID 0001
347+
348+
Id: 0001
349+
Name: my-pipeline
350+
Sources:
351+
HTTP:
352+
Endpoint: foo
353+
Authentication: off
354+
Format: JSON
355+
Worker:
356+
Format: JSON
357+
Destination:
358+
Type: R2
359+
Bucket: test-bucket
360+
Format: newline-delimited JSON
361+
Compression: GZIP
362+
Batch hints:
363+
Max bytes: 100 MB
364+
Max duration: 300 seconds
365+
Max records: 10,000,000
366+
329367
🎉 You can now send data to your Pipeline!
330368
331-
To start interacting with this Pipeline from a Worker, open your Worker’s config file and add the following binding configuration:
369+
To send data to your pipeline from a Worker, add the following to your wrangler config file:
332370
333371
{
334372
\\"pipelines\\": [
@@ -341,7 +379,7 @@ describe("pipelines", () => {
341379
342380
Send data to your Pipeline's HTTP endpoint:
343381
344-
curl \\"foo\\" -d '[{\\"foo\\": \\"bar\\"}]'
382+
curl \\"foo\\" -d '[{\\"foo\\": \\"bar\\"}]'
345383
"
346384
`);
347385
});
@@ -369,16 +407,16 @@ describe("pipelines", () => {
369407
expect(requests.count).toEqual(1);
370408

371409
// contain http source and include auth
372-
expect(requests.body?.source[1].type).toEqual("http");
373-
expect((requests.body?.source[1] as HttpSource).authentication).toEqual(
410+
expect(requests.body?.source[0].type).toEqual("http");
411+
expect((requests.body?.source[0] as HttpSource).authentication).toEqual(
374412
true
375413
);
376414
});
377415

378416
it("should create a pipeline without http", async () => {
379417
const requests = mockCreateRequest("my-pipeline");
380418
await runWrangler(
381-
"pipelines create my-pipeline --enable-http=false --r2-bucket test-bucket --r2-access-key-id my-key --r2-secret-access-key my-secret"
419+
"pipelines create my-pipeline --source worker --r2-bucket test-bucket --r2-access-key-id my-key --r2-secret-access-key my-secret"
382420
);
383421
expect(requests.count).toEqual(1);
384422

@@ -409,15 +447,44 @@ describe("pipelines", () => {
409447
expect(requests.count).toEqual(1);
410448
});
411449

412-
describe("show", () => {
413-
it("should show pipeline", async () => {
414-
const requests = mockShowRequest("foo", samplePipeline);
415-
await runWrangler("pipelines show foo");
450+
describe("get", () => {
451+
it("should get pipeline pretty", async () => {
452+
const requests = mockGetRequest("foo", samplePipeline);
453+
await runWrangler("pipelines get foo");
416454

417455
expect(std.err).toMatchInlineSnapshot(`""`);
418456
expect(std.out).toMatchInlineSnapshot(`
419-
"Retrieving config for Pipeline \\"foo\\".
420-
{
457+
"Id: 0001
458+
Name: my-pipeline
459+
Sources:
460+
HTTP:
461+
Endpoint: https://0001.pipelines.cloudflarestorage.com
462+
Authentication: off
463+
CORS Origins: *
464+
Format: JSON
465+
Worker:
466+
Format: JSON
467+
Destination:
468+
Type: R2
469+
Bucket: bucket
470+
Format: newline-delimited JSON
471+
Compression: NONE
472+
Batch hints:
473+
Max bytes: 100 MB
474+
Max duration: 300 seconds
475+
Max records: 100,000
476+
"
477+
`);
478+
expect(requests.count).toEqual(1);
479+
});
480+
481+
it("should get pipeline json", async () => {
482+
const requests = mockGetRequest("foo", samplePipeline);
483+
await runWrangler("pipelines get foo --format=json");
484+
485+
expect(std.err).toMatchInlineSnapshot(`""`);
486+
expect(std.out).toMatchInlineSnapshot(`
487+
"{
421488
\\"id\\": \\"0001\\",
422489
\\"version\\": 1,
423490
\\"name\\": \\"my-pipeline\\",
@@ -430,14 +497,23 @@ describe("pipelines", () => {
430497
{
431498
\\"type\\": \\"http\\",
432499
\\"format\\": \\"json\\",
433-
\\"authentication\\": false
500+
\\"authentication\\": false,
501+
\\"cors\\": {
502+
\\"origins\\": [
503+
\\"*\\"
504+
]
505+
}
434506
}
435507
],
436508
\\"transforms\\": [],
437509
\\"destination\\": {
438510
\\"type\\": \\"r2\\",
439511
\\"format\\": \\"json\\",
440-
\\"batch\\": {},
512+
\\"batch\\": {
513+
\\"max_bytes\\": 100000000,
514+
\\"max_duration_s\\": 300,
515+
\\"max_rows\\": 100000
516+
},
441517
\\"compression\\": {
442518
\\"type\\": \\"none\\"
443519
},
@@ -452,20 +528,19 @@ describe("pipelines", () => {
452528
});
453529

454530
it("should fail on missing pipeline", async () => {
455-
const requests = mockShowRequest("bad-pipeline", null, 404, {
531+
const requests = mockGetRequest("bad-pipeline", null, 404, {
456532
code: 1000,
457533
message: "Pipeline does not exist",
458534
});
459535
await expect(
460-
runWrangler("pipelines show bad-pipeline")
536+
runWrangler("pipelines get bad-pipeline")
461537
).rejects.toThrowError();
462538

463539
await endEventLoop();
464540

465541
expect(std.err).toMatchInlineSnapshot(`""`);
466542
expect(normalizeOutput(std.out)).toMatchInlineSnapshot(`
467-
"Retrieving config for Pipeline \\"bad-pipeline\\".
468-
X [ERROR] A request to the Cloudflare API (/accounts/some-account-id/pipelines/bad-pipeline) failed.
543+
"X [ERROR] A request to the Cloudflare API (/accounts/some-account-id/pipelines/bad-pipeline) failed.
469544
Pipeline does not exist [code: 1000]
470545
If you think this is a bug, please open an issue at:
471546
https://github.com/cloudflare/workers-sdk/issues/new/choose"
@@ -477,7 +552,7 @@ describe("pipelines", () => {
477552
describe("update", () => {
478553
it("should update a pipeline", async () => {
479554
const pipeline: Pipeline = samplePipeline;
480-
mockShowRequest(pipeline.name, pipeline);
555+
mockGetRequest(pipeline.name, pipeline);
481556

482557
const update = JSON.parse(JSON.stringify(pipeline));
483558
update.destination.compression.type = "gzip";
@@ -489,7 +564,7 @@ describe("pipelines", () => {
489564

490565
it("should update a pipeline with new bucket", async () => {
491566
const pipeline: Pipeline = samplePipeline;
492-
mockShowRequest(pipeline.name, pipeline);
567+
mockGetRequest(pipeline.name, pipeline);
493568

494569
const update = JSON.parse(JSON.stringify(pipeline));
495570
update.destination.path.bucket = "new_bucket";
@@ -509,7 +584,7 @@ describe("pipelines", () => {
509584

510585
it("should update a pipeline with new credential", async () => {
511586
const pipeline: Pipeline = samplePipeline;
512-
mockShowRequest(pipeline.name, pipeline);
587+
mockGetRequest(pipeline.name, pipeline);
513588

514589
const update = JSON.parse(JSON.stringify(pipeline));
515590
update.destination.path.bucket = "new-bucket";
@@ -529,7 +604,7 @@ describe("pipelines", () => {
529604

530605
it("should update a pipeline with source changes http auth", async () => {
531606
const pipeline: Pipeline = samplePipeline;
532-
mockShowRequest(pipeline.name, pipeline);
607+
mockGetRequest(pipeline.name, pipeline);
533608

534609
const update = JSON.parse(JSON.stringify(pipeline));
535610
update.source = [
@@ -542,7 +617,7 @@ describe("pipelines", () => {
542617
const updateReq = mockUpdateRequest(update.name, update);
543618

544619
await runWrangler(
545-
"pipelines update my-pipeline --enable-worker-binding=false --enable-http --require-http-auth"
620+
"pipelines update my-pipeline --source http --require-http-auth"
546621
);
547622

548623
expect(updateReq.count).toEqual(1);
@@ -555,7 +630,7 @@ describe("pipelines", () => {
555630

556631
it("should update a pipeline cors headers", async () => {
557632
const pipeline: Pipeline = samplePipeline;
558-
mockShowRequest(pipeline.name, pipeline);
633+
mockGetRequest(pipeline.name, pipeline);
559634

560635
const update = JSON.parse(JSON.stringify(pipeline));
561636
update.source = [
@@ -568,19 +643,19 @@ describe("pipelines", () => {
568643
const updateReq = mockUpdateRequest(update.name, update);
569644

570645
await runWrangler(
571-
"pipelines update my-pipeline --enable-worker-binding=false --enable-http --cors-origins http://localhost:8787"
646+
"pipelines update my-pipeline --cors-origins http://localhost:8787"
572647
);
573648

574649
expect(updateReq.count).toEqual(1);
575-
expect(updateReq.body?.source.length).toEqual(1);
576-
expect(updateReq.body?.source[0].type).toEqual("http");
577-
expect((updateReq.body?.source[0] as HttpSource).cors?.origins).toEqual([
650+
expect(updateReq.body?.source.length).toEqual(2);
651+
expect(updateReq.body?.source[1].type).toEqual("http");
652+
expect((updateReq.body?.source[1] as HttpSource).cors?.origins).toEqual([
578653
"http://localhost:8787",
579654
]);
580655
});
581656

582657
it("should fail a missing pipeline", async () => {
583-
const requests = mockShowRequest("bad-pipeline", null, 404, {
658+
const requests = mockGetRequest("bad-pipeline", null, 404, {
584659
code: 1000,
585660
message: "Pipeline does not exist",
586661
});
@@ -604,7 +679,7 @@ describe("pipelines", () => {
604679

605680
it("should remove transformations", async () => {
606681
const pipeline: Pipeline = samplePipeline;
607-
mockShowRequest(pipeline.name, pipeline);
682+
mockGetRequest(pipeline.name, pipeline);
608683

609684
const update = JSON.parse(JSON.stringify(pipeline));
610685
update.transforms = [

packages/wrangler/src/index.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -955,10 +955,14 @@ export function createCLIParser(argv: string[]) {
955955
]);
956956
registry.registerNamespace("workflows");
957957

958-
// pipelines
959-
wrangler.command("pipelines", false, (pipelinesYargs) => {
960-
return pipelines(pipelinesYargs.command(subHelp));
961-
});
958+
// [OPEN BETA] pipelines
959+
wrangler.command(
960+
"pipelines",
961+
`🚰 Manage Worker Pipelines ${chalk.hex(betaCmdColor)("[open beta]")}`,
962+
(pipelinesYargs) => {
963+
return pipelines(pipelinesYargs.command(subHelp));
964+
}
965+
);
962966

963967
/******************** CMD GROUP ***********************/
964968

0 commit comments

Comments
 (0)