Client library connection code samples

This page provides code samples for configuring client libraries to connect to clusters in Memorystore for Redis Cluster.

Client library code sample

This section shows a Lettuce client library code sample for connecting to a cluster. For this sample, the cluster doesn't use Identity and Access Management (IAM) authentication or in-transit encryption.

Lettuce

We recommend using Lettuce, versions 6.2.4 and later.

 // Create a RedisURI from the Memorystore for Redis Cluster (MRC) discovery endpoint.
 // CLUSTER_DISC_EP_ADDR and CLUSTER_DISC_EP_PORT are placeholders for your endpoint.
 RedisURI redisUri = RedisURI.Builder.redis(CLUSTER_DISC_EP_ADDR, CLUSTER_DISC_EP_PORT).build();

 // Configure the client resources.
 // Configure reconnectDelay with exponential backoff and full jitter.
 ClientResources resources =
     DefaultClientResources.builder()
         .reconnectDelay(
             Delay.fullJitter(
                 Duration.ofMillis(100), // minimum 100 millisecond delay
                 Duration.ofSeconds(5), // maximum 5 second delay
                 100,
                 TimeUnit.MILLISECONDS)) // 100 millisecond base
         .build();

 // Create a cluster client with the URI and resources.
 RedisClusterClient clusterClient = RedisClusterClient.create(resources, redisUri);

 // Configure the topology refresh options:
 // - Enable periodic cluster topology updates so that the client refreshes the cluster topology
 //   every 60 seconds.
 // - Enable adaptive topology refresh that uses all triggers: MOVED_REDIRECT, ASK_REDIRECT,
 //   PERSISTENT_RECONNECTS, UNCOVERED_SLOT, UNKNOWN_NODE.
 // - Disable dynamicRefreshSources so that only the initial seed nodes (the Memorystore for
 //   Redis Cluster discovery endpoint) are used as the source for topology discovery.
 // - Enable closing stale connections when refreshing the cluster topology. This reduces the
 //   need to handle failed connections during command runtime.
 ClusterTopologyRefreshOptions topologyRefreshOptions =
     ClusterTopologyRefreshOptions.builder()
         .enablePeriodicRefresh(1, TimeUnit.MINUTES)
         .enableAllAdaptiveRefreshTriggers()
         .dynamicRefreshSources(false)
         .closeStaleConnections(true)
         .build();

 // Configure the socket options:
 // - Set connectTimeout based on your application requirements and workload.
 // - Enable TCP keepAlive to reduce the need to handle failed connections during command runtime.
 SocketOptions socketOptions =
     SocketOptions.builder().connectTimeout(CONNECT_TIMEOUT).keepAlive(true).build();

 // Configure the client options:
 // - Enable autoReconnect so the client reconnects when a connection is lost.
 // - Set nodeFilter to filter out failed nodes from the topology.
 // - Disable validateClusterNodeMembership to allow redirecting commands to newly added nodes.
 clusterClient.setOptions(
     ClusterClientOptions.builder()
         .topologyRefreshOptions(topologyRefreshOptions)
         .socketOptions(socketOptions)
         .autoReconnect(true)
         .nodeFilter(
             it ->
                 !(it.is(RedisClusterNode.NodeFlag.FAIL)
                     || it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL)
                     || it.is(RedisClusterNode.NodeFlag.NOADDR)))
         .validateClusterNodeMembership(false)
         .build());

 // Create a connection pool.
 GenericObjectPool<StatefulRedisClusterConnection<String, String>> pool =
     ConnectionPoolSupport.createGenericObjectPool(
         () -> clusterClient.connect(), new GenericObjectPoolConfig<>());
 pool.setMaxTotal(MAX_CONNECTIONS_IN_CONNECTION_POOL);

 // Borrow a connection from the connection pool. Connections handed out by
 // ConnectionPoolSupport are wrapped by default, so closing the connection (here through
 // try-with-resources) returns it to the pool instead of leaking it.
 try (StatefulRedisClusterConnection<String, String> connection = pool.borrowObject()) {
   // Get the synchronous cluster command API and call 'set'.
   RedisAdvancedClusterCommands<String, String> syncCommands = connection.sync();
   syncCommands.set(key, value);
 }

In-transit encryption client library code sample

This section gives an example of client code for connecting to your Memorystore cluster with in-transit encryption (TLS) by using the go-redis client library.

go-redis

