Client library connection code samples

This page provides code samples for configuring client libraries to connect to clusters in Memorystore for Redis Cluster.

Client library code sample

This section shows a Lettuce client library code sample for connecting to a cluster. For this sample, the cluster doesn't use Identity and Access Management (IAM) authentication or in-transit encryption.

Lettuce

We recommend using Lettuce, versions 6.2.4 and later.

// Create RedisURI from the MRC discovery endpoint RedisURI redisUri = RedisURI.Builder.redis(CLUSTER_DISC_EP_ADDR, CLUSTER_DISC_EP_PORT).build(); // Configure client' resources // Configure reconnectDelay with exponential backoff and full jitter ClientResources resources = DefaultClientResources.builder()  .reconnectDelay(Delay.fullJitter(  Duration.ofMillis(100), // minimum 100 millisecond delay  Duration.ofSeconds(5), // maximum 5 second delay  100, TimeUnit.MILLISECONDS) // 100 millisecond base ).build(); // Create a cluster client with the URI and resources RedisClusterClient clusterClient = RedisClusterClient.create(resources, redisUri); // Configure the topology refreshment options // Enable periodic cluster topology updates so that the client updates the cluster topology in the intervals of // 60 seconds // Enable adaptive topology refresh that uses all triggers: MOVED_REDIRECT, ASK_REDIRECT, // PERSISTENT_RECONNECTS, UNCOVERED_SLOT, UNKNOWN_NODE // Disable dynamicRefreshSources so that only the initial seed nodes (Memorystore for Redis Cluster // discovery endpoint) will be used as the source for topology discovery // Enable closing stale connections when refreshing the cluster topology. This reduces the need to handle // failed connections during command runtime. ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()  .enablePeriodicRefresh(1, TimeUnit.MINUTES)  .enableAllAdaptiveRefreshTriggers()  .dynamicRefreshSources(false)  .closeStaleConnections(true)  .build(); // Configure the socket options // Set connectTimeout based on your application requirements and workload // Enable TCP keepAlive to reduce the need to handle failed connections during command runtime SocketOptions socketOptions = SocketOptions.builder()  .connectTimeout(CONNECT_TIMEOUT)  .keepAlive(true)  .build(); // Configure the client options // Enable AutoReconnect when connection is lost // Set nodeFilter to filter out failed nodes from the topology // Disable validateClusterNodeMembership to allow redirecting commands to newly added nodes clusterClient.setOptions(ClusterClientOptions.builder()  .topologyRefreshOptions(topologyRefreshOptions)  .socketOptions(socketOptions)  .autoReconnect(true)  .nodeFilter(it ->  ! (it.is(RedisClusterNode.NodeFlag.FAIL)  || it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL)  || it.is(RedisClusterNode.NodeFlag.NOADDR)))  .validateClusterNodeMembership(false)  .build()); // Create a connection pool GenericObjectPool<StatefulRedisClusterConnection<String, String> pool = ConnectionPoolSupport.createGenericObjectPool(() -> clusterClient.connect(), new GenericObjectPoolConfig()); pool.setMaxTotal(MAX_CONNECTIONS_IN_CONNECTION_POOL); // Get a connection from the connection pool StatefulRedisClusterConnection<String, String> connection = pool.borrowObject(); // Get a cluster sync command and call 'set' RedisAdvancedClusterCommands<String, String> syncCommands = connection.sync(); syncCommands.set(key, value);

In-transit encryption client library code sample

This section gives an example of client code for authenticating with in-transit encryption for your Memorystore cluster with the go-redis client library.

go-redis

We recommend using go-redis, versions 9.11.0 and later.

