Skip to content

Commit 6b74d58

Browse files
cosmo0920edsiper
authored andcommitted
config: storage: Implement dlq for filesystem chunks
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
1 parent 3d6f0bc commit 6b74d58

File tree

3 files changed

+119
-0
lines changed

3 files changed

+119
-0
lines changed

include/fluent-bit/flb_config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ struct flb_config {
257257
/* DLQ for non-retriable output failures */
258258
int storage_keep_rejected; /* 0/1 */
259259
char *storage_rejected_path; /* relative to storage_path, default "rejected" */
260+
void *storage_rejected_stream; /* NULL until first use */
260261

261262
/* Embedded SQL Database support (SQLite3) */
262263
#ifdef FLB_HAVE_SQLDB

include/fluent-bit/flb_storage.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,11 @@ int flb_storage_metrics_update(struct flb_config *config, struct flb_storage_met
8787

8888
void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks);
8989

90+
/* DLQ */
91+
int flb_storage_quarantine_chunk(struct flb_config *ctx,
92+
struct cio_chunk *ch,
93+
const char *tag,
94+
int status_code,
95+
const char *out_name);
96+
9097
#endif

src/flb_storage.c

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,117 @@ void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_ch
777777
*fs_chunks = storage_st.chunks_fs;
778778
}
779779

780+
781+
static struct cio_stream *get_or_create_rejected_stream(struct flb_config *ctx)
782+
{
783+
#ifdef CIO_HAVE_BACKEND_FILESYSTEM
784+
struct cio_stream *st;
785+
const char *name;
786+
787+
if (!ctx || !ctx->cio) {
788+
return NULL;
789+
}
790+
if (!ctx->storage_keep_rejected || !ctx->storage_path) {
791+
return NULL;
792+
}
793+
794+
if (ctx->storage_rejected_stream) {
795+
return ctx->storage_rejected_stream;
796+
}
797+
798+
name = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected";
799+
800+
st = cio_stream_get(ctx->cio, name);
801+
if (!st) {
802+
st = cio_stream_create(ctx->cio, name, FLB_STORAGE_FS);
803+
}
804+
if (!st) {
805+
flb_warn("[storage] failed to create rejected stream '%s'", name);
806+
return NULL;
807+
}
808+
809+
ctx->storage_rejected_stream = st;
810+
return st;
811+
#else
812+
FLB_UNUSED(ctx);
813+
return NULL;
814+
#endif
815+
}
816+
817+
int flb_storage_quarantine_chunk(struct flb_config *ctx,
818+
struct cio_chunk *src,
819+
const char *tag,
820+
int status_code,
821+
const char *out_name)
822+
{
823+
#ifdef CIO_HAVE_BACKEND_FILESYSTEM
824+
struct cio_stream *dlq;
825+
void *buf = NULL;
826+
size_t size = 0;
827+
int err = 0;
828+
char name[256];
829+
struct cio_chunk *dst;
830+
831+
if (!ctx || !src) {
832+
return -1;
833+
}
834+
dlq = get_or_create_rejected_stream(ctx);
835+
if (!dlq) {
836+
return -1;
837+
}
838+
839+
if (cio_chunk_is_up(src) != CIO_TRUE) {
840+
if (cio_chunk_up_force(src) != CIO_OK) {
841+
flb_warn("[storage] cannot bring chunk up to copy into DLQ");
842+
return -1;
843+
}
844+
}
845+
846+
if (cio_chunk_get_content_copy(src, &buf, &size) != CIO_OK || size == 0) {
847+
flb_warn("[storage] cannot read content for DLQ copy (size=%zu)", size);
848+
return -1;
849+
}
850+
851+
/* Compose a simple, unique-ish file name */
852+
snprintf(name, sizeof(name),
853+
"%s_%d_%s_%p.flb",
854+
tag ? tag : "no-tag",
855+
status_code,
856+
out_name ? out_name : "out",
857+
(void *) src);
858+
859+
/* Create + write the DLQ copy */
860+
dst = cio_chunk_open(ctx->cio, dlq, name, CIO_OPEN, size, &err);
861+
if (!dst) {
862+
flb_warn("[storage] DLQ open failed (err=%d)", err);
863+
flb_free(buf);
864+
return -1;
865+
}
866+
if (cio_chunk_write(dst, buf, size) != CIO_OK ||
867+
cio_chunk_sync(dst) != CIO_OK) {
868+
flb_warn("[storage] DLQ write/sync failed");
869+
cio_chunk_close(dst, CIO_TRUE);
870+
flb_free(buf);
871+
return -1;
872+
}
873+
874+
cio_chunk_close(dst, CIO_FALSE);
875+
flb_free(buf);
876+
877+
flb_info("[storage] quarantined rejected chunk into DLQ stream (bytes=%zu)", size);
878+
879+
return 0;
880+
#else
881+
FLB_UNUSED(ctx);
882+
FLB_UNUSED(src);
883+
FLB_UNUSED(tag);
884+
FLB_UNUSED(status_code);
885+
FLB_UNUSED(out_name);
886+
887+
return -1;
888+
#endif
889+
}
890+
780891
void flb_storage_destroy(struct flb_config *ctx)
781892
{
782893
struct cio_ctx *cio;

0 commit comments

Comments
 (0)