This page provides code samples for configuring client libraries to connect to clusters in Memorystore for Redis Cluster.
Client library code sample
This section shows a Lettuce client library code sample for connecting to a cluster. For this sample, the cluster doesn't use Identity and Access Management (IAM) authentication or in-transit encryption .
Lettuce
We recommend using Lettuce, versions 6.2.4 and later.
// Create RedisURI from the MRC discovery endpoint RedisURI redisUri = RedisURI . Builder . redis ( CLUSTER_DISC_EP_ADDR , CLUSTER_DISC_EP_PORT ). build (); // Configure client' resources // Configure reconnectDelay with exponential backoff and full jitter ClientResources resources = DefaultClientResources . builder () . reconnectDelay ( Delay . fullJitter ( Duration . ofMillis ( 100 ), // minimum 100 millisecond delay Duration . ofSeconds ( 5 ), // maximum 5 second delay 100 , TimeUnit . MILLISECONDS ) // 100 millisecond base ). build (); // Create a cluster client with the URI and resources RedisClusterClient clusterClient = RedisClusterClient . create ( resources , redisUri ); // Configure the topology refreshment options // Enable periodic cluster topology updates so that the client updates the cluster topology in the intervals of // 60 seconds // Enable adaptive topology refresh that uses all triggers: MOVED_REDIRECT, ASK_REDIRECT, // PERSISTENT_RECONNECTS, UNCOVERED_SLOT, UNKNOWN_NODE // Disable dynamicRefreshSources so that only the initial seed nodes (Memorystore for Redis Cluster // discovery endpoint) will be used as the source for topology discovery // Enable closing stale connections when refreshing the cluster topology. This reduces the need to handle // failed connections during command runtime. ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions . builder () . enablePeriodicRefresh ( 1 , TimeUnit . MINUTES ) . enableAllAdaptiveRefreshTriggers () . dynamicRefreshSources ( false ) . closeStaleConnections ( true ) . build (); // Configure the socket options // Set connectTimeout based on your application requirements and workload // Enable TCP keepAlive to reduce the need to handle failed connections during command runtime SocketOptions socketOptions = SocketOptions . builder () . connectTimeout ( CONNECT_TIMEOUT ) . keepAlive ( true ) . build (); // Configure the client options // Enable AutoReconnect when connection is lost // Set nodeFilter to filter out failed nodes from the topology // Disable validateClusterNodeMembership to allow redirecting commands to newly added nodes clusterClient . setOptions ( ClusterClientOptions . builder () . topologyRefreshOptions ( topologyRefreshOptions ) . socketOptions ( socketOptions ) . autoReconnect ( true ) . nodeFilter ( it -> ! ( it . is ( RedisClusterNode . NodeFlag . FAIL ) || it . is ( RedisClusterNode . NodeFlag . EVENTUAL_FAIL ) || it . is ( RedisClusterNode . NodeFlag . NOADDR ))) . validateClusterNodeMembership ( false ) . build ()); // Create a connection pool GenericObjectPool<StatefulRedisClusterConnection<String , String > pool = ConnectionPoolSupport . createGenericObjectPool (() -> clusterClient . connect (), new GenericObjectPoolConfig ()); pool . setMaxTotal ( MAX_CONNECTIONS_IN_CONNECTION_POOL ); // Get a connection from the connection pool StatefulRedisClusterConnection<String , String > connection = pool . borrowObject (); // Get a cluster sync command and call 'set' RedisAdvancedClusterCommands<String , String > syncCommands = connection . sync (); syncCommands . set ( key , value );
In-transit encryption client library code sample
This section gives an example of client code for authenticating with in-transit encryption for your Memorystore cluster with the go-redis client library.
go-redis
We recommend using go-redis, versions 9.11.0 and later.
import ( "context" "crypto/tls" "crypto/x509" "io/ioutil" "log" "time" "github.com/go-redis/redis/v9" ) func example () { // Load CA cert caFilePath :=caCert , err := ioutil . ReadFile ( caFilePath ) if err != nil { log . Fatal ( err ) } caCertPool := x509 . NewCertPool () caCertPool . AppendCertsFromPEM ( caCert ) // Setup Redis Connection pool client := redis . NewClusterClient ( & redis . ClusterOptions { Addrs : [] string { "CLUSTER_DISC_EP_ADDR:CLUSTER_DISC_EP_PORT" }, // PoolSize applies per cluster node and not for the whole cluster. PoolSize : 10 , ConnMaxIdleTime : 60 * time . Second , MinIdleConns : 1 , TLSConfig : & tls . Config { RootCAs : caCertPool , }, }) ctx := context . Background () err = client . Set ( ctx , "key" , "value" , 0 ). Err () if err != nil { log . Fatal ( err ) } }
IAM authentication and in-transit encryption code sample
This section gives an example of how to authenticate and connect to a cluster by using both IAM authentication and in-transit encryption with various client libraries:
redis-py
We recommend using redis-py , versions 5.1 and later.
from google.cloud import iam_credentials _v1 from redis.backoff import ConstantBackoff from redis.retry import Retry from redis.exceptions import ( ConnectionError , AuthenticationWrongNumberOfArgsError , AuthenticationError ) from redis.utils import ( str_if_bytes ) import redis service_account = "projects/-/serviceAccounts/<TO-DO-1: email of service account used to authenticate to Redis Cluster>" host = < TO - DO - 2 : your Redis Cluster discovery endpoint ip > ssl_ca_certs = < TO - DO - 3 , your trusted server ca file name > def generate_access_token (): # Create a client client = iam_credentials_v1 . IAMCredentialsClient () # Initialize request argument(s) request = iam_credentials_v1 . GenerateAccessTokenRequest ( name = service_account , scope = [ 'https://www.googleapis.com/auth/cloud-platform' ], ) # Make the request response = client . generate_access_token ( request = request ) # Handle the response return str ( response . access_token ) def iam_connect ( self ): "Initialize the connection and authenticate" self . _parser . on_connect ( self ) auth_args = ( generate_access_token (),) self . send_command ( "AUTH" , * auth_args , check_health = False ) try : auth_response = self . read_response () except AuthenticationWrongNumberOfArgsError : self . send_command ( "AUTH" , self . password , check_health = False ) auth_response = self . read_response () if str_if_bytes ( auth_response ) != "OK" : raise AuthenticationError ( "Invalid Username or Password" ) # Connect to Memorystore for Redis Cluster backoff = ConstantBackoff ( 3 ) retry = Retry ( retries =- 1 , backoff = backoff , supported_errors = ( ConnectionError , ConnectionResetError )) r = redis . cluster . RedisCluster ( host = host , port = 6379 , redis_connect_func = iam_connect , retry = retry , ssl = True , ssl_ca_certs = ssl_ca_certs ) print ( r . get ( 'key' ))
Lettuce
We recommend using Lettuce , versions 6.2.4 and later.
import com.google.cloud.iam.credentials.v1. GenerateAccessTokenResponse ; import com.google.cloud.iam.credentials.v1. IamCredentialsClient ; import io.lettuce.core.RedisCredentials ; import io.lettuce.core.RedisCredentialsProvider ; import io.lettuce.core.RedisURI ; import io.lettuce.core.SocketOptions ; import io.lettuce.core.SslOptions ; import io.lettuce.core.cluster.ClusterClientOptions ; import io.lettuce.core.cluster.ClusterTopologyRefreshOptions ; import io.lettuce.core.cluster.RedisClusterClient ; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection ; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands ; import io.lettuce.core.cluster.models.partitions.RedisClusterNode ; import io.lettuce.core.resource.ClientResources ; import io.lettuce.core.resource.DefaultClientResources ; import io.lettuce.core.resource.Delay ; import java.io.Closeable ; import java.io.File ; import java.time.Duration ; import java.time.Instant ; import java.util.ArrayList ; import java.util.Collections ; import java.util.List ; import java.util.concurrent.Executors ; import java.util.concurrent.ScheduledExecutorService ; import java.util.concurrent.TimeUnit ; import java.util.logging.Level ; import java.util.logging.Logger ; import reactor.core.publisher.Mono ; public class IAMAuth { /** * This thread-safe implementation (excluding the main app below) is intended for production use. * It provides a background refresh logic that shouldn't overload IAM service in case. the * application has many many connections (connection storms can result in IAM throttles * otherwise).
*
* Guidelines for implementing similar logic for other clients:
* 1. Refresh IAM tokens in the background using a single thread/routine per client process
* 2. Provide last error feedback inline for token retrieval to aid debugging
* 3. Provide initial setup validation by fast-failing if the token couldn't be retrieved
* 4. Inline getToken shouldn't execute direct IAM calls as it can overload the token retrieval * resulting in throttles
* 5. Typical scale is tens of thousands of Redis connections and the IAM token is required for * every connection being established.
*/ private static final class RedisClusterCredentialsProvider implements RedisCredentialsProvider , Runnable , Closeable { private static final Logger logger = Logger . getLogger ( RedisClusterCredentialsProvider . class . getName ()); private final ScheduledExecutorService service = Executors . newSingleThreadScheduledExecutor (); private final IamCredentialsClient iamClient ; private final String accountName ; private final Duration refreshDuration ; private final Duration lifetime ; private volatile RedisCredentials credentials ; private volatile Instant lastRefreshInstant ; private volatile Exception lastException ; /** * AccountName: * "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com"; * RefreshDuration: Duration.ofSeconds(300) Lifetime: Duration.ofSeconds(3600); */ public RedisClusterCredentialsProvider ( String accountName , Duration refreshDuration , Duration lifetime ) throws Exception { this . iamClient = IamCredentialsClient . create (); this . accountName = accountName ; this . refreshDuration = refreshDuration ; this . lifetime = lifetime ; // execute on initialization to fast-fail if there are any setup issues refreshTokenNow (); // refresh much more frequently than the expiry time to allow for multiple retries in case of // failures service . scheduleWithFixedDelay ( this , 10 , 10 , TimeUnit . SECONDS ); } @Override public MonoresolveCredentials () { if ( hasTokenExpired ()) { throw new RuntimeException ( "Background IAM token refresh failed" , lastException ); } return Mono . just ( this . credentials ); } private boolean hasTokenExpired () { if ( this . lastRefreshInstant == null || this . lifetime == null ) { return true ; } return Instant . now (). isAfter ( this . lastRefreshInstant . plus ( this . lifetime )); } // To be invoked by customer app on shutdown @Override public void close () { service . shutdown (); iamClient . close (); } @Override public void run () { try { // fetch token if it is time to refresh if ( this . lastRefreshInstant != null && this . refreshDuration != null && Instant . now (). isBefore ( this . lastRefreshInstant . plus ( this . refreshDuration ))) { // nothing to do return ; } refreshTokenNow (); } catch ( Exception e ) { // suppress all errors as we cannot allow the task to die // log for visibility logger . log ( Level . parse ( "SEVERE" ), "Background IAM token refresh failed" , e ); } } private void refreshTokenNow () { try { logger . info ( "Refreshing IAM token" ); com . google . protobuf . Duration lifetimeProto = com . google . protobuf . Duration . newBuilder () . setSeconds ( lifetime . getSeconds ()) . setNanos ( lifetime . getNano ()) . build (); GenerateAccessTokenResponse response = this . iamClient . generateAccessToken ( this . accountName , new ArrayList <> (), Collections . singletonList ( "https://www.googleapis.com/auth/cloud-platform" ), lifetimeProto ); // got a successful token refresh this . credentials = new RedisCredentials () { @Override public boolean hasUsername () { return false ; } @Override public boolean hasPassword () { return true ; } @Override public String getUsername () { return "default" ; } @Override public char [] getPassword () { return response . getAccessToken (). toCharArray (); } }; this . lastRefreshInstant = Instant . now (); // clear the last saved exception this . lastException = null ; logger . info ( "IAM token refreshed with lastRefreshInstant [" + lastRefreshInstant + "], refreshDuration [" + this . refreshDuration + "], accountName [" + this . accountName + "] and lifetime [" + this . lifetime + "]" ); } catch ( Exception e ) { // Save last exception for inline feedback this . lastException = e ; // Bubble up for direct feedback throw e ; } } } /** Sample code to demonstrate how to use IAMAuth; not intended for production use */ public static void main ( String [] args ) throws Exception { // These are the parameters the user needs to replace String discoveryEndpointIp = " CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS " ; int discoveryEndpointPort = CLUSTER_DISCOVERY_ENDPOINT_PORT_NUMBER ; String accountName = " ACCOUNT_NAME " ; String caFileName = " CA_FILE_NAME " ; int refreshDurationSec = REFRESH_DURATION_SEC ; int lifetimeSec = LIFETIME_SEC ; RedisCredentialsProvider credentialsProvider = new RedisClusterCredentialsProvider ( accountName , Duration . ofSeconds ( refreshDurationSec ), Duration . ofSeconds ( lifetimeSec )); RedisURI redisUri = RedisURI . Builder . redis ( discoveryEndpointIp , discoveryEndpointPort ) . withSsl ( true ) . withAuthentication ( credentialsProvider ) . build (); ClientResources resources = DefaultClientResources . builder () . reconnectDelay ( Delay . fullJitter ( Duration . ofMillis ( 100 ), Duration . ofSeconds ( 5 ), 100 , TimeUnit . MILLISECONDS )) . build (); ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions . builder () . enablePeriodicRefresh ( 1 , TimeUnit . MINUTES ) . enableAllAdaptiveRefreshTriggers () . dynamicRefreshSources ( false ) . closeStaleConnections ( true ) . build (); SslOptions sslOptions = SslOptions . builder (). jdkSslProvider (). trustManager ( new File ( caFileName )). build (); SocketOptions socketOptions = SocketOptions . builder (). connectTimeout ( Duration . ofSeconds ( 5 )). keepAlive ( true ). build (); // Create Redis Cluster Client RedisClusterClient clusterClient = RedisClusterClient . create ( resources , redisUri ); clusterClient . setOptions ( ClusterClientOptions . builder () . topologyRefreshOptions ( topologyRefreshOptions ) . socketOptions ( socketOptions ) . sslOptions ( sslOptions ) . autoReconnect ( true ) . nodeFilter ( it -> ! ( it . is ( RedisClusterNode . NodeFlag . FAIL ) || it . is ( RedisClusterNode . NodeFlag . EVENTUAL_FAIL ) || it . is ( RedisClusterNode . NodeFlag . NOADDR ))) . validateClusterNodeMembership ( false ) . build ()); // Establish connection to Redis Cluster StatefulRedisClusterConnection < String , String > connection = clusterClient . connect (); // Retrieve synchronous Redis Cluster commands RedisAdvancedClusterCommands < String , String > syncCommands = connection . sync (); // Perform Redis operations syncCommands . set ( "key1" , "value1" ); String value = syncCommands . get ( "key1" ); System . out . println ( "Retrieved value: " + value ); int count = 0 ; for ( int i = 0 ; i < 1000 ; i ++ ) { String k = "lettucekey" + String . valueOf ( i ); String v = "lettucevalue" + String . valueOf ( i ); syncCommands . set ( k , v ); String got = syncCommands . get ( k ); if ( got . equals ( v )) { count ++ ; } else { System . out . println ( "unexpected value" ); } } System . out . println ( "Successfully got " + String . valueOf ( count ) + " keys" ); // Close the connection and shutdown the client connection . close (); clusterClient . shutdown (); (( Closeable ) credentialsProvider ). close (); } }
Jedis
We recommend using Jedis , versions 4.4.0 and later.
import com.google.cloud.iam.credentials.v1. GenerateAccessTokenResponse ; import com.google.cloud.iam.credentials.v1. IamCredentialsClient ; import java.io.Closeable ; import java.io.FileInputStream ; import java.io.InputStream ; import java.security.KeyStore ; import java.security.cert.CertificateFactory ; import java.security.cert.X509Certificate ; import java.time.Duration ; import java.time.Instant ; import java.util.ArrayList ; import java.util.Collections ; import java.util.List ; import java.util.concurrent.Executors ; import java.util.concurrent.ScheduledExecutorService ; import java.util.concurrent.TimeUnit ; import java.util.logging.Level ; import java.util.logging.Logger ; import javax.net.ssl.SSLContext ; import javax.net.ssl.TrustManagerFactory ; import org.apache.commons.pool2.impl.GenericObjectPoolConfig ; import redis.clients.jedis.Connection ; import redis.clients.jedis.DefaultJedisClientConfig ; import redis.clients.jedis.DefaultRedisCredentials ; import redis.clients.jedis.HostAndPort ; import redis.clients.jedis.JedisCluster ; import redis.clients.jedis.RedisCredentials ; import redis.clients.jedis.RedisCredentialsProvider ; /** Customers are free to update/replace code as they see fit. */ public class IAMAuth { /** * This thread-safe implementation (excluding the main app below) is intended for production use. * It provides a background refresh logic that shouldn't overload IAM service in case. the * application has many many connections (connection storms can result in IAM throttles * otherwise).
*
* Guidelines for implementing similar logic for other clients:
* 1. Refresh IAM tokens in the background using a single thread/routine per client process
* 2. Provide last error feedback inline for token retrieval to aid debugging
* 3. Provide initial setup validation by fast-failing if the token couldn't be retrieved
* 4. Inline getToken shouldn't execute direct IAM calls as it can overload the token retrieval * resulting in throttles
* 5. Typical scale is tens of thousands of Redis connections and the IAM token is required for * every connection being established.
*/ private static final class RedisClusterCredentialsProvider implements RedisCredentialsProvider , Runnable , Closeable { private static final Logger logger = Logger . getLogger ( RedisClusterCredentialsProvider . class . getName ()); private final IamCredentialsClient iamClient ; private final ScheduledExecutorService service ; private final String accountName ; private final Duration refreshDuration ; private final Duration lifetime ; private volatile RedisCredentials credentials ; private volatile Instant lastRefreshInstant ; private volatile Exception lastException ; // AccountName: // "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com"; // RefreshDuration: Duration.ofSeconds(300); // Lifetime: Duration.ofSeconds(3600); public RedisClusterCredentialsProvider ( String accountName , Duration refreshDuration , Duration lifetime ) throws Exception { this . iamClient = IamCredentialsClient . create (); this . service = Executors . newSingleThreadScheduledExecutor (); this . accountName = accountName ; this . refreshDuration = refreshDuration ; this . lifetime = lifetime ; // execute on initialization to fast-fail if there are any setup issues refreshTokenNow (); // refresh much more frequently than the expiry time to allow for multiple retries in case of // failures service . scheduleWithFixedDelay ( this , 10 , 10 , TimeUnit . SECONDS ); } public RedisCredentials get () { if ( hasTokenExpired ()) { throw new RuntimeException ( "Background IAM token refresh failed" , lastException ); } return this . credentials ; } private boolean hasTokenExpired () { if ( this . lastRefreshInstant == null || this . lifetime == null ) { return true ; } return Instant . now (). isAfter ( this . lastRefreshInstant . plus ( this . lifetime )); } // To be invoked by customer app on shutdown @Override public void close () { service . shutdown (); iamClient . close (); } @Override public void run () { try { // fetch token if it is time to refresh if ( this . lastRefreshInstant != null && this . refreshDuration != null && Instant . now (). isBefore ( this . lastRefreshInstant . plus ( this . refreshDuration ))) { // nothing to do return ; } refreshTokenNow (); } catch ( Exception e ) { // suppress all errors as we cannot allow the task to die // log for visibility logger . log ( Level . parse ( "SEVERE" ), "Background IAM token refresh failed" , e ); } } private void refreshTokenNow () { try { logger . info ( "Refreshing IAM token" ); Listdelegates = new ArrayList <> (); com . google . protobuf . Duration lifetimeProto = com . google . protobuf . Duration . newBuilder () . setSeconds ( lifetime . getSeconds ()) . setNanos ( lifetime . getNano ()) . build (); GenerateAccessTokenResponse response = iamClient . generateAccessToken ( this . accountName , delegates , Collections . singletonList ( "https://www.googleapis.com/auth/cloud-platform" ), lifetimeProto ); // got a successful token refresh this . credentials = new DefaultRedisCredentials ( "default" , response . getAccessToken ()); this . lastRefreshInstant = Instant . now (); // clear the last saved exception this . lastException = null ; logger . info ( "IAM token refreshed with lastRefreshInstant [" + lastRefreshInstant + "], refreshDuration [" + this . refreshDuration + "], accountName [" + this . accountName + "] and lifetime [" + this . lifetime + "]" ); } catch ( Exception e ) { // Save last exception for inline feedback this . lastException = e ; // Bubble up for direct feedback throw e ; } } } /** Sample code to demonstrate how to use IAMAuth; not intended for production use */ public static void main ( String [] args ) throws Exception { String discoveryEndpointIp = " CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS " ; int discoveryEndpointPort = CLUSTER_DISCOVERY_ENDPOINT_PORT_NUMBER ; GenericObjectPoolConfig config = new GenericObjectPoolConfig (); config . setTestWhileIdle ( true ); int timeout = 5000 ; int maxAttempts = 5 ; HostAndPort discovery = new HostAndPort ( discoveryEndpointIp , discoveryEndpointPort ); RedisCredentialsProvider credentialsProvider = new RedisClusterCredentialsProvider ( "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com" , Duration . ofSeconds ( 300 ), Duration . ofSeconds ( 3600 )); // Create JedisCluster cluster InputStream is = new FileInputStream ( "server-ca.pem" ); // You could get a resource as a stream instead. CertificateFactory cf = CertificateFactory . getInstance ( "X.509" ); X509Certificate caCert = ( X509Certificate ) cf . generateCertificate ( is ); TrustManagerFactory tmf = TrustManagerFactory . getInstance ( TrustManagerFactory . getDefaultAlgorithm ()); KeyStore ks = KeyStore . getInstance ( KeyStore . getDefaultType ()); ks . load ( null ); // You don't need the KeyStore cluster to come from a file. ks . setCertificateEntry ( "caCert" , caCert ); tmf . init ( ks ); SSLContext sslContext = SSLContext . getInstance ( "TLS" ); sslContext . init ( null , tmf . getTrustManagers (), null ); JedisCluster jedisCluster = new JedisCluster ( discovery , DefaultJedisClientConfig . builder () . connectionTimeoutMillis ( timeout ) . socketTimeoutMillis ( timeout ) . credentialsProvider ( credentialsProvider ) . ssl ( true ) . sslSocketFactory ( sslContext . getSocketFactory ()) . build (), maxAttempts , config ); // Perform operations on the cluster jedisCluster . set ( "myKey" , "Hello, Redis Cluster!" ); String value = jedisCluster . get ( "myKey" ); System . out . println ( "Value for myKey: " + value ); int count = 0 ; for ( int i = 0 ; i < 1000 ; i ++ ) { String k = "jediskey" + String . valueOf ( i ); String v = "jedisvalue" + String . valueOf ( i ); jedisCluster . set ( k , v ); String got = jedisCluster . get ( k ); if ( got . equals ( v )) { count ++ ; } else { System . out . println ( "unexpected value" ); } } System . out . println ( "Successfully got " + String . valueOf ( count ) + " keys" ); // Disconnect from the cluster jedisCluster . close (); // Cleanup the resources used by the provider (( Closeable ) credentialsProvider ). close (); } }
Go
We recommend using Go , versions 1.24.5 and later.
package main import ( "context" "crypto/tls" "crypto/x509" "flag" "fmt" "io/ioutil" "log" "sync" "time" credentials "google.golang.org/genproto/googleapis/iam/credentials/v1" "github.com/golang/protobuf/ptypes" "github.com/redis/go-redis/v9" "google.golang.org/api/option" gtransport "google.golang.org/api/transport/grpc" ) var ( svcAccount = flag . String ( "a" , "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com" , "service account email" ) lifetime = flag . Duration ( "d" , time . Hour , "lifetime of token" ) refreshDuration = flag . Duration ( "r" , 5 * time . Minute , "token refresh duration" ) checkTokenExpiryInterval = flag . Duration ( "e" , 10 * time . Second , "check token expiry interval" ) lastRefreshInstant = time . Time {} errLastSeen = error ( nil ) token = "" mu = sync . RWMutex {} ) func retrieveToken () ( string , error ) { ctx := context . Background () conn , err := gtransport . Dial ( ctx , option . WithEndpoint ( "iamcredentials.googleapis.com:443" ), option . WithScopes ( "https://www.googleapis.com/auth/cloud-platform" )) if err != nil { log . Printf ( "Failed to dial API, error: %v" , err ) return token , err } client := credentials . NewIAMCredentialsClient ( conn ) req := credentials . GenerateAccessTokenRequest { Name : * svcAccount , Scope : [] string { "https://www.googleapis.com/auth/cloud-platform" }, Lifetime : ptypes . DurationProto ( * lifetime ), } rsp , err := client . GenerateAccessToken ( ctx , & req ) if err != nil { log . Printf "Failed to call GenerateAccessToken with request: %v, error: %v" , req , err ) return token , err } return rsp . AccessToken , nil } func refreshTokenLoop () { if * refreshDuration > * lifetime { log . Fatal ( "Refresh should not happen after token is already expired." ) } for { mu . RLock () lastRefreshTime := lastRefreshInstant mu . RUnlock () if time . Now (). After ( lastRefreshTime . Add ( * refreshDuration )) { var err error retrievedToken , err := retrieveToken () mu . Lock () token = retrievedToken if err != nil { errLastSeen = err } else { lastRefreshInstant = time . Now () } mu . Unlock () } time . Sleep ( * checkTokenExpiryInterval ) } } func retrieveTokenFunc () ( string , string ) { mu . RLock () defer mu . RUnlock () if time . Now (). After ( lastRefreshInstant . Add ( * refreshDuration )) { log . Printf ( "Token is expired. last refresh instant: %v, refresh duration: %v, error that was last seen: %v" , lastRefreshInstant , * refreshDuration , errLastSeen ) return "" , "" } username := "default" password := token return username , password } func main () { // Load CA cert caFilePath := CA_FILE_PATH clusterDicEpAddr := CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS_AND_PORT caCert , err := ioutil . ReadFile ( caFilePath ) if err != nil { log . Fatal ( err ) } caCertPool := x509 . NewCertPool () caCertPool . AppendCertsFromPEM ( caCert ) token , err = retrieveToken () if err != nil { log . Fatal ( "Cannot retrieve IAM token to authenticate to the cluster, error: %v" , err ) } lastRefreshInstant = time . Now () go refreshTokenLoop () // Setup Redis Connection pool client := redis . NewClusterClient ( & redis . ClusterOptions { Addrs : [] string { clusterDicEpAddr }, // PoolSize applies per cluster node and not for the whole cluster. PoolSize : 10 , ConnMaxIdleTime : 60 * time . Second , MinIdleConns : 1 , CredentialsProvider : retrieveTokenFunc , TLSConfig : & tls . Config { RootCAs : caCertPool , }, }) ctx := context . Background () err = client . Set ( ctx , "key" , "value" , 0 ). Err () if err != nil { log . Fatal ( err ) } val , err := client . Get ( ctx , "key" ). Result () if err != nil { log . Fatal ( err ) } fmt . Printf ( "Got the value for key: key, which is %s \n" , val ) }