import (  "context"  "crypto/tls"  "crypto/x509"  "io/ioutil"  "log"  "time"  "github.com/go-redis/redis/v9" ) func example() {  // Load CA cert  caFilePath :=   caCert, err := ioutil.ReadFile(caFilePath)  if err != nil {  log.Fatal(err)  }  caCertPool := x509.NewCertPool()  caCertPool.AppendCertsFromPEM(caCert)  // Setup Redis Connection pool  client := redis.NewClusterClient(&redis.ClusterOptions{  Addrs: []string{"CLUSTER_DISC_EP_ADDR:CLUSTER_DISC_EP_PORT"},  // PoolSize applies per cluster node and not for the whole cluster.  PoolSize: 10,  ConnMaxIdleTime: 60 * time.Second,  MinIdleConns: 1,  TLSConfig: &tls.Config{  RootCAs: caCertPool,  },  })  ctx := context.Background()  err = client.Set(ctx, "key", "value", 0).Err()  if err != nil {  log.Fatal(err)  } }

IAM authentication and in-transit encryption code sample

This section gives an example of how to authenticate and connect to a cluster by using both IAM authentication and in-transit encryption with various client libraries:

redis-py

We recommend using redis-py, versions 5.1 and later.

from google.cloud import iam_credentials_v1 from redis.backoff import ConstantBackoff from redis.retry import Retry from redis.exceptions import ( ConnectionError, AuthenticationWrongNumberOfArgsError, AuthenticationError ) from redis.utils import (str_if_bytes) import redis service_account="projects/-/serviceAccounts/<TO-DO-1: email of service account used to authenticate to Redis Cluster>" host=<TO-DO-2: your Redis Cluster discovery endpoint ip> ssl_ca_certs=<TO-DO-3, your trusted server ca file name> def generate_access_token(): # Create a client client = iam_credentials_v1.IAMCredentialsClient() # Initialize request argument(s) request = iam_credentials_v1.GenerateAccessTokenRequest( name=service_account, scope=['https://www.googleapis.com/auth/cloud-platform'], ) # Make the request response = client.generate_access_token(request=request) # Handle the response return str(response.access_token) def iam_connect(self): "Initialize the connection and authenticate" self._parser.on_connect(self) auth_args = (generate_access_token(),) self.send_command("AUTH", *auth_args, check_health=False) try: auth_response = self.read_response() except AuthenticationWrongNumberOfArgsError: self.send_command("AUTH", self.password, check_health=False) auth_response = self.read_response() if str_if_bytes(auth_response) != "OK": raise AuthenticationError("Invalid Username or Password") # Connect to Memorystore for Redis Cluster backoff = ConstantBackoff(3) retry = Retry(retries=-1, backoff=backoff, supported_errors=(ConnectionError, ConnectionResetError)) r=redis.cluster.RedisCluster(host=host, port=6379,redis_connect_func=iam_connect, retry=retry, ssl=True, ssl_ca_certs=ssl_ca_certs) print(r.get('key'))

Lettuce

We recommend using Lettuce, versions 6.2.4 and later.

import com.google.cloud.iam.credentials.v1.GenerateAccessTokenResponse; import com.google.cloud.iam.credentials.v1.IamCredentialsClient; import io.lettuce.core.RedisCredentials; import io.lettuce.core.RedisCredentialsProvider; import io.lettuce.core.RedisURI; import io.lettuce.core.SocketOptions; import io.lettuce.core.SslOptions; import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.resource.ClientResources; import io.lettuce.core.resource.DefaultClientResources; import io.lettuce.core.resource.Delay; import java.io.Closeable; import java.io.File; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import reactor.core.publisher.Mono; public class IAMAuth {  /**  * This thread-safe implementation (excluding the main app below) is intended for production use.  * It provides a background refresh logic that shouldn't overload IAM service in case. the  * application has many many connections (connection storms can result in IAM throttles  * otherwise). 
*
* Guidelines for implementing similar logic for other clients:
* 1. Refresh IAM tokens in the background using a single thread/routine per client process
* 2. Provide last error feedback inline for token retrieval to aid debugging
* 3. Provide initial setup validation by fast-failing if the token couldn't be retrieved
* 4. Inline getToken shouldn't execute direct IAM calls as it can overload the token retrieval * resulting in throttles
* 5. Typical scale is tens of thousands of Redis connections and the IAM token is required for * every connection being established.
*/ private static final class RedisClusterCredentialsProvider implements RedisCredentialsProvider, Runnable, Closeable { private static final Logger logger = Logger.getLogger(RedisClusterCredentialsProvider.class.getName()); private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); private final IamCredentialsClient iamClient; private final String accountName; private final Duration refreshDuration; private final Duration lifetime; private volatile RedisCredentials credentials; private volatile Instant lastRefreshInstant; private volatile Exception lastException; /** * AccountName: * "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com"; * RefreshDuration: Duration.ofSeconds(300) Lifetime: Duration.ofSeconds(3600); */ public RedisClusterCredentialsProvider( String accountName, Duration refreshDuration, Duration lifetime) throws Exception { this.iamClient = IamCredentialsClient.create(); this.accountName = accountName; this.refreshDuration = refreshDuration; this.lifetime = lifetime; // execute on initialization to fast-fail if there are any setup issues refreshTokenNow(); // refresh much more frequently than the expiry time to allow for multiple retries in case of // failures service.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS); } @Override public Mono resolveCredentials() { if (hasTokenExpired()) { throw new RuntimeException("Background IAM token refresh failed", lastException); } return Mono.just(this.credentials); } private boolean hasTokenExpired() { if (this.lastRefreshInstant == null || this.lifetime == null) { return true; } return Instant.now().isAfter(this.lastRefreshInstant.plus(this.lifetime)); } // To be invoked by customer app on shutdown @Override public void close() { service.shutdown(); iamClient.close(); } @Override public void run() { try { // fetch token if it is time to refresh if (this.lastRefreshInstant != null && this.refreshDuration != null && Instant.now().isBefore(this.lastRefreshInstant.plus(this.refreshDuration))) { // nothing to do return; } refreshTokenNow(); } catch (Exception e) { // suppress all errors as we cannot allow the task to die // log for visibility logger.log(Level.parse("SEVERE"), "Background IAM token refresh failed", e); } } private void refreshTokenNow() { try { logger.info("Refreshing IAM token"); com.google.protobuf.Duration lifetimeProto = com.google.protobuf.Duration.newBuilder() .setSeconds(lifetime.getSeconds()) .setNanos(lifetime.getNano()) .build(); GenerateAccessTokenResponse response = this.iamClient.generateAccessToken( this.accountName, new ArrayList<>(), Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"), lifetimeProto); // got a successful token refresh this.credentials = new RedisCredentials() { @Override public boolean hasUsername() { return false; } @Override public boolean hasPassword() { return true; } @Override public String getUsername() { return "default"; } @Override public char[] getPassword() { return response.getAccessToken().toCharArray(); } }; this.lastRefreshInstant = Instant.now(); // clear the last saved exception this.lastException = null; logger.info( "IAM token refreshed with lastRefreshInstant [" + lastRefreshInstant + "], refreshDuration [" + this.refreshDuration + "], accountName [" + this.accountName + "] and lifetime [" + this.lifetime + "]"); } catch (Exception e) { // Save last exception for inline feedback this.lastException = e; // Bubble up for direct feedback throw e; } } } /** Sample code to demonstrate how to use IAMAuth; not intended for production use */ public static void main(String[] args) throws Exception { // These are the parameters the user needs to replace String discoveryEndpointIp = "CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS"; int discoveryEndpointPort = CLUSTER_DISCOVERY_ENDPOINT_PORT_NUMBER; String accountName = "ACCOUNT_NAME"; String caFileName = "CA_FILE_NAME"; int refreshDurationSec = REFRESH_DURATION_SEC; int lifetimeSec = LIFETIME_SEC; RedisCredentialsProvider credentialsProvider = new RedisClusterCredentialsProvider( accountName, Duration.ofSeconds(refreshDurationSec), Duration.ofSeconds(lifetimeSec)); RedisURI redisUri = RedisURI.Builder.redis(discoveryEndpointIp, discoveryEndpointPort) .withSsl(true) .withAuthentication(credentialsProvider) .build(); ClientResources resources = DefaultClientResources.builder() .reconnectDelay( Delay.fullJitter( Duration.ofMillis(100), Duration.ofSeconds(5), 100, TimeUnit.MILLISECONDS)) .build(); ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .enablePeriodicRefresh(1, TimeUnit.MINUTES) .enableAllAdaptiveRefreshTriggers() .dynamicRefreshSources(false) .closeStaleConnections(true) .build(); SslOptions sslOptions = SslOptions.builder().jdkSslProvider().trustManager(new File(caFileName)).build(); SocketOptions socketOptions = SocketOptions.builder().connectTimeout(Duration.ofSeconds(5)).keepAlive(true).build(); // Create Redis Cluster Client RedisClusterClient clusterClient = RedisClusterClient.create(resources, redisUri); clusterClient.setOptions( ClusterClientOptions.builder() .topologyRefreshOptions(topologyRefreshOptions) .socketOptions(socketOptions) .sslOptions(sslOptions) .autoReconnect(true) .nodeFilter( it -> !(it.is(RedisClusterNode.NodeFlag.FAIL) || it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL) || it.is(RedisClusterNode.NodeFlag.NOADDR))) .validateClusterNodeMembership(false) .build()); // Establish connection to Redis Cluster StatefulRedisClusterConnection<String, String> connection = clusterClient.connect(); // Retrieve synchronous Redis Cluster commands RedisAdvancedClusterCommands<String, String> syncCommands = connection.sync(); // Perform Redis operations syncCommands.set("key1", "value1"); String value = syncCommands.get("key1"); System.out.println("Retrieved value: " + value); int count = 0; for (int i = 0; i < 1000; i++) { String k = "lettucekey" + String.valueOf(i); String v = "lettucevalue" + String.valueOf(i); syncCommands.set(k, v); String got = syncCommands.get(k); if (got.equals(v)) { count++; } else { System.out.println("unexpected value"); } } System.out.println("Successfully got " + String.valueOf(count) + " keys"); // Close the connection and shutdown the client connection.close(); clusterClient.shutdown(); ((Closeable) credentialsProvider).close(); } }

Jedis

We recommend using Jedis, versions 4.4.0 and later.

import com.google.cloud.iam.credentials.v1.GenerateAccessTokenResponse; import com.google.cloud.iam.credentials.v1.IamCredentialsClient; import java.io.Closeable; import java.io.FileInputStream; import java.io.InputStream; import java.security.KeyStore; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.Connection; import redis.clients.jedis.DefaultJedisClientConfig; import redis.clients.jedis.DefaultRedisCredentials; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.RedisCredentials; import redis.clients.jedis.RedisCredentialsProvider; /** Customers are free to update/replace code as they see fit. */ public class IAMAuth {  /**  * This thread-safe implementation (excluding the main app below) is intended for production use.  * It provides a background refresh logic that shouldn't overload IAM service in case. the  * application has many many connections (connection storms can result in IAM throttles  * otherwise). 
*
* Guidelines for implementing similar logic for other clients:
* 1. Refresh IAM tokens in the background using a single thread/routine per client process
* 2. Provide last error feedback inline for token retrieval to aid debugging
* 3. Provide initial setup validation by fast-failing if the token couldn't be retrieved
* 4. Inline getToken shouldn't execute direct IAM calls as it can overload the token retrieval * resulting in throttles
* 5. Typical scale is tens of thousands of Redis connections and the IAM token is required for * every connection being established.
*/ private static final class RedisClusterCredentialsProvider implements RedisCredentialsProvider, Runnable, Closeable { private static final Logger logger = Logger.getLogger(RedisClusterCredentialsProvider.class.getName()); private final IamCredentialsClient iamClient; private final ScheduledExecutorService service; private final String accountName; private final Duration refreshDuration; private final Duration lifetime; private volatile RedisCredentials credentials; private volatile Instant lastRefreshInstant; private volatile Exception lastException; // AccountName: // "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com"; // RefreshDuration: Duration.ofSeconds(300); // Lifetime: Duration.ofSeconds(3600); public RedisClusterCredentialsProvider( String accountName, Duration refreshDuration, Duration lifetime) throws Exception { this.iamClient = IamCredentialsClient.create(); this.service = Executors.newSingleThreadScheduledExecutor(); this.accountName = accountName; this.refreshDuration = refreshDuration; this.lifetime = lifetime; // execute on initialization to fast-fail if there are any setup issues refreshTokenNow(); // refresh much more frequently than the expiry time to allow for multiple retries in case of // failures service.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS); } public RedisCredentials get() { if (hasTokenExpired()) { throw new RuntimeException("Background IAM token refresh failed", lastException); } return this.credentials; } private boolean hasTokenExpired() { if (this.lastRefreshInstant == null || this.lifetime == null) { return true; } return Instant.now().isAfter(this.lastRefreshInstant.plus(this.lifetime)); } // To be invoked by customer app on shutdown @Override public void close() { service.shutdown(); iamClient.close(); } @Override public void run() { try { // fetch token if it is time to refresh if (this.lastRefreshInstant != null && this.refreshDuration != null && Instant.now().isBefore(this.lastRefreshInstant.plus(this.refreshDuration))) { // nothing to do return; } refreshTokenNow(); } catch (Exception e) { // suppress all errors as we cannot allow the task to die // log for visibility logger.log(Level.parse("SEVERE"), "Background IAM token refresh failed", e); } } private void refreshTokenNow() { try { logger.info("Refreshing IAM token"); List delegates = new ArrayList<>(); com.google.protobuf.Duration lifetimeProto = com.google.protobuf.Duration.newBuilder() .setSeconds(lifetime.getSeconds()) .setNanos(lifetime.getNano()) .build(); GenerateAccessTokenResponse response = iamClient.generateAccessToken( this.accountName, delegates, Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"), lifetimeProto); // got a successful token refresh this.credentials = new DefaultRedisCredentials("default", response.getAccessToken()); this.lastRefreshInstant = Instant.now(); // clear the last saved exception this.lastException = null; logger.info( "IAM token refreshed with lastRefreshInstant [" + lastRefreshInstant + "], refreshDuration [" + this.refreshDuration + "], accountName [" + this.accountName + "] and lifetime [" + this.lifetime + "]"); } catch (Exception e) { // Save last exception for inline feedback this.lastException = e; // Bubble up for direct feedback throw e; } } } /** Sample code to demonstrate how to use IAMAuth; not intended for production use */ public static void main(String[] args) throws Exception { String discoveryEndpointIp = "CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS"; int discoveryEndpointPort = CLUSTER_DISCOVERY_ENDPOINT_PORT_NUMBER; GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setTestWhileIdle(true); int timeout = 5000; int maxAttempts = 5; HostAndPort discovery = new HostAndPort(discoveryEndpointIp, discoveryEndpointPort); RedisCredentialsProvider credentialsProvider = new RedisClusterCredentialsProvider( "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com", Duration.ofSeconds(300), Duration.ofSeconds(3600)); // Create JedisCluster cluster InputStream is = new FileInputStream("server-ca.pem"); // You could get a resource as a stream instead. CertificateFactory cf = CertificateFactory.getInstance("X.509"); X509Certificate caCert = (X509Certificate) cf.generateCertificate(is); TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); ks.load(null); // You don't need the KeyStore cluster to come from a file. ks.setCertificateEntry("caCert", caCert); tmf.init(ks); SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(null, tmf.getTrustManagers(), null); JedisCluster jedisCluster = new JedisCluster( discovery, DefaultJedisClientConfig.builder() .connectionTimeoutMillis(timeout) .socketTimeoutMillis(timeout) .credentialsProvider(credentialsProvider) .ssl(true) .sslSocketFactory(sslContext.getSocketFactory()) .build(), maxAttempts, config); // Perform operations on the cluster jedisCluster.set("myKey", "Hello, Redis Cluster!"); String value = jedisCluster.get("myKey"); System.out.println("Value for myKey: " + value); int count = 0; for (int i = 0; i < 1000; i++) { String k = "jediskey" + String.valueOf(i); String v = "jedisvalue" + String.valueOf(i); jedisCluster.set(k, v); String got = jedisCluster.get(k); if (got.equals(v)) { count++; } else { System.out.println("unexpected value"); } } System.out.println("Successfully got " + String.valueOf(count) + " keys"); // Disconnect from the cluster jedisCluster.close(); // Cleanup the resources used by the provider ((Closeable) credentialsProvider).close(); } }

Go

We recommend using Go, versions 1.24.5 and later.

package main import (  "context"  "crypto/tls"  "crypto/x509"  "flag"  "fmt"  "io/ioutil"  "log"  "sync"  "time"  credentials "google.golang.org/genproto/googleapis/iam/credentials/v1"  "github.com/golang/protobuf/ptypes"  "github.com/redis/go-redis/v9"  "google.golang.org/api/option"  gtransport "google.golang.org/api/transport/grpc" ) var (  svcAccount = flag.String("a", "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com", "service account email")  lifetime = flag.Duration("d", time.Hour, "lifetime of token")  refreshDuration = flag.Duration("r", 5*time.Minute, "token refresh duration")  checkTokenExpiryInterval = flag.Duration("e", 10*time.Second, "check token expiry interval")  lastRefreshInstant = time.Time{}  errLastSeen = error(nil)  token = ""  mu = sync.RWMutex{} ) func retrieveToken() (string, error) {  ctx := context.Background()  conn, err := gtransport.Dial(ctx,  option.WithEndpoint("iamcredentials.googleapis.com:443"),  option.WithScopes("https://www.googleapis.com/auth/cloud-platform"))  if err != nil {  log.Printf("Failed to dial API, error: %v", err)  return token, err  }  client := credentials.NewIAMCredentialsClient(conn)  req := credentials.GenerateAccessTokenRequest{  Name: *svcAccount,  Scope: []string{"https://www.googleapis.com/auth/cloud-platform"},  Lifetime: ptypes.DurationProto(*lifetime),  }  rsp, err := client.GenerateAccessToken(ctx, &req)  if err != nil {  log.Printf"Failed to call GenerateAccessToken with request: %v, error: %v", req, err)  return token, err  }  return rsp.AccessToken, nil } func refreshTokenLoop() {  if *refreshDuration > *lifetime {  log.Fatal("Refresh should not happen after token is already expired.")  }  for {  mu.RLock()  lastRefreshTime := lastRefreshInstant  mu.RUnlock()  if time.Now().After(lastRefreshTime.Add(*refreshDuration)) {  var err error  retrievedToken, err := retrieveToken()  mu.Lock()  token = retrievedToken  if err != nil {  errLastSeen = err  } else {  lastRefreshInstant = time.Now()  }  mu.Unlock()  }  time.Sleep(*checkTokenExpiryInterval)  } } func retrieveTokenFunc() (string, string) {  mu.RLock()  defer mu.RUnlock()  if time.Now().After(lastRefreshInstant.Add(*refreshDuration)) {  log.Printf("Token is expired. last refresh instant: %v, refresh duration: %v, error that was last seen: %v", lastRefreshInstant, *refreshDuration, errLastSeen)  return "", ""  }  username := "default"  password := token  return username, password } func main() {  // Load CA cert  caFilePath := CA_FILE_PATH  clusterDicEpAddr := CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS_AND_PORT  caCert, err := ioutil.ReadFile(caFilePath)  if err != nil {  log.Fatal(err)  }  caCertPool := x509.NewCertPool()  caCertPool.AppendCertsFromPEM(caCert)  token, err = retrieveToken()  if err != nil {  log.Fatal("Cannot retrieve IAM token to authenticate to the cluster, error: %v", err)  }  lastRefreshInstant = time.Now()  go refreshTokenLoop()  // Setup Redis Connection pool  client := redis.NewClusterClient(&redis.ClusterOptions{  Addrs: []string{clusterDicEpAddr},  // PoolSize applies per cluster node and not for the whole cluster.  PoolSize: 10,  ConnMaxIdleTime: 60 * time.Second,  MinIdleConns: 1,  CredentialsProvider: retrieveTokenFunc,  TLSConfig: &tls.Config{  RootCAs: caCertPool,  },  })  ctx := context.Background()  err = client.Set(ctx, "key", "value", 0).Err()  if err != nil {  log.Fatal(err)  }  val, err := client.Get(ctx, "key").Result()  if err != nil {  log.Fatal(err)  }  fmt.Printf("Got the value for key: key, which is %s \n", val) }