Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions docs/specification.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

This document describes the specification for how to write your Kafka cluster's desired state file. This currently must be a `YAML` file.

?> Current version: `1.0.2`
?> Current version: `1.0.3`

The desired state file consists of:

- **Settings** [Optional]: Specific settings for configuring `kafka-gitops`.
- **Topics** [Optional]: Topics and topic configuration definitions.
- **Topics** [Optional]: Topic and topic configuration definitions.
- **Services** [Optional]: Service definitions for generating ACLs.
- **Users** [Optional]: User definitions for generating ACLs.
- **Custom Service ACLs** [Optional]: Definitions for custom, non-generated ACLs.
Expand All @@ -20,14 +20,18 @@ The desired state file consists of:
**Options**:

- **ccloud** [Optional]: An object which contains an `enabled` field. Set this to true if using a Confluent Cloud cluster.
- **topics** [Optional]: Add a prefixed topic blacklist for ignoring specific topics when using `kafka-gitops`. This allows topics to be ignored from being deleted if they are not defined in the desired state file.
- **topics** [Optional]:
- **defaults** [Optional]: Specify topic defaults so you don't need to specify them for every topic in the state file. Currently, only replication is supported.
- **blacklist** [Optional]: Add a prefixed topic blacklist for ignoring specific topics when using `kafka-gitops`. This allows topics to be ignored from being deleted if they are not defined in the desired state file.

**Example**:
```yaml
settings:
ccloud:
enabled: true
topics:
defaults:
replication: 3
blacklist:
prefixed:
- _confluent
Expand All @@ -51,6 +55,8 @@ topics:
segment.bytes: 1000000
```

If a default `replication` value is supplied in the `settings` block, then the `replication` field can be omitted. If a default `replication` value is provided and the `replication` field in the topic definition is set, the default will be overridden for that topic.

## Services

**Synopsis**: Define the services that will utilize your Kafka cluster. These service definitions allow `kafka-gitops` to generate ACLs for you. Yay!
Expand Down
36 changes: 34 additions & 2 deletions src/main/java/com/devshawn/kafka/gitops/StateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.devshawn.kafka.gitops.domain.state.CustomAclDetails;
import com.devshawn.kafka.gitops.domain.state.DesiredState;
import com.devshawn.kafka.gitops.domain.state.DesiredStateFile;
import com.devshawn.kafka.gitops.domain.state.TopicDetails;
import com.devshawn.kafka.gitops.domain.state.service.KafkaStreamsService;
import com.devshawn.kafka.gitops.exception.ConfluentCloudException;
import com.devshawn.kafka.gitops.exception.InvalidAclDefinitionException;
Expand All @@ -23,6 +24,7 @@
import com.devshawn.kafka.gitops.service.ParserService;
import com.devshawn.kafka.gitops.service.RoleService;
import com.devshawn.kafka.gitops.util.LogUtil;
import com.devshawn.kafka.gitops.util.StateUtil;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -65,6 +67,7 @@ public StateManager(ManagerConfig managerConfig, ParserService parserService) {

public DesiredStateFile getAndValidateStateFile() {
DesiredStateFile desiredStateFile = parserService.parseStateFile();
validateTopics(desiredStateFile);
validateCustomAcls(desiredStateFile);
return desiredStateFile;
}
Expand Down Expand Up @@ -131,8 +134,9 @@ private void createServiceAccount(String name, List<ServiceAccount> serviceAccou
private DesiredState getDesiredState() {
DesiredStateFile desiredStateFile = getAndValidateStateFile();
DesiredState.Builder desiredState = new DesiredState.Builder()
.addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile))
.putAllTopics(desiredStateFile.getTopics());
.addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile));

generateTopicsState(desiredState, desiredStateFile);

if (isConfluentCloudEnabled(desiredStateFile)) {
generateConfluentCloudServiceAcls(desiredState, desiredStateFile);
Expand All @@ -145,6 +149,18 @@ private DesiredState getDesiredState() {
return desiredState.build();
}

private void generateTopicsState(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
Optional<Integer> defaultReplication = StateUtil.fetchReplication(desiredStateFile);
if (defaultReplication.isPresent()) {
desiredStateFile.getTopics().forEach((name, details) -> {
Integer replication = details.getReplication().isPresent() ? details.getReplication().get() : defaultReplication.get();
desiredState.putTopics(name, new TopicDetails.Builder().mergeFrom(details).setReplication(replication).build());
});
} else {
desiredState.putAllTopics(desiredStateFile.getTopics());
}
}

private void generateConfluentCloudServiceAcls(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
List<ServiceAccount> serviceAccounts = confluentCloudService.getServiceAccounts();
desiredStateFile.getServices().forEach((name, service) -> {
Expand Down Expand Up @@ -278,6 +294,22 @@ private void validateCustomAcls(DesiredStateFile desiredStateFile) {
});
}

private void validateTopics(DesiredStateFile desiredStateFile) {
Optional<Integer> defaultReplication = StateUtil.fetchReplication(desiredStateFile);
if (!defaultReplication.isPresent()) {
desiredStateFile.getTopics().forEach((name, details) -> {
if (!details.getReplication().isPresent()) {
throw new ValidationException(String.format("Not set: [replication] in state file definition: topics -> %s", name));
}
});
} else {
if (defaultReplication.get() < 1) {
throw new ValidationException("The default replication factor must be a positive integer.");
}
}

}

private boolean isConfluentCloudEnabled(DesiredStateFile desiredStateFile) {
if (desiredStateFile.getSettings().isPresent() && desiredStateFile.getSettings().get().getCcloud().isPresent()) {
return desiredStateFile.getSettings().get().getCcloud().get().isEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import org.inferred.freebuilder.FreeBuilder;

import java.util.Map;
import java.util.Optional;

@FreeBuilder
@JsonDeserialize(builder = TopicDetails.Builder.class)
public interface TopicDetails {

Integer getPartitions();

Integer getReplication();
Optional<Integer> getReplication();

Map<String, String> getConfigs();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
@JsonDeserialize(builder = SettingsTopics.Builder.class)
public interface SettingsTopics {

Optional<SettingsTopicsDefaults> getDefaults();

Optional<SettingsTopicsBlacklist> getBlacklist();

class Builder extends SettingsTopics_Builder {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.devshawn.kafka.gitops.domain.state.settings;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.inferred.freebuilder.FreeBuilder;

import java.util.Optional;

@FreeBuilder
@JsonDeserialize(builder = SettingsTopicsDefaults.Builder.class)
public interface SettingsTopicsDefaults {

Optional<Integer> getReplication();

class Builder extends SettingsTopicsDefaults_Builder {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -65,10 +66,10 @@ public void deleteAcl(AclBinding aclBinding) {

public void createTopic(String topicName, TopicDetails topicDetails) {
try (final AdminClient adminClient = buildAdminClient()) {
NewTopic newTopic = new NewTopic(topicName, topicDetails.getPartitions(), topicDetails.getReplication().shortValue());
NewTopic newTopic = new NewTopic(topicName, topicDetails.getPartitions(), topicDetails.getReplication().get().shortValue());
newTopic.configs(topicDetails.getConfigs());
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
} catch (InterruptedException | ExecutionException ex) {
} catch (InterruptedException | ExecutionException | NoSuchElementException ex) {
throw new KafkaExecutionException("Error thrown when attempting to create a Kafka topic", ex.getMessage());
}
}
Expand Down
21 changes: 15 additions & 6 deletions src/main/java/com/devshawn/kafka/gitops/util/LogUtil.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.devshawn.kafka.gitops.util;

import com.devshawn.kafka.gitops.domain.plan.*;
import com.devshawn.kafka.gitops.domain.plan.AclPlan;
import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
import com.devshawn.kafka.gitops.domain.plan.PlanOverview;
import com.devshawn.kafka.gitops.domain.plan.TopicConfigPlan;
import com.devshawn.kafka.gitops.domain.plan.TopicPlan;
import com.devshawn.kafka.gitops.domain.state.AclDetails;
import com.devshawn.kafka.gitops.domain.state.TopicDetails;
import com.devshawn.kafka.gitops.enums.PlanAction;
import com.devshawn.kafka.gitops.exception.InvalidAclDefinitionException;
import com.devshawn.kafka.gitops.exception.KafkaExecutionException;
import com.devshawn.kafka.gitops.exception.WritePlanOutputException;
import picocli.CommandLine;
Expand Down Expand Up @@ -43,6 +46,7 @@ private static void printTopicPlan(TopicPlan topicPlan) {
break;
case UPDATE:
System.out.println(yellow(String.format("~ [TOPIC] %s", topicPlan.getName())));
System.out.println(yellow("\t~ configs:"));
topicPlan.getTopicConfigPlans().forEach(LogUtil::printTopicConfigPlan);
System.out.println("\n");
break;
Expand All @@ -54,19 +58,24 @@ private static void printTopicPlan(TopicPlan topicPlan) {
}

private static void printTopicConfigPlanForNewTopics(TopicDetails topicDetails) {
topicDetails.getConfigs().forEach((key, value) -> System.out.println(green(String.format("\t+ %s: %s", key, value))));
System.out.println(green(String.format("\t+ partitions: %s", topicDetails.getPartitions())));
System.out.println(green(String.format("\t+ replication: %s", topicDetails.getReplication().get())));
if (topicDetails.getConfigs().size() > 0) {
System.out.println(green("\t+ configs:"));
topicDetails.getConfigs().forEach((key, value) -> System.out.println(green(String.format("\t\t+ %s: %s", key, value))));
}
}

private static void printTopicConfigPlan(TopicConfigPlan topicConfigPlan) {
switch (topicConfigPlan.getAction()) {
case ADD:
System.out.println(green(String.format("\t+ %s: %s", topicConfigPlan.getKey(), topicConfigPlan.getValue().get())));
System.out.println(green(String.format("\t\t+ %s: %s", topicConfigPlan.getKey(), topicConfigPlan.getValue().get())));
break;
case UPDATE:
System.out.println(yellow(String.format("\t~ %s: %s", topicConfigPlan.getKey(), topicConfigPlan.getValue().get())));
System.out.println(yellow(String.format("\t\t~ %s: %s", topicConfigPlan.getKey(), topicConfigPlan.getValue().get())));
break;
case REMOVE:
System.out.println(red(String.format("\t- %s", topicConfigPlan.getKey())));
System.out.println(red(String.format("\t\t- %s", topicConfigPlan.getKey())));
break;
}
}
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/devshawn/kafka/gitops/util/StateUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.devshawn.kafka.gitops.util;

import com.devshawn.kafka.gitops.domain.state.DesiredStateFile;

import java.util.Optional;

public class StateUtil {

public static Optional<Integer> fetchReplication(DesiredStateFile desiredStateFile) {
if (desiredStateFile.getSettings().isPresent() && desiredStateFile.getSettings().get().getTopics().isPresent()
&& desiredStateFile.getSettings().get().getTopics().get().getDefaults().isPresent()) {
return desiredStateFile.getSettings().get().getTopics().get().getDefaults().get().getReplication();
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ class PlanCommandIntegrationSpec extends Specification {
"custom-group-id-connect",
"custom-application-id-streams",
"custom-storage-topic",
"custom-storage-topics"
"custom-storage-topics",
"default-replication",
"default-replication-multiple"
]
}

Expand Down Expand Up @@ -101,6 +103,7 @@ class PlanCommandIntegrationSpec extends Specification {
"seed-topic-modification-3" | false
"seed-topic-modification-no-delete" | true
"seed-acl-exists" | true
"seed-blacklist-topics" | false
}

void 'test invalid plans - #planName'() {
Expand Down Expand Up @@ -133,7 +136,9 @@ class PlanCommandIntegrationSpec extends Specification {
"unrecognized-property",
"invalid-format",
"invalid-missing-user-principal",
"invalid-storage-topics"
"invalid-storage-topics",
"invalid-default-replication-1",
"invalid-default-replication-2"
]
}

Expand Down
35 changes: 35 additions & 0 deletions src/test/resources/plans/default-replication-multiple-plan.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"topicPlans": [
{
"name": "test-topic",
"action": "ADD",
"topicDetails": {
"partitions": 6,
"replication": 3,
"configs": {}
},
"topicConfigPlans": []
},
{
"name": "another-topic",
"action": "ADD",
"topicDetails": {
"partitions": 3,
"replication": 1,
"configs": {}
},
"topicConfigPlans": []
},
{
"name": "last-topic",
"action": "ADD",
"topicDetails": {
"partitions": 3,
"replication": 4,
"configs": {}
},
"topicConfigPlans": []
}
],
"aclPlans": []
}
16 changes: 16 additions & 0 deletions src/test/resources/plans/default-replication-multiple.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
settings:
topics:
defaults:
replication: 3

topics:
test-topic:
partitions: 6

another-topic:
partitions: 3
replication: 1

last-topic:
partitions: 3
replication: 4
15 changes: 15 additions & 0 deletions src/test/resources/plans/default-replication-plan.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"topicPlans": [
{
"name": "test-topic",
"action": "ADD",
"topicDetails": {
"partitions": 6,
"replication": 1,
"configs": {}
},
"topicConfigPlans": []
}
],
"aclPlans": []
}
8 changes: 8 additions & 0 deletions src/test/resources/plans/default-replication.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
settings:
topics:
defaults:
replication: 1

topics:
test-topic:
partitions: 6
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Generating execution plan...

[INVALID] The default replication factor must be a positive integer.
8 changes: 8 additions & 0 deletions src/test/resources/plans/invalid-default-replication-1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
settings:
topics:
defaults:
replication: -1

topics:
test-topic:
partitions: 6
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Generating execution plan...

[INVALID] Not set: [replication] in state file definition: topics -> test-topic
3 changes: 3 additions & 0 deletions src/test/resources/plans/invalid-default-replication-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
topics:
test-topic:
partitions: 6
2 changes: 2 additions & 0 deletions src/test/resources/plans/multi-file-apply-output.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ Executing apply...
Applying: [CREATE]

+ [TOPIC] test-topic
+ partitions: 6
+ replication: 1


Successfully applied.
Expand Down
Loading