- Notifications
You must be signed in to change notification settings - Fork 25.5k
Add LogsDB option to route on sort fields #116687
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8af55ad
1116d7d
2f0633c
bc2feda
b20501e
84d3927
e8c79a4
4e9fd9d
ef09258
285a74e
2ceeef1
412293e
b555036
4065c63
9457a47
3fd9350
bfd055d
2ac0021
a74723e
b9e45cc
7567efa
f97d4ef
87449f9
a922777
e71e1c4
a81cad3
47606bc
c81cf5c
384657a
d370de6
5e17f66
dbfc374
6068cbb
8dfcc9d
530bb78
92f2c3d
e35b814
dc20f82
07b3308
cfdb91f
72e520e
a6e3ff5
52f2dad
6f3bbaf
8e2bdb0
cce66f0
b2e2bd3
3ca11a8
603bd06
4dbc3c3
8204849
48fb2d8
86998ea
a59070e
58d428d
c72effa
e134822
71db536
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 116687 | ||
summary: Add LogsDB option to route on sort fields | ||
area: Logs | ||
type: enhancement | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
| @@ -40,6 +40,7 @@ | |
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.OptionalInt; | ||
import java.util.Set; | ||
import java.util.function.IntConsumer; | ||
import java.util.function.IntSupplier; | ||
| @@ -55,6 +56,7 @@ public abstract class IndexRouting { | |
| ||
static final NodeFeature BOOLEAN_ROUTING_PATH = new NodeFeature("routing.boolean_routing_path"); | ||
static final NodeFeature MULTI_VALUE_ROUTING_PATH = new NodeFeature("routing.multi_value_routing_path"); | ||
static final NodeFeature LOGSB_ROUTE_ON_SORT_FIELDS = new NodeFeature("routing.logsb_route_on_sort_fields"); | ||
| ||
/** | ||
* Build the routing from {@link IndexMetadata}. | ||
| @@ -165,7 +167,8 @@ private abstract static class IdAndRoutingOnly extends IndexRouting { | |
| ||
@Override | ||
public void preProcess(IndexRequest indexRequest) { | ||
// generate id if not already provided | ||
// Generate id if not already provided. | ||
// This is needed for routing, so it has to happen in pre-processing. | ||
final String id = indexRequest.id(); | ||
if (id == null) { | ||
if (shouldUseTimeBasedId(indexMode, creationVersion)) { | ||
| @@ -272,15 +275,20 @@ public void collectSearchShards(String routing, IntConsumer consumer) { | |
public static class ExtractFromSource extends IndexRouting { | ||
private final Predicate<String> isRoutingPath; | ||
private final XContentParserConfiguration parserConfig; | ||
private final IndexMode indexMode; | ||
private final boolean trackTimeSeriesRoutingHash; | ||
private final boolean addIdWithRoutingHash; | ||
private int hash = Integer.MAX_VALUE; | ||
| ||
ExtractFromSource(IndexMetadata metadata) { | ||
super(metadata); | ||
if (metadata.isRoutingPartitionedIndex()) { | ||
throw new IllegalArgumentException("routing_partition_size is incompatible with routing_path"); | ||
} | ||
trackTimeSeriesRoutingHash = metadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID); | ||
indexMode = metadata.getIndexMode(); | ||
trackTimeSeriesRoutingHash = indexMode == IndexMode.TIME_SERIES | ||
&& metadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID); | ||
addIdWithRoutingHash = indexMode == IndexMode.LOGSDB; | ||
List<String> routingPaths = metadata.getRoutingPaths(); | ||
isRoutingPath = Regex.simpleMatcher(routingPaths.toArray(String[]::new)); | ||
this.parserConfig = XContentParserConfiguration.EMPTY.withFiltering(null, Set.copyOf(routingPaths), null, true); | ||
| @@ -292,8 +300,13 @@ public boolean matchesField(String fieldName) { | |
| ||
@Override | ||
public void postProcess(IndexRequest indexRequest) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't completely follow why There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, right makes sense. Maybe add this as a comment? | ||
// Update the request with the routing hash, if needed. | ||
// This needs to happen in post-processing, after the routing hash is calculated. | ||
if (trackTimeSeriesRoutingHash) { | ||
indexRequest.routing(TimeSeriesRoutingHashFieldMapper.encode(hash)); | ||
} else if (addIdWithRoutingHash) { | ||
assert hash != Integer.MAX_VALUE; | ||
indexRequest.autoGenerateTimeBasedId(OptionalInt.of(hash)); | ||
} | ||
} | ||
| ||
| @@ -461,12 +474,15 @@ private int idToHash(String id) { | |
try { | ||
idBytes = Base64.getUrlDecoder().decode(id); | ||
} catch (IllegalArgumentException e) { | ||
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in time series mode", id, indexName); | ||
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in " + indexMode.getName() + " mode", id, indexName); | ||
} | ||
if (idBytes.length < 4) { | ||
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in time series mode", id, indexName); | ||
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in " + indexMode.getName() + " mode", id, indexName); | ||
} | ||
return hashToShardId(ByteUtils.readIntLE(idBytes, 0)); | ||
// For TSDB, the hash is stored as the id prefix. | ||
// For LogsDB with routing on sort fields, the routing hash is stored in the range[id.length - 9, id.length - 5] of the id, | ||
// see IndexRequest#autoGenerateTimeBasedId. | ||
return hashToShardId(ByteUtils.readIntLE(idBytes, addIdWithRoutingHash ? idBytes.length - 9 : 0)); | ||
} | ||
| ||
@Override | ||
| @@ -480,7 +496,7 @@ public void collectSearchShards(String routing, IntConsumer consumer) { | |
} | ||
| ||
private String error(String operation) { | ||
return operation + " is not supported because the destination index [" + indexName + "] is in time series mode"; | ||
return operation + " is not supported because the destination index [" + indexName + "] is in " + indexMode.getName() + " mode"; | ||
} | ||
} | ||
| ||
|
Uh oh!
There was an error while loading. Please reload this page.