Create or update a Logstash pipeline

Generally available; added in 7.12.0

PUT /_logstash/pipeline/{id}

Create a pipeline that is used for Logstash Central Management. If the specified pipeline exists, it is replaced.
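Because the PUT replaces the stored pipeline wholesale rather than merging, resending the request with a changed body overwrites every field. A minimal sketch of the create-then-replace flow with the Python client (the endpoint and API key are placeholders, not values from this page):

Python:

from elasticsearch import Elasticsearch

# Placeholder connection details; the credentials must carry the
# manage_logstash_pipelines cluster privilege.
client = Elasticsearch("http://localhost:9200", api_key="<api-key>")

body = {
    "description": "First revision",
    "last_modified": "2021-01-02T02:50:51.250Z",
    "pipeline_metadata": {"type": "logstash_pipeline", "version": 1},
    "username": "elastic",
    "pipeline": "input {}\n output {}",
    "pipeline_settings": {
        "pipeline.workers": 1,
        "pipeline.batch.size": 125,
        "pipeline.batch.delay": 50,
        "queue.type": "memory",
        "queue.max_bytes": "1gb",
        "queue.checkpoint.writes": 1024,
    },
}
client.logstash.put_pipeline(id="my_pipeline", pipeline=body)   # creates the pipeline

body["description"] = "Second revision"
client.logstash.put_pipeline(id="my_pipeline", pipeline=body)   # replaces it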

Required authorization

  • Cluster privileges: manage_logstash_pipelines

Path parameters

  • id string Required

    An identifier for the pipeline.

Body Required (application/json)

  • description string Required

    A description of the pipeline. This description is not used by Elasticsearch or Logstash.

  • last_modified string | number Required

    The date and time when the pipeline was last updated, either as a string whose format can depend on the context (defaulting to ISO 8601) or as a number of milliseconds since the epoch. Elasticsearch accepts both forms as input but generally outputs the string form; see the sketch after this parameter list.

  • pipeline string Required

    The configuration for the pipeline, supplied as a single string. Because the request body is JSON, newlines in the configuration must be escaped as \n; see the sketch after this parameter list.
  • pipeline_metadata object Required
    • type string Required
    • version string Required
  • pipeline_settings object Required
    • pipeline.workers number Required

      The number of workers that will, in parallel, execute the filter and output stages of the pipeline.

    • pipeline.batch.size number Required

      The maximum number of events an individual worker thread will collect from inputs before attempting to execute its filters and outputs.

    • pipeline.batch.delay number Required

      When creating pipeline event batches, how long in milliseconds to wait for each event before dispatching an undersized batch to pipeline workers.

    • queue.type string Required

      The internal queuing model to use for event buffering: memory or persisted.

    • queue.max_bytes string Required

      The total capacity of the queue when persistent queues are enabled (queue.type: persisted), expressed as a byte size value such as 1gb.

    • queue.checkpoint.writes number Required

      The maximum number of written events before forcing a checkpoint when persistent queues are enabled (queue.type: persisted).

  • username string Required

    The user who last updated the pipeline.
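As a quick illustration of the two accepted last_modified forms and of how a multi-line Logstash config becomes the single pipeline string, here is a short sketch using only the Python standard library (the variable names are illustrative):

Python:

import json
from datetime import datetime, timezone

# last_modified as an ISO 8601 string...
last_modified_iso = "2021-01-02T02:50:51.250Z"

# ...or as milliseconds since the epoch; Elasticsearch accepts either
# on input but generally returns the string form.
ts = datetime(2021, 1, 2, 2, 50, 51, 250000, tzinfo=timezone.utc)
last_modified_ms = int(ts.timestamp() * 1000)  # 1609555851250

# A multi-line config collapses into the single `pipeline` string;
# json.dumps escapes the embedded newlines as \n in the request body.
config = "\n".join(["input {}", " filter { grok {} }", " output {}"])
print(json.dumps({"pipeline": config}))
# {"pipeline": "input {}\n filter { grok {} }\n output {}"}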

Responses

  • 200 application/json
Console:

PUT _logstash/pipeline/my_pipeline
{
  "description": "Sample pipeline for illustration purposes",
  "last_modified": "2021-01-02T02:50:51.250Z",
  "pipeline_metadata": {
    "type": "logstash_pipeline",
    "version": 1
  },
  "username": "elastic",
  "pipeline": "input {}\n filter { grok {} }\n output {}",
  "pipeline_settings": {
    "pipeline.workers": 1,
    "pipeline.batch.size": 125,
    "pipeline.batch.delay": 50,
    "queue.type": "memory",
    "queue.max_bytes": "1gb",
    "queue.checkpoint.writes": 1024
  }
}
Python:

resp = client.logstash.put_pipeline(
    id="my_pipeline",
    pipeline={
        "description": "Sample pipeline for illustration purposes",
        "last_modified": "2021-01-02T02:50:51.250Z",
        "pipeline_metadata": {
            "type": "logstash_pipeline",
            "version": 1
        },
        "username": "elastic",
        "pipeline": "input {}\n filter { grok {} }\n output {}",
        "pipeline_settings": {
            "pipeline.workers": 1,
            "pipeline.batch.size": 125,
            "pipeline.batch.delay": 50,
            "queue.type": "memory",
            "queue.max_bytes": "1gb",
            "queue.checkpoint.writes": 1024
        }
    },
)
JavaScript:

const response = await client.logstash.putPipeline({
  id: "my_pipeline",
  pipeline: {
    description: "Sample pipeline for illustration purposes",
    last_modified: "2021-01-02T02:50:51.250Z",
    pipeline_metadata: {
      type: "logstash_pipeline",
      version: 1,
    },
    username: "elastic",
    pipeline: "input {}\n filter { grok {} }\n output {}",
    pipeline_settings: {
      "pipeline.workers": 1,
      "pipeline.batch.size": 125,
      "pipeline.batch.delay": 50,
      "queue.type": "memory",
      "queue.max_bytes": "1gb",
      "queue.checkpoint.writes": 1024,
    },
  },
});
Ruby:

response = client.logstash.put_pipeline(
  id: "my_pipeline",
  body: {
    "description": "Sample pipeline for illustration purposes",
    "last_modified": "2021-01-02T02:50:51.250Z",
    "pipeline_metadata": {
      "type": "logstash_pipeline",
      "version": 1
    },
    "username": "elastic",
    "pipeline": "input {}\n filter { grok {} }\n output {}",
    "pipeline_settings": {
      "pipeline.workers": 1,
      "pipeline.batch.size": 125,
      "pipeline.batch.delay": 50,
      "queue.type": "memory",
      "queue.max_bytes": "1gb",
      "queue.checkpoint.writes": 1024
    }
  }
)
PHP:

$resp = $client->logstash()->putPipeline([
    "id" => "my_pipeline",
    "body" => [
        "description" => "Sample pipeline for illustration purposes",
        "last_modified" => "2021-01-02T02:50:51.250Z",
        "pipeline_metadata" => [
            "type" => "logstash_pipeline",
            "version" => 1,
        ],
        "username" => "elastic",
        "pipeline" => "input {}\n filter { grok {} }\n output {}",
        "pipeline_settings" => [
            "pipeline.workers" => 1,
            "pipeline.batch.size" => 125,
            "pipeline.batch.delay" => 50,
            "queue.type" => "memory",
            "queue.max_bytes" => "1gb",
            "queue.checkpoint.writes" => 1024,
        ],
    ],
]);
curl:

curl -X PUT -H "Authorization: ApiKey $ELASTIC_API_KEY" -H "Content-Type: application/json" -d '{"description":"Sample pipeline for illustration purposes","last_modified":"2021-01-02T02:50:51.250Z","pipeline_metadata":{"type":"logstash_pipeline","version":1},"username":"elastic","pipeline":"input {}\n filter { grok {} }\n output {}","pipeline_settings":{"pipeline.workers":1,"pipeline.batch.size":125,"pipeline.batch.delay":50,"queue.type":"memory","queue.max_bytes":"1gb","queue.checkpoint.writes":1024}}' "$ELASTICSEARCH_URL/_logstash/pipeline/my_pipeline"
Java:

client.logstash().putPipeline(p -> p
    .id("my_pipeline")
    .pipeline(pi -> pi
        .description("Sample pipeline for illustration purposes")
        .lastModified(DateTime.of("2021-01-02T02:50:51.250Z"))
        .pipeline("input {}\n filter { grok {} }\n output {}")
        .pipelineMetadata(pip -> pip
            .type("logstash_pipeline")
            .version("1")
        )
        .pipelineSettings(pip -> pip
            .pipelineWorkers(1)
            .pipelineBatchSize(125)
            .pipelineBatchDelay(50)
            .queueType("memory")
            .queueMaxBytes("1gb")
            .queueCheckpointWrites(1024)
        )
        .username("elastic")
    )
);
Request example
Run `PUT _logstash/pipeline/my_pipeline` to create a pipeline.
{ "description": "Sample pipeline for illustration purposes", "last_modified": "2021-01-02T02:50:51.250Z", "pipeline_metadata": { "type": "logstash_pipeline", "version": 1 }, "username": "elastic", "pipeline": "input {}\\n filter { grok {} }\\n output {}", "pipeline_settings": { "pipeline.workers": 1, "pipeline.batch.size": 125, "pipeline.batch.delay": 50, "queue.type": "memory", "queue.max_bytes": "1gb", "queue.checkpoint.writes": 1024 } }