We recommend using go-redis, versions 9.11.0 and later.

 import 
  
 ( 
  
 "context" 
  
 "crypto/tls" 
  
 "crypto/x509" 
  
 "io/ioutil" 
  
 "log" 
  
 "time" 
  
 "github.com/go-redis/redis/v9" 
 ) 
 func 
  
 example 
 () 
  
 { 
  
 // Load CA cert 
  
 caFilePath 
  
 := 
  
   
 caCert 
 , 
  
 err 
  
 := 
  
 ioutil 
 . 
 ReadFile 
 ( 
 caFilePath 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 log 
 . 
 Fatal 
 ( 
 err 
 ) 
  
 } 
  
 caCertPool 
  
 := 
  
 x509 
 . 
 NewCertPool 
 () 
  
 caCertPool 
 . 
 AppendCertsFromPEM 
 ( 
 caCert 
 ) 
  
 // Setup Redis Connection pool 
  
 client 
  
 := 
  
 redis 
 . 
 NewClusterClient 
 ( 
 & 
 redis 
 . 
 ClusterOptions 
 { 
  
 Addrs 
 : 
  
 [] 
 string 
 { 
 "CLUSTER_DISC_EP_ADDR:CLUSTER_DISC_EP_PORT" 
 }, 
  
 // PoolSize applies per cluster node and not for the whole cluster. 
  
 PoolSize 
 : 
  
 10 
 , 
  
 ConnMaxIdleTime 
 : 
  
 60 
  
 * 
  
 time 
 . 
 Second 
 , 
  
 MinIdleConns 
 : 
  
 1 
 , 
  
 TLSConfig 
 : 
  
 & 
 tls 
 . 
 Config 
 { 
  
 RootCAs 
 : 
  
 caCertPool 
 , 
  
 }, 
  
 }) 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 err 
  
 = 
  
 client 
 . 
 Set 
 ( 
 ctx 
 , 
  
 "key" 
 , 
  
 "value" 
 , 
  
 0 
 ). 
 Err 
 () 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 log 
 . 
 Fatal 
 ( 
 err 
 ) 
  
 } 
 } 
 

IAM authentication and in-transit encryption code sample

This section gives an example of how to authenticate and connect to a cluster by using both IAM authentication and in-transit encryption with various client libraries:

redis-py

We recommend using redis-py, versions 5.1 and later.

 from google.cloud import iam_credentials_v1
 from redis.backoff import ConstantBackoff
 from redis.retry import Retry
 from redis.exceptions import (
     ConnectionError,
     AuthenticationWrongNumberOfArgsError,
     AuthenticationError,
 )
 from redis.utils import str_if_bytes
 import redis

 # Resource name of the service account whose access token is used to
 # authenticate to the cluster. Replace the TO-DO-1 placeholder.
 service_account = "projects/-/serviceAccounts/<TO-DO-1: email of service account used to authenticate to Redis Cluster>"
 # IP address of the cluster's discovery endpoint. Replace the TO-DO-2 placeholder.
 host = "<TO-DO-2: your Redis Cluster discovery endpoint ip>"
 # File name of the trusted server CA used to verify the TLS connection.
 # Replace the TO-DO-3 placeholder.
 ssl_ca_certs = "<TO-DO-3: your trusted server ca file name>"
 def generate_access_token():
     """Request an IAM access token for ``service_account``.

     Builds a new IAM Credentials client, asks it for an access token with the
     ``cloud-platform`` scope, and returns the token as a ``str``.
     """
     iam_client = iam_credentials_v1.IAMCredentialsClient()

     # Describe which account the token is for and which scope it carries.
     token_request = iam_credentials_v1.GenerateAccessTokenRequest(
         name=service_account,
         scope=['https://www.googleapis.com/auth/cloud-platform'],
     )

     # Call the IAM Credentials API and hand back only the token string.
     token_response = iam_client.generate_access_token(request=token_request)
     return str(token_response.access_token)
 def iam_connect(self):
     """Connection hook: set up the parser, then AUTH with a fresh IAM token.

     Passed to the cluster client as ``redis_connect_func``; ``self`` is the
     connection object being established. Raises ``AuthenticationError`` when
     the server's reply to AUTH is not "OK".
     """
     self._parser.on_connect(self)

     # Authenticate with a single argument: the IAM access token.
     self.send_command("AUTH", generate_access_token(), check_health=False)

     try:
         reply = self.read_response()
     except AuthenticationWrongNumberOfArgsError:
         # The server rejected the argument count; retry AUTH with the
         # connection's configured password instead.
         self.send_command("AUTH", self.password, check_health=False)
         reply = self.read_response()

     if str_if_bytes(reply) != "OK":
         raise AuthenticationError("Invalid Username or Password")
 # Connect to Memorystore for Redis Cluster.
 # Retry policy: wait a constant 3 between attempts (ConstantBackoff(3)) and
 # retry on ConnectionError / ConnectionResetError with retries=-1.
 backoff = ConstantBackoff(3)
 retry = Retry(
     retries=-1,
     backoff=backoff,
     supported_errors=(ConnectionError, ConnectionResetError),
 )

 # iam_connect runs for every new connection, so each connection is
 # authenticated with an IAM access token; ssl/ssl_ca_certs enable TLS.
 r = redis.cluster.RedisCluster(
     host=host,
     port=6379,
     redis_connect_func=iam_connect,
     retry=retry,
     ssl=True,
     ssl_ca_certs=ssl_ca_certs,
 )

 print(r.get('key'))

