Skip to content
21 changes: 20 additions & 1 deletion javaagent-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import com.google.protobuf.gradle.*
plugins {
`java-library`
idea
id("com.google.protobuf") version "0.8.13"
id("com.google.protobuf") version "0.8.15"
id("org.hypertrace.publish-maven-central-plugin")
}

Expand All @@ -12,7 +12,18 @@ protobuf {
// The artifact spec for the Protobuf Compiler
artifact = "com.google.protobuf:protoc:3.11.4"
}
plugins {
id("grpc") {
artifact = "io.grpc:protoc-gen-grpc-java:1.37.0"
}
}
generateProtoTasks {
ofSourceSet("main").forEach {
it.plugins {
// Apply the "grpc" plugin whose spec is defined above, without options.
id("grpc")
}
}
}
}

Expand All @@ -31,6 +42,14 @@ dependencies {

api("com.google.protobuf:protobuf-java:3.11.4")
api("com.google.protobuf:protobuf-java-util:3.11.4")
api("io.grpc:grpc-stub:1.37.0")
api("io.grpc:grpc-protobuf:1.37.0")
api("io.grpc:grpc-netty:1.37.0")
if (JavaVersion.current().isJava9Compatible) {
// Workaround for @javax.annotation.Generated
// see: https://github.com/grpc/grpc-java/issues/3633
api("javax.annotation:javax.annotation-api:1.3.1")
}
// convert yaml to json, since java protobuf impl supports only json
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.11.3")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ private EnvironmentConfig() {}
private static final String HT_PREFIX = "ht.";

public static final String CONFIG_FILE_PROPERTY = HT_PREFIX + "config.file";
public static final String DYNAMIC_CONFIG_SERVICE_URL = HT_PREFIX + "dynamic.config.service.url";
static final String SERVICE_NAME = HT_PREFIX + "service.name";
static final String ENABLED = HT_PREFIX + "enabled";
static final String RESOURCE_ATTRIBUTES = HT_PREFIX + ".resource.attributes";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BoolValue;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
import com.google.protobuf.StringValue;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Parser;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.hypertrace.agent.config.Config;
import org.hypertrace.agent.config.Config.AgentConfig;
import org.hypertrace.agent.config.Config.DataCapture;
Expand All @@ -38,10 +43,15 @@
import org.hypertrace.agent.config.Config.Opa.Builder;
import org.hypertrace.agent.config.Config.PropagationFormat;
import org.hypertrace.agent.config.Config.Reporting;
import org.hypertrace.agent.config.ConfigurationServiceGrpc;
import org.hypertrace.agent.config.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** {@link HypertraceConfig} loads a yaml config from file. */
/**
* {@link HypertraceConfig} loads a yaml config from file and gets dynamic config if dynamic config
* service url is set in env vars or system properties.
*/
public class HypertraceConfig {

private HypertraceConfig() {}
Expand All @@ -64,10 +74,25 @@ private HypertraceConfig() {}

public static AgentConfig get() {
if (agentConfig == null) {
String dynamicConfigServiceUrl =
EnvironmentConfig.getProperty(EnvironmentConfig.DYNAMIC_CONFIG_SERVICE_URL);
DynamicConfigFetcher dynamicConfigFetcher = null;
if (dynamicConfigServiceUrl != null) {
dynamicConfigFetcher = new DynamicConfigFetcher(dynamicConfigServiceUrl);
Executors.newScheduledThreadPool(
1,
runnable -> {
Thread thread = new Thread(runnable, "dynamic_agent_config_fetcher");
thread.setDaemon(true);
// thread.setContextClassLoader(null);
return thread;
})
.scheduleAtFixedRate(dynamicConfigFetcher, 60, 30, TimeUnit.SECONDS);
}
synchronized (HypertraceConfig.class) {
if (agentConfig == null) {
try {
agentConfig = load();
agentConfig = load(dynamicConfigFetcher);
log.info(
"Config loaded: {}",
JsonFormat.printer().omittingInsignificantWhitespace().print(agentConfig));
Expand Down Expand Up @@ -130,21 +155,29 @@ public static void reset() {
}
}

private static AgentConfig load() throws IOException {
@VisibleForTesting
static AgentConfig load(DynamicConfigFetcher dynamicConfigFetcher) throws IOException {
String configFile = EnvironmentConfig.getProperty(EnvironmentConfig.CONFIG_FILE_PROPERTY);
if (configFile == null) {
return EnvironmentConfig.applyPropertiesAndEnvVars(applyDefaults(AgentConfig.newBuilder()))
.build();
AgentConfig.Builder configBuilder = AgentConfig.newBuilder();
if (dynamicConfigFetcher != null) {
configBuilder = dynamicConfigFetcher.initializeConfig().toBuilder();
}
return EnvironmentConfig.applyPropertiesAndEnvVars(applyDefaults(configBuilder)).build();
}
return load(configFile);
return load(configFile, dynamicConfigFetcher);
}

@VisibleForTesting
static AgentConfig load(String filename) throws IOException {
static AgentConfig load(String filename, DynamicConfigFetcher dynamicConfigFetcher)
throws IOException {
File configFile = new File(filename);
if (!configFile.exists() || configFile.isDirectory() || !configFile.canRead()) {
log.error("Config file {} does not exist", filename);
AgentConfig.Builder configBuilder = AgentConfig.newBuilder();
if (dynamicConfigFetcher != null) {
configBuilder = dynamicConfigFetcher.initializeConfig().toBuilder();
}
return EnvironmentConfig.applyPropertiesAndEnvVars(applyDefaults(configBuilder)).build();
}

Expand Down Expand Up @@ -236,4 +269,41 @@ private static String convertYamlToJson(InputStream yaml) throws IOException {
ObjectMapper jsonWriter = new ObjectMapper();
return jsonWriter.writeValueAsString(obj);
}

public static class DynamicConfigFetcher implements Runnable {

private final ConfigurationServiceGrpc.ConfigurationServiceBlockingStub blockingStub;

private static Int64Value configUpdatedTimestamp;

private DynamicConfigFetcher(String dynamicConfigServiceUrl) {
Channel channel =
ManagedChannelBuilder.forTarget(dynamicConfigServiceUrl).usePlaintext().build();
blockingStub = ConfigurationServiceGrpc.newBlockingStub(channel);
configUpdatedTimestamp = Int64Value.newBuilder().setValue(System.currentTimeMillis()).build();
}

@Override
public void run() {
Service.UpdateConfigurationResponse response =
blockingStub.updateConfiguration(
Service.UpdateConfigurationRequest.newBuilder()
.setTimestamp(configUpdatedTimestamp)
.build());
configUpdatedTimestamp = response.getTimestamp();
AgentConfig.Builder configBuilder = HypertraceConfig.get().toBuilder();
configBuilder.setEnabled(response.getEnabled());
configBuilder.setDataCapture(response.getDataCapture());
configBuilder.setJavaagent(response.getJavaAgent());
agentConfig = configBuilder.build();
}

private AgentConfig initializeConfig() {
Service.InitialConfigurationResponse response =
blockingStub.initialConfiguration(
Service.InitialConfigurationRequest.newBuilder().build());
configUpdatedTimestamp = response.getTimestamp();
return response.getAgentConfig();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright The Hypertrace Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.hypertrace.agent.core.config;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.hypertrace.agent.config.ConfigurationServiceGrpc;
import org.hypertrace.agent.config.Service;

public class DynamicConfigServer {

private final Server server;
private Service.InitialConfigurationResponse initialConfigurationResponse;
private Service.UpdateConfigurationResponse updateConfigurationResponse;

public DynamicConfigServer(int port) {
server = ServerBuilder.forPort(port).addService(new DynamicConfigService()).build();
}

/** Start serving requests. */
public void start() throws IOException {
server.start();
Runtime.getRuntime()
.addShutdownHook(
new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
DynamicConfigServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}

/** Stop serving requests and shutdown resources. */
public void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}

/** Await termination on the main thread since the grpc library uses daemon threads. */
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}

public Service.InitialConfigurationResponse getInitialConfigurationResponse() {
return initialConfigurationResponse;
}

public void setInitialConfigurationResponse(
Service.InitialConfigurationResponse initialConfigurationResponse) {
this.initialConfigurationResponse = initialConfigurationResponse;
}

public Service.UpdateConfigurationResponse getUpdateConfigurationResponse() {
return updateConfigurationResponse;
}

public void setUpdateConfigurationResponse(
Service.UpdateConfigurationResponse updateConfigurationResponse) {
this.updateConfigurationResponse = updateConfigurationResponse;
}

/** Main method. This comment makes the linter happy. */
public static void main(String[] args) throws Exception {
DynamicConfigServer server = new DynamicConfigServer(8980);
server.start();
server.blockUntilShutdown();
}

private class DynamicConfigService extends ConfigurationServiceGrpc.ConfigurationServiceImplBase {
@Override
public void initialConfiguration(
org.hypertrace.agent.config.Service.InitialConfigurationRequest request,
io.grpc.stub.StreamObserver<
org.hypertrace.agent.config.Service.InitialConfigurationResponse>
responseObserver) {

responseObserver.onNext(initialConfigurationResponse);
responseObserver.onCompleted();
}

@Override
public void updateConfiguration(
org.hypertrace.agent.config.Service.UpdateConfigurationRequest request,
io.grpc.stub.StreamObserver<org.hypertrace.agent.config.Service.UpdateConfigurationResponse>
responseObserver) {
responseObserver.onNext(updateConfigurationResponse);
responseObserver.onCompleted();
}
}
}
Loading