
Conversation

@lkts (Contributor) commented Feb 11, 2025:

This PR introduces an implementation of FallbackSyntheticSourceBlockLoader for number fields, together with related tests. It makes it possible to avoid synthesizing the entire synthetic _source in the block loader when doc_values are not available. unsigned_long and scaled_float are not covered in this PR.
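To make the idea concrete, here is a minimal self-contained sketch (the class and method names are illustrative, not the actual Elasticsearch API): rather than synthesizing the whole synthetic _source for a document and then extracting one field from it, a fallback loader only pulls the requested field's values out of the parsed source, handling the single-value, array, and absent cases.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the fallback idea: extract one field's values
// directly from a parsed source map instead of rebuilding all of _source.
public class FallbackFieldExtractor {

    // A field may hold a single value, a list of values, or be absent.
    public static List<Object> extract(Map<String, ?> source, String field) {
        List<Object> values = new ArrayList<>();
        Object raw = source.get(field);
        if (raw == null) {
            return values; // field absent: nothing to load for this document
        }
        if (raw instanceof List<?> list) {
            for (Object v : list) {
                if (v != null) {
                    values.add(v); // skip explicit nulls inside arrays
                }
            }
        } else {
            values.add(raw);
        }
        return values;
    }

    public static void main(String[] args) {
        var doc = Map.of("field_under_test", List.of(1L, 2L, 3L));
        System.out.println(extract(doc, "field_under_test")); // [1, 2, 3]
    }
}
```

The real implementation works against stored fields and block builders, but the cost difference the benchmark measures comes from exactly this narrowing: only the field under test is materialized per document.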

Benchmark results are below (lower is better). Note that we benchmark only long, but these results should be representative since the implementations for the other number types are very similar.

before

Benchmark                                              Mode  Cnt  Score   Error  Units
FallbackSyntheticSourceBlockLoaderBenchmark.benchmark  avgt    5  0.232 ± 0.002   s/op

after

Benchmark                                              Mode  Cnt  Score   Error  Units
FallbackSyntheticSourceBlockLoaderBenchmark.benchmark  avgt    5  0.104 ± 0.002   s/op
Benchmark code
```java
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.benchmark.index.mapper;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
import org.elasticsearch.index.mapper.FieldNamesFieldMapper;
import org.elasticsearch.index.mapper.LuceneDocument;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceFieldMetrics;
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.search.fetch.StoredFieldsSpec;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
public class FallbackSyntheticSourceBlockLoaderBenchmark {
    public static final int DOC_COUNT = 10000;

    private Random random;

    private Directory directory;
    private IndexReader indexReader;
    private MapperService mapperService;
    private BlockLoader blockLoader;
    private LeafReaderContext leafReaderContext;
    private BlockLoaderStoredFieldsFromLeafLoader storedFieldsLoader;
    private DummyBlockLoaderBuilder builder;

    static {
        LogConfigurator.configureESLogging(); // doc values implementations need logging
    }

    private static String SAMPLE_LOGS_MAPPING = """
        {
          "properties": {
            "field_under_test": { "type": "long", "doc_values": false },
            "kafka": {
              "properties": {
                "log": {
                  "properties": {
                    "component": { "ignore_above": 1024, "type": "keyword" },
                    "trace": {
                      "properties": {
                        "message": { "type": "text" },
                        "class": { "ignore_above": 1024, "type": "keyword" }
                      }
                    },
                    "thread": { "ignore_above": 1024, "type": "keyword" },
                    "class": { "ignore_above": 1024, "type": "keyword" }
                  }
                }
              }
            },
            "host": {
              "properties": {
                "hostname": { "ignore_above": 1024, "type": "keyword" },
                "os": {
                  "properties": {
                    "build": { "ignore_above": 1024, "type": "keyword" },
                    "kernel": { "ignore_above": 1024, "type": "keyword" },
                    "codename": { "ignore_above": 1024, "type": "keyword" },
                    "name": {
                      "ignore_above": 1024,
                      "type": "keyword",
                      "fields": { "text": { "type": "text" } }
                    },
                    "family": { "ignore_above": 1024, "type": "keyword" },
                    "version": { "ignore_above": 1024, "type": "keyword" },
                    "platform": { "ignore_above": 1024, "type": "keyword" }
                  }
                },
                "domain": { "ignore_above": 1024, "type": "keyword" },
                "ip": { "type": "ip" },
                "containerized": { "type": "boolean" },
                "name": { "ignore_above": 1024, "type": "keyword" },
                "id": { "ignore_above": 1024, "type": "keyword" },
                "type": { "ignore_above": 1024, "type": "keyword" },
                "mac": { "ignore_above": 1024, "type": "keyword" },
                "architecture": { "ignore_above": 1024, "type": "keyword" }
              }
            }
          }
        }
        """;

    @Setup
    public void setUp() throws IOException {
        this.random = new Random();

        Settings settings = Settings.builder()
            .put("index.number_of_replicas", 0)
            .put("index.number_of_shards", 1)
            .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
            .put("index.mapping.total_fields.limit", 100000)
            .put("index.mapping.source.mode", "synthetic")
            .build();
        this.mapperService = MapperServiceFactory.create(settings, SAMPLE_LOGS_MAPPING);

        this.directory = new ByteBuffersDirectory();
        try (IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE))) {
            var documents = generateRandomDocuments(DOC_COUNT);
            for (var document : documents) {
                LuceneDocument doc = mapperService.documentMapper().parse(document).rootDoc();
                iw.addDocument(doc);
            }
            iw.commit();
        }
        this.indexReader = DirectoryReader.open(directory);
        this.leafReaderContext = indexReader.leaves().get(0);

        SearchLookup searchLookup = new SearchLookup(null, null, null);
        this.blockLoader = mapperService.fieldType("field_under_test").blockLoader(new MappedFieldType.BlockLoaderContext() {
            @Override
            public String indexName() {
                return mapperService.getIndexSettings().getIndex().getName();
            }

            @Override
            public IndexSettings indexSettings() {
                return mapperService.getIndexSettings();
            }

            @Override
            public MappedFieldType.FieldExtractPreference fieldExtractPreference() {
                // TODO randomize when adding support for fields that care about this
                return MappedFieldType.FieldExtractPreference.NONE;
            }

            @Override
            public SearchLookup lookup() {
                return searchLookup;
            }

            @Override
            public Set<String> sourcePaths(String name) {
                return mapperService.mappingLookup().sourcePaths(name);
            }

            @Override
            public String parentField(String field) {
                return mapperService.mappingLookup().parentField(field);
            }

            @Override
            public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() {
                return (FieldNamesFieldMapper.FieldNamesFieldType) mapperService.fieldType(FieldNamesFieldMapper.NAME);
            }
        });

        var columnAtATimeReader = blockLoader.columnAtATimeReader(leafReaderContext);
        if (columnAtATimeReader != null) {
            throw new IllegalStateException();
        }

        StoredFieldsSpec storedFieldsSpec = blockLoader.rowStrideStoredFieldSpec();
        SourceLoader.Leaf leafSourceLoader = null;
        if (storedFieldsSpec.requiresSource()) {
            var sourceLoader = mapperService.mappingLookup().newSourceLoader(null, SourceFieldMetrics.NOOP);
            leafSourceLoader = sourceLoader.leaf(leafReaderContext.reader(), null);
            storedFieldsSpec = storedFieldsSpec.merge(
                new StoredFieldsSpec(true, storedFieldsSpec.requiresMetadata(), sourceLoader.requiredStoredFields())
            );
        }
        this.storedFieldsLoader = new BlockLoaderStoredFieldsFromLeafLoader(
            StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(leafReaderContext, null),
            leafSourceLoader
        );
        storedFieldsLoader.advanceTo(0);

        this.builder = new DummyBlockLoaderBuilder();
    }

    @TearDown
    public void tearDown() throws IOException {
        IOUtils.close(indexReader, directory);
    }

    @Benchmark
    public void benchmark() throws IOException {
        for (int i = 0; i < DOC_COUNT; i++) {
            storedFieldsLoader.advanceTo(i);
            blockLoader.rowStrideReader(leafReaderContext).read(i, storedFieldsLoader, builder);
        }
    }

    private SourceToParse[] generateRandomDocuments(int count) throws IOException {
        var docs = new SourceToParse[count];
        for (int i = 0; i < count; i++) {
            docs[i] = generateRandomDocument();
        }
        return docs;
    }

    private SourceToParse generateRandomDocument() throws IOException {
        var builder = XContentBuilder.builder(XContentType.JSON.xContent());

        builder.startObject();
        if (random.nextBoolean()) {
            builder.field("field_under_test", randomLong());
        } else {
            var size = random.nextInt(6);
            builder.startArray("field_under_test");
            for (int i = 0; i < size; i++) {
                builder.value(randomLong());
            }
            builder.endArray();
        }

        builder.startObject("kafka");
        {
            builder.startObject("log");
            {
                builder.field("component", randomString(10));
                builder.startArray("trace");
                {
                    builder.startObject();
                    {
                        builder.field("message", randomString(50));
                        builder.field("class", randomString(10));
                    }
                    builder.endObject();
                    builder.startObject();
                    {
                        builder.field("message", randomString(50));
                        builder.field("class", randomString(10));
                    }
                    builder.endObject();
                }
                builder.endArray();
                builder.field("thread", randomString(10));
                builder.field("class", randomString(10));
            }
            builder.endObject();
        }
        builder.endObject();

        builder.startObject("host");
        {
            builder.field("hostname", randomString(10));
            builder.startObject("os");
            {
                builder.field("name", randomString(10));
            }
            builder.endObject();
            builder.field("domain", randomString(10));
            builder.field("ip", randomIp());
            builder.field("name", randomString(10));
        }
        builder.endObject();
        builder.endObject();

        return new SourceToParse(UUIDs.randomBase64UUID(), BytesReference.bytes(builder), XContentType.JSON);
    }

    private Long randomLong() {
        return random.nextDouble() <= 0.1 ? null : random.nextLong();
    }

    private String randomIp() {
        return "" + random.nextInt(255) + '.' + random.nextInt(255) + '.' + random.nextInt(255) + '.' + random.nextInt(255);
    }

    private String randomString(int maxLength) {
        var length = random.nextInt(maxLength);
        var builder = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            builder.append((byte) (32 + random.nextInt(94)));
        }
        return builder.toString();
    }

    private class DummyBlockLoaderBuilder implements BlockLoader.LongBuilder {
        private final List<Object> vals = new ArrayList<>();

        @Override
        public BlockLoader.Block build() {
            return null;
        }

        @Override
        public BlockLoader.Builder appendNull() {
            vals.add(null);
            return this;
        }

        @Override
        public BlockLoader.LongBuilder appendLong(long value) {
            vals.add(value);
            return this;
        }

        @Override
        public BlockLoader.Builder beginPositionEntry() {
            return this;
        }

        @Override
        public BlockLoader.Builder endPositionEntry() {
            return this;
        }

        @Override
        public void close() {}

        public List<Object> getVals() {
            return vals;
        }
    }
}
```

Contributes to #115394.

before
MappedFieldType ft = new NumberFieldType(context.buildFullName(leafName()), this);

after
MappedFieldType ft = new NumberFieldType(context.buildFullName(leafName()), this, context.isSourceSynthetic());
@lkts (Contributor, Author):

I have to do this because this PR will be backported, and in 8.x the _source.mode parameter is still operational.

@lkts lkts requested review from dnhatn and martijnvg February 11, 2025 23:10
@lkts lkts marked this pull request as ready for review February 11, 2025 23:11
@lkts lkts added auto-backport Automatically create backport pull requests when merged :StorageEngine/Mapping The storage related side of mappings v8.19.0 >enhancement labels Feb 11, 2025
@elasticsearchmachine (Collaborator):
Hi @lkts, I've created a changelog YAML for you.

@elasticsearchmachine (Collaborator):
Pinging @elastic/es-storage-engine (Team:StorageEngine)

@martijnvg (Member) left a comment:

LGTM 👍

Great to see how this new block loader improves performance compared to the source based block loader.

@@ -0,0 +1,3 @@
package org.elasticsearch.benchmark.index.mapper;

public class FallbackSyntheticSourceBlockLoaderBenchmark {}
Member:
Is the intent to include the benchmark?

@lkts (Contributor, Author):

I didn't intend to do that; I am not sure how this ended up here :)

var converted = type.parse(value, coerce);
accumulator.add(converted);
} catch (Exception e) {
// Malformed value, skip it
Member:

I think this is ok, and it matches the behavior of ignore_malformed in ES|QL: if a value is malformed it isn't available in doc values and never gets returned by the block loader.

@lkts (Contributor, Author):

Yes, we do the same for stored source.
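The skip-on-malformed behavior discussed above can be sketched in isolation (class and method names here are illustrative, not the actual Elasticsearch code): a value that fails to parse is silently dropped, mirroring how a malformed value never reaches doc values under ignore_malformed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the pattern in the diff: parse each raw source
// value into the field's type, and skip values that fail to parse.
public class MalformedSkippingParser {

    public static List<Long> parseLongs(List<String> rawValues) {
        List<Long> accumulator = new ArrayList<>();
        for (String raw : rawValues) {
            try {
                // Transform the raw value to the field's type
                accumulator.add(Long.parseLong(raw));
            } catch (NumberFormatException e) {
                // Malformed value, skip it: it would not be in doc values either
            }
        }
        return accumulator;
    }

    public static void main(String[] args) {
        System.out.println(parseLongs(List.of("1", "oops", "3"))); // [1, 3]
    }
}
```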

// Transform number to correct type (e.g. reduce precision)
accumulator.add(type.parse(rawValue, coerce));
} catch (Exception e) {
// Malformed value, skip it./gradlew ":server:test" --tests
Member:

Remove ./gradlew ":server:test" --tests ?

@lkts (Contributor, Author):

oops

@dnhatn (Member) left a comment:

Looks great. Thanks @lkts


package org.elasticsearch.index.mapper.blockloader;

import org.elasticsearch.logsdb.datageneration.FieldType;
Member:

I think we should graduate the datageneration package out of logsdb at some point.

@lkts (Contributor, Author):

I even have a PR for that, but I've been changing things there and it was conflicting. I'll merge that in.

@lkts lkts merged commit b8d7e99 into elastic:main Feb 13, 2025
17 checks passed
@lkts lkts deleted the synthetic_source_block_loader_numbers branch February 13, 2025 00:12
@elasticsearchmachine (Collaborator):

💚 Backport successful

Branch: 8.x

Labels

auto-backport Automatically create backport pull requests when merged >enhancement :StorageEngine/Mapping The storage related side of mappings Team:StorageEngine v8.19.0 v9.1.0

4 participants