Lettuce

We recommend using Lettuce, versions 6.2.4 and later.

 import 
  
 com.google.cloud.iam.credentials.v1. GenerateAccessTokenResponse 
 
 ; 
 import 
  
 com.google.cloud.iam.credentials.v1. IamCredentialsClient 
 
 ; 
 import 
  
 io.lettuce.core.RedisCredentials 
 ; 
 import 
  
 io.lettuce.core.RedisCredentialsProvider 
 ; 
 import 
  
 io.lettuce.core.RedisURI 
 ; 
 import 
  
 io.lettuce.core.SocketOptions 
 ; 
 import 
  
 io.lettuce.core.SslOptions 
 ; 
 import 
  
 io.lettuce.core.cluster.ClusterClientOptions 
 ; 
 import 
  
 io.lettuce.core.cluster.ClusterTopologyRefreshOptions 
 ; 
 import 
  
 io.lettuce.core.cluster.RedisClusterClient 
 ; 
 import 
  
 io.lettuce.core.cluster.api.StatefulRedisClusterConnection 
 ; 
 import 
  
 io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands 
 ; 
 import 
  
 io.lettuce.core.cluster.models.partitions.RedisClusterNode 
 ; 
 import 
  
 io.lettuce.core.resource.ClientResources 
 ; 
 import 
  
 io.lettuce.core.resource.DefaultClientResources 
 ; 
 import 
  
 io.lettuce.core.resource.Delay 
 ; 
 import 
  
 java.io.Closeable 
 ; 
 import 
  
 java.io.File 
 ; 
 import 
  
 java.time.Duration 
 ; 
 import 
  
 java.time.Instant 
 ; 
 import 
  
 java.util.ArrayList 
 ; 
 import 
  
 java.util.Collections 
 ; 
 import 
  
 java.util.List 
 ; 
 import 
  
 java.util.concurrent.Executors 
 ; 
 import 
  
 java.util.concurrent.ScheduledExecutorService 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 import 
  
 java.util.logging.Level 
 ; 
 import 
  
 java.util.logging.Logger 
 ; 
 import 
  
 reactor.core.publisher.Mono 
 ; 
 public 
  
 class 
 IAMAuth 
  
 { 
  
 /** 
 * This thread-safe implementation (excluding the main app below) is intended for production use. 
 * It provides a background refresh logic that shouldn't overload IAM service in case. the 
 * application has many many connections (connection storms can result in IAM throttles 
 * otherwise). 

*
* Guidelines for implementing similar logic for other clients:
* 1. Refresh IAM tokens in the background using a single thread/routine per client process
* 2. Provide last error feedback inline for token retrieval to aid debugging
* 3. Provide initial setup validation by fast-failing if the token couldn't be retrieved
* 4. Inline getToken shouldn't execute direct IAM calls as it can overload the token retrieval * resulting in throttles
* 5. Typical scale is tens of thousands of Redis connections and the IAM token is required for * every connection being established.
*/ private static final class RedisClusterCredentialsProvider implements RedisCredentialsProvider , Runnable , Closeable { private static final Logger logger = Logger . getLogger ( RedisClusterCredentialsProvider . class . getName ()); private final ScheduledExecutorService service = Executors . newSingleThreadScheduledExecutor (); private final IamCredentialsClient iamClient ; private final String accountName ; private final Duration refreshDuration ; private final Duration lifetime ; private volatile RedisCredentials credentials ; private volatile Instant lastRefreshInstant ; private volatile Exception lastException ; /** * AccountName: * "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com"; * RefreshDuration: Duration.ofSeconds(300) Lifetime: Duration.ofSeconds(3600); */ public RedisClusterCredentialsProvider ( String accountName , Duration refreshDuration , Duration lifetime ) throws Exception { this . iamClient = IamCredentialsClient . create (); this . accountName = accountName ; this . refreshDuration = refreshDuration ; this . lifetime = lifetime ; // execute on initialization to fast-fail if there are any setup issues refreshTokenNow (); // refresh much more frequently than the expiry time to allow for multiple retries in case of // failures service . scheduleWithFixedDelay ( this , 10 , 10 , TimeUnit . SECONDS ); } @Override public Mono resolveCredentials () { if ( hasTokenExpired ()) { throw new RuntimeException ( "Background IAM token refresh failed" , lastException ); } return Mono . just ( this . credentials ); } private boolean hasTokenExpired () { if ( this . lastRefreshInstant == null || this . lifetime == null ) { return true ; } return Instant . now (). isAfter ( this . lastRefreshInstant . plus ( this . lifetime )); } // To be invoked by customer app on shutdown @Override public void close () { service . shutdown (); iamClient . 
close (); } @Override public void run () { try { // fetch token if it is time to refresh if ( this . lastRefreshInstant != null && this . refreshDuration != null && Instant . now (). isBefore ( this . lastRefreshInstant . plus ( this . refreshDuration ))) { // nothing to do return ; } refreshTokenNow (); } catch ( Exception e ) { // suppress all errors as we cannot allow the task to die // log for visibility logger . log ( Level . parse ( "SEVERE" ), "Background IAM token refresh failed" , e ); } } private void refreshTokenNow () { try { logger . info ( "Refreshing IAM token" ); com . google . protobuf . Duration lifetimeProto = com . google . protobuf . Duration . newBuilder () . setSeconds ( lifetime . getSeconds ()) . setNanos ( lifetime . getNano ()) . build (); GenerateAccessTokenResponse response = this . iamClient . generateAccessToken ( this . accountName , new ArrayList <> (), Collections . singletonList ( "https://www.googleapis.com/auth/cloud-platform" ), lifetimeProto ); // got a successful token refresh this . credentials = new RedisCredentials () { @Override public boolean hasUsername () { return false ; } @Override public boolean hasPassword () { return true ; } @Override public String getUsername () { return "default" ; } @Override public char [] getPassword () { return response . getAccessToken (). toCharArray (); } }; this . lastRefreshInstant = Instant . now (); // clear the last saved exception this . lastException = null ; logger . info ( "IAM token refreshed with lastRefreshInstant [" + lastRefreshInstant + "], refreshDuration [" + this . refreshDuration + "], accountName [" + this . accountName + "] and lifetime [" + this . lifetime + "]" ); } catch ( Exception e ) { // Save last exception for inline feedback this . 
lastException = e ; // Bubble up for direct feedback throw e ; } } } /** Sample code to demonstrate how to use IAMAuth; not intended for production use */ public static void main ( String [] args ) throws Exception { // These are the parameters the user needs to replace String discoveryEndpointIp = " CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS " ; int discoveryEndpointPort = CLUSTER_DISCOVERY_ENDPOINT_PORT_NUMBER ; String accountName = " ACCOUNT_NAME " ; String caFileName = " CA_FILE_NAME " ; int refreshDurationSec = REFRESH_DURATION_SEC ; int lifetimeSec = LIFETIME_SEC ; RedisCredentialsProvider credentialsProvider = new RedisClusterCredentialsProvider ( accountName , Duration . ofSeconds ( refreshDurationSec ), Duration . ofSeconds ( lifetimeSec )); RedisURI redisUri = RedisURI . Builder . redis ( discoveryEndpointIp , discoveryEndpointPort ) . withSsl ( true ) . withAuthentication ( credentialsProvider ) . build (); ClientResources resources = DefaultClientResources . builder () . reconnectDelay ( Delay . fullJitter ( Duration . ofMillis ( 100 ), Duration . ofSeconds ( 5 ), 100 , TimeUnit . MILLISECONDS )) . build (); ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions . builder () . enablePeriodicRefresh ( 1 , TimeUnit . MINUTES ) . enableAllAdaptiveRefreshTriggers () . dynamicRefreshSources ( false ) . closeStaleConnections ( true ) . build (); SslOptions sslOptions = SslOptions . builder (). jdkSslProvider (). trustManager ( new File ( caFileName )). build (); SocketOptions socketOptions = SocketOptions . builder (). connectTimeout ( Duration . ofSeconds ( 5 )). keepAlive ( true ). build (); // Create Redis Cluster Client RedisClusterClient clusterClient = RedisClusterClient . create ( resources , redisUri ); clusterClient . setOptions ( ClusterClientOptions . builder () . topologyRefreshOptions ( topologyRefreshOptions ) . socketOptions ( socketOptions ) . sslOptions ( sslOptions ) . autoReconnect ( true ) . nodeFilter ( it -> ! 
( it . is ( RedisClusterNode . NodeFlag . FAIL ) || it . is ( RedisClusterNode . NodeFlag . EVENTUAL_FAIL ) || it . is ( RedisClusterNode . NodeFlag . NOADDR ))) . validateClusterNodeMembership ( false ) . build ()); // Establish connection to Redis Cluster StatefulRedisClusterConnection < String , String > connection = clusterClient . connect (); // Retrieve synchronous Redis Cluster commands RedisAdvancedClusterCommands < String , String > syncCommands = connection . sync (); // Perform Redis operations syncCommands . set ( "key1" , "value1" ); String value = syncCommands . get ( "key1" ); System . out . println ( "Retrieved value: " + value ); int count = 0 ; for ( int i = 0 ; i < 1000 ; i ++ ) { String k = "lettucekey" + String . valueOf ( i ); String v = "lettucevalue" + String . valueOf ( i ); syncCommands . set ( k , v ); String got = syncCommands . get ( k ); if ( got . equals ( v )) { count ++ ; } else { System . out . println ( "unexpected value" ); } } System . out . println ( "Successfully got " + String . valueOf ( count ) + " keys" ); // Close the connection and shutdown the client connection . close (); clusterClient . shutdown (); (( Closeable ) credentialsProvider ). close (); } }

Jedis

We recommend using Jedis, versions 4.4.0 and later.

 import 
  
 com.google.cloud.iam.credentials.v1. GenerateAccessTokenResponse 
 
 ; 
 import 
  
 com.google.cloud.iam.credentials.v1. IamCredentialsClient 
 
 ; 
 import 
  
 java.io.Closeable 
 ; 
 import 
  
 java.io.FileInputStream 
 ; 
 import 
  
 java.io.InputStream 
 ; 
 import 
  
 java.security.KeyStore 
 ; 
 import 
  
 java.security.cert.CertificateFactory 
 ; 
 import 
  
 java.security.cert.X509Certificate 
 ; 
 import 
  
 java.time.Duration 
 ; 
 import 
  
 java.time.Instant 
 ; 
 import 
  
 java.util.ArrayList 
 ; 
 import 
  
 java.util.Collections 
 ; 
 import 
  
 java.util.List 
 ; 
 import 
  
 java.util.concurrent.Executors 
 ; 
 import 
  
 java.util.concurrent.ScheduledExecutorService 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 import 
  
 java.util.logging.Level 
 ; 
 import 
  
 java.util.logging.Logger 
 ; 
 import 
  
 javax.net.ssl.SSLContext 
 ; 
 import 
  
 javax.net.ssl.TrustManagerFactory 
 ; 
 import 
  
 org.apache.commons.pool2.impl.GenericObjectPoolConfig 
 ; 
 import 
  
 redis.clients.jedis.Connection 
 ; 
 import 
  
 redis.clients.jedis.DefaultJedisClientConfig 
 ; 
 import 
  
 redis.clients.jedis.DefaultRedisCredentials 
 ; 
 import 
  
 redis.clients.jedis.HostAndPort 
 ; 
 import 
  
 redis.clients.jedis.JedisCluster 
 ; 
 import 
  
 redis.clients.jedis.RedisCredentials 
 ; 
 import 
  
 redis.clients.jedis.RedisCredentialsProvider 
 ; 
 /** Customers are free to update/replace code as they see fit. */ 
 public 
  
 class 
 IAMAuth 
  
 { 
  
 /** 
 * This thread-safe implementation (excluding the main app below) is intended for production use. 
 * It provides a background refresh logic that shouldn't overload IAM service in case. the 
 * application has many many connections (connection storms can result in IAM throttles 
 * otherwise). 

*
* Guidelines for implementing similar logic for other clients:
* 1. Refresh IAM tokens in the background using a single thread/routine per client process
* 2. Provide last error feedback inline for token retrieval to aid debugging
* 3. Provide initial setup validation by fast-failing if the token couldn't be retrieved
* 4. Inline getToken shouldn't execute direct IAM calls as it can overload the token retrieval * resulting in throttles
* 5. Typical scale is tens of thousands of Redis connections and the IAM token is required for * every connection being established.
*/ private static final class RedisClusterCredentialsProvider implements RedisCredentialsProvider , Runnable , Closeable { private static final Logger logger = Logger . getLogger ( RedisClusterCredentialsProvider . class . getName ()); private final IamCredentialsClient iamClient ; private final ScheduledExecutorService service ; private final String accountName ; private final Duration refreshDuration ; private final Duration lifetime ; private volatile RedisCredentials credentials ; private volatile Instant lastRefreshInstant ; private volatile Exception lastException ; // AccountName: // "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com"; // RefreshDuration: Duration.ofSeconds(300); // Lifetime: Duration.ofSeconds(3600); public RedisClusterCredentialsProvider ( String accountName , Duration refreshDuration , Duration lifetime ) throws Exception { this . iamClient = IamCredentialsClient . create (); this . service = Executors . newSingleThreadScheduledExecutor (); this . accountName = accountName ; this . refreshDuration = refreshDuration ; this . lifetime = lifetime ; // execute on initialization to fast-fail if there are any setup issues refreshTokenNow (); // refresh much more frequently than the expiry time to allow for multiple retries in case of // failures service . scheduleWithFixedDelay ( this , 10 , 10 , TimeUnit . SECONDS ); } public RedisCredentials get () { if ( hasTokenExpired ()) { throw new RuntimeException ( "Background IAM token refresh failed" , lastException ); } return this . credentials ; } private boolean hasTokenExpired () { if ( this . lastRefreshInstant == null || this . lifetime == null ) { return true ; } return Instant . now (). isAfter ( this . lastRefreshInstant . plus ( this . lifetime )); } // To be invoked by customer app on shutdown @Override public void close () { service . shutdown (); iamClient . 
close (); } @Override public void run () { try { // fetch token if it is time to refresh if ( this . lastRefreshInstant != null && this . refreshDuration != null && Instant . now (). isBefore ( this . lastRefreshInstant . plus ( this . refreshDuration ))) { // nothing to do return ; } refreshTokenNow (); } catch ( Exception e ) { // suppress all errors as we cannot allow the task to die // log for visibility logger . log ( Level . parse ( "SEVERE" ), "Background IAM token refresh failed" , e ); } } private void refreshTokenNow () { try { logger . info ( "Refreshing IAM token" ); List delegates = new ArrayList <> (); com . google . protobuf . Duration lifetimeProto = com . google . protobuf . Duration . newBuilder () . setSeconds ( lifetime . getSeconds ()) . setNanos ( lifetime . getNano ()) . build (); GenerateAccessTokenResponse response = iamClient . generateAccessToken ( this . accountName , delegates , Collections . singletonList ( "https://www.googleapis.com/auth/cloud-platform" ), lifetimeProto ); // got a successful token refresh this . credentials = new DefaultRedisCredentials ( "default" , response . getAccessToken ()); this . lastRefreshInstant = Instant . now (); // clear the last saved exception this . lastException = null ; logger . info ( "IAM token refreshed with lastRefreshInstant [" + lastRefreshInstant + "], refreshDuration [" + this . refreshDuration + "], accountName [" + this . accountName + "] and lifetime [" + this . lifetime + "]" ); } catch ( Exception e ) { // Save last exception for inline feedback this . lastException = e ; // Bubble up for direct feedback throw e ; } } } /** Sample code to demonstrate how to use IAMAuth; not intended for production use */ public static void main ( String [] args ) throws Exception { String discoveryEndpointIp = " CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS " ; int discoveryEndpointPort = CLUSTER_DISCOVERY_ENDPOINT_PORT_NUMBER ; GenericObjectPoolConfig config = new GenericObjectPoolConfig (); config . 
setTestWhileIdle ( true ); int timeout = 5000 ; int maxAttempts = 5 ; HostAndPort discovery = new HostAndPort ( discoveryEndpointIp , discoveryEndpointPort ); RedisCredentialsProvider credentialsProvider = new RedisClusterCredentialsProvider ( "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com" , Duration . ofSeconds ( 300 ), Duration . ofSeconds ( 3600 )); // Create JedisCluster cluster InputStream is = new FileInputStream ( "server-ca.pem" ); // You could get a resource as a stream instead. CertificateFactory cf = CertificateFactory . getInstance ( "X.509" ); X509Certificate caCert = ( X509Certificate ) cf . generateCertificate ( is ); TrustManagerFactory tmf = TrustManagerFactory . getInstance ( TrustManagerFactory . getDefaultAlgorithm ()); KeyStore ks = KeyStore . getInstance ( KeyStore . getDefaultType ()); ks . load ( null ); // You don't need the KeyStore cluster to come from a file. ks . setCertificateEntry ( "caCert" , caCert ); tmf . init ( ks ); SSLContext sslContext = SSLContext . getInstance ( "TLS" ); sslContext . init ( null , tmf . getTrustManagers (), null ); JedisCluster jedisCluster = new JedisCluster ( discovery , DefaultJedisClientConfig . builder () . connectionTimeoutMillis ( timeout ) . socketTimeoutMillis ( timeout ) . credentialsProvider ( credentialsProvider ) . ssl ( true ) . sslSocketFactory ( sslContext . getSocketFactory ()) . build (), maxAttempts , config ); // Perform operations on the cluster jedisCluster . set ( "myKey" , "Hello, Redis Cluster!" ); String value = jedisCluster . get ( "myKey" ); System . out . println ( "Value for myKey: " + value ); int count = 0 ; for ( int i = 0 ; i < 1000 ; i ++ ) { String k = "jediskey" + String . valueOf ( i ); String v = "jedisvalue" + String . valueOf ( i ); jedisCluster . set ( k , v ); String got = jedisCluster . get ( k ); if ( got . equals ( v )) { count ++ ; } else { System . out . println ( "unexpected value" ); } } System . out . 
println ( "Successfully got " + String . valueOf ( count ) + " keys" ); // Disconnect from the cluster jedisCluster . close (); // Cleanup the resources used by the provider (( Closeable ) credentialsProvider ). close (); } }

Go

We recommend using Go, versions 1.24.5 and later.

 package 
  
 main 
 import 
  
 ( 
  
 "context" 
  
 "crypto/tls" 
  
 "crypto/x509" 
  
 "flag" 
  
 "fmt" 
  
 "io/ioutil" 
  
 "log" 
  
 "sync" 
  
 "time" 
  
 credentials 
  
 "google.golang.org/genproto/googleapis/iam/credentials/v1" 
  
 "github.com/golang/protobuf/ptypes" 
  
 "github.com/redis/go-redis/v9" 
  
 "google.golang.org/api/option" 
  
 gtransport 
  
 "google.golang.org/api/transport/grpc" 
 ) 
 // Command-line flags and the token state shared between the background
 // refresh loop and the credentials callback.
 var (
 	// -a: resource name of the service account to generate tokens for.
 	svcAccount = flag.String(
 		"a",
 		"projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com",
 		"service account email",
 	)
 	// -d: lifetime requested for each generated token.
 	lifetime = flag.Duration("d", time.Hour, "lifetime of token")
 	// -r: age of the current token after which a new one is fetched.
 	refreshDuration = flag.Duration("r", 5*time.Minute, "token refresh duration")
 	// -e: how often the refresh loop wakes up to check the token's age.
 	checkTokenExpiryInterval = flag.Duration("e", 10*time.Second, "check token expiry interval")

 	// Time of the last successful token retrieval (zero until the first one).
 	lastRefreshInstant time.Time
 	// Most recent token retrieval error.
 	errLastSeen error
 	// Current IAM access token.
 	token string
 	// Lock used around lastRefreshInstant, errLastSeen and token.
 	mu sync.RWMutex
 )
 func 
  
 retrieveToken 
 () 
  
 ( 
 string 
 , 
  
 error 
 ) 
  
 { 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 conn 
 , 
  
 err 
  
 := 
  
 gtransport 
 . 
 Dial 
 ( 
 ctx 
 , 
  
 option 
 . 
 WithEndpoint 
 ( 
 "iamcredentials.googleapis.com:443" 
 ), 
  
 option 
 . 
 WithScopes 
 ( 
 "https://www.googleapis.com/auth/cloud-platform" 
 )) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 log 
 . 
 Printf 
 ( 
 "Failed to dial API, error: %v" 
 , 
  
 err 
 ) 
  
 return 
  
 token 
 , 
  
 err 
  
 } 
  
 client 
  
 := 
  
 credentials 
 . 
 NewIAMCredentialsClient 
 ( 
 conn 
 ) 
  
 req 
  
 := 
  
 credentials 
 . 
 GenerateAccessTokenRequest 
 { 
  
 Name 
 : 
  
 * 
 svcAccount 
 , 
  
 Scope 
 : 
  
 [] 
 string 
 { 
 "https://www.googleapis.com/auth/cloud-platform" 
 }, 
  
 Lifetime 
 : 
  
 ptypes 
 . 
 DurationProto 
 ( 
 * 
 lifetime 
 ), 
  
 } 
  
 rsp 
 , 
  
 err 
  
 := 
  
 client 
 . 
 GenerateAccessToken 
 ( 
 ctx 
 , 
  
 & 
 req 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 log 
 . 
 Printf 
 "Failed to call GenerateAccessToken with request: %v, error: %v" 
 , 
  
 req 
 , 
  
 err 
 ) 
  
 return 
  
 token 
 , 
  
 err 
  
 } 
  
 return 
  
 rsp 
 . 
 AccessToken 
 , 
  
 nil 
 } 
 func 
  
 refreshTokenLoop 
 () 
  
 { 
  
 if 
  
 * 
 refreshDuration 
  
 > 
  
 * 
 lifetime 
  
 { 
  
 log 
 . 
 Fatal 
 ( 
 "Refresh should not happen after token is already expired." 
 ) 
  
 } 
  
 for 
  
 { 
  
 mu 
 . 
 RLock 
 () 
  
 lastRefreshTime 
  
 := 
  
 lastRefreshInstant 
  
 mu 
 . 
 RUnlock 
 () 
  
 if 
  
 time 
 . 
 Now 
 (). 
 After 
 ( 
 lastRefreshTime 
 . 
 Add 
 ( 
 * 
 refreshDuration 
 )) 
  
 { 
  
 var 
  
 err 
  
 error 
  
 retrievedToken 
 , 
  
 err 
  
 := 
  
 retrieveToken 
 () 
  
 mu 
 . 
 Lock 
 () 
  
 token 
  
 = 
  
 retrievedToken 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 errLastSeen 
  
 = 
  
 err 
  
 } 
  
 else 
  
 { 
  
 lastRefreshInstant 
  
 = 
  
 time 
 . 
 Now 
 () 
  
 } 
  
 mu 
 . 
 Unlock 
 () 
  
 } 
  
 time 
 . 
 Sleep 
 ( 
 * 
 checkTokenExpiryInterval 
 ) 
  
 } 
 } 
 func 
  
 retrieveTokenFunc 
 () 
  
 ( 
 string 
 , 
  
 string 
 ) 
  
 { 
  
 mu 
 . 
 RLock 
 () 
  
 defer 
  
 mu 
 . 
 RUnlock 
 () 
  
 if 
  
 time 
 . 
 Now 
 (). 
 After 
 ( 
 lastRefreshInstant 
 . 
 Add 
 ( 
 * 
 refreshDuration 
 )) 
  
 { 
  
 log 
 . 
 Printf 
 ( 
 "Token is expired. last refresh instant: %v, refresh duration: %v, error that was last seen: %v" 
 , 
  
 lastRefreshInstant 
 , 
  
 * 
 refreshDuration 
 , 
  
 errLastSeen 
 ) 
  
 return 
  
 "" 
 , 
  
 "" 
  
 } 
  
 username 
  
 := 
  
 "default" 
  
 password 
  
 := 
  
 token 
  
 return 
  
 username 
 , 
  
 password 
 } 
 func 
  
 main 
 () 
  
 { 
  
 // Load CA cert 
  
 caFilePath 
  
 := 
  
  CA_FILE_PATH 
 
  
 clusterDicEpAddr 
  
 := 
  
  CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS_AND_PORT 
 
  
 caCert 
 , 
  
 err 
  
 := 
  
 ioutil 
 . 
 ReadFile 
 ( 
 caFilePath 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 log 
 . 
 Fatal 
 ( 
 err 
 ) 
  
 } 
  
 caCertPool 
  
 := 
  
 x509 
 . 
 NewCertPool 
 () 
  
 caCertPool 
 . 
 AppendCertsFromPEM 
 ( 
 caCert 
 ) 
  
 token 
 , 
  
 err 
  
 = 
  
 retrieveToken 
 () 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 log 
 . 
 Fatal 
 ( 
 "Cannot retrieve IAM token to authenticate to the cluster, error: %v" 
 , 
  
 err 
 ) 
  
 } 
  
 lastRefreshInstant 
  
 = 
  
 time 
 . 
 Now 
 () 
  
 go 
  
 refreshTokenLoop 
 () 
  
 // Setup Redis Connection pool 
  
 client 
  
 := 
  
 redis 
 . 
 NewClusterClient 
 ( 
 & 
 redis 
 . 
 ClusterOptions 
 { 
  
 Addrs 
 : 
  
 [] 
 string 
 { 
 clusterDicEpAddr 
 }, 
  
 // PoolSize applies per cluster node and not for the whole cluster. 
  
 PoolSize 
 : 
  
 10 
 , 
  
 ConnMaxIdleTime 
 : 
  
 60 
  
 * 
  
 time 
 . 
 Second 
 , 
  
 MinIdleConns 
 : 
  
 1 
 , 
  
 CredentialsProvider 
 : 
  
 retrieveTokenFunc 
 , 
  
 TLSConfig 
 : 
  
 & 
 tls 
 . 
 Config 
 { 
  
 RootCAs 
 : 
  
 caCertPool 
 , 
  
 }, 
  
 }) 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 err 
  
 = 
  
 client 
 . 
 Set 
 ( 
 ctx 
 , 
  
 "key" 
 , 
  
 "value" 
 , 
  
 0 
 ). 
 Err 
 () 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 log 
 . 
 Fatal 
 ( 
 err 
 ) 
  
 } 
  
 val 
 , 
  
 err 
  
 := 
  
 client 
 . 
 Get 
 ( 
 ctx 
 , 
  
 "key" 
 ). 
 Result 
 () 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 log 
 . 
 Fatal 
 ( 
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Printf 
 ( 
 "Got the value for key: key, which is %s \n" 
 , 
  
 val 
 ) 
 } 
Design a Mobile Site
View Site in Mobile | Classic
Share by: