Use RunInference for Generative AI

This notebook shows how to use the Apache Beam RunInference transform for generative AI tasks. It uses a large language model (LLM) from the Hugging Face Model Hub .

This notebook demonstrates the following steps:

  • Load and save a model from the Hugging Face Model Hub.
  • Use the PyTorch model handler for RunInference.

For more information about using RunInference, see Get started with AI/ML pipelines in the Apache Beam documentation.

Install the Apache Beam SDK and dependencies

Use the following code to install the Apache Beam Python SDK, PyTorch, and Transformers.

 pip  
install  
apache_beam [ 
gcp ]== 
 2 
.48.0 
 pip  
install  
torch 
 pip  
install  
transformers 

Use the following code to import dependencies

  import 
  
 os 
 import 
  
 apache_beam 
  
 as 
  
 beam 
 from 
  
 apache_beam.options.pipeline_options 
  
 import 
 PipelineOptions 
 from 
  
 apache_beam.ml.inference.base 
  
 import 
 PredictionResult 
 from 
  
 apache_beam.ml.inference.base 
  
 import 
 RunInference 
 from 
  
 apache_beam.ml.inference.pytorch_inference 
  
 import 
 make_tensor_model_fn 
 from 
  
 apache_beam.ml.inference.pytorch_inference 
  
 import 
 PytorchModelHandlerTensor 
 import 
  
 torch 
 from 
  
 transformers 
  
 import 
 AutoConfig 
 from 
  
 transformers 
  
 import 
 AutoModelForSeq2SeqLM 
 from 
  
 transformers 
  
 import 
 AutoTokenizer 
 from 
  
 transformers.tokenization_utils 
  
 import 
 PreTrainedTokenizer 
 MAX_RESPONSE_TOKENS 
 = 
 256 
 model_name 
 = 
 "google/flan-t5-small" 
 state_dict_path 
 = 
 "saved_model" 
 

Download and save the model

This notebook uses the auto classes from Hugging Face to instantly load the model in memory. Later, the model is saved to the path defined previously.

  model 
 = 
 AutoModelForSeq2SeqLM 
 . 
 from_pretrained 
 ( 
 model_name 
 , 
 torch_dtype 
 = 
 torch 
 . 
 bfloat16 
 ) 
 directory 
 = 
 os 
 . 
 path 
 . 
 dirname 
 ( 
 state_dict_path 
 ) 
 torch 
 . 
 save 
 ( 
 model 
 . 
 state_dict 
 (), 
 state_dict_path 
 ) 
 

Define utility functions

The input and output for the google/flan-t5-small model are token tensors. These utility functions are used for the conversion of text to token tensors and then back to text.

  def 
  
 to_tensors 
 ( 
 input_text 
 : 
 str 
 , 
 tokenizer 
 ) 
 - 
> torch 
 . 
 Tensor 
 : 
  
 """Encodes input text into token tensors. 
 Args: 
 input_text: Input text for the LLM model. 
 tokenizer: Tokenizer for the LLM model. 
 Returns: Tokenized input tokens. 
 """ 
 return 
 tokenizer 
 ( 
 input_text 
 , 
 return_tensors 
 = 
 "pt" 
 ) 
 . 
 input_ids 
 [ 
 0 
 ] 
 def 
  
 from_tensors 
 ( 
 result 
 : 
 PredictionResult 
 , 
 tokenizer 
 ) 
 - 
> str 
 : 
  
 """Decodes output token tensors into text. 
 Args: 
 result: Prediction results from the RunInference transform. 
 tokenizer: Tokenizer for the LLM model. 
 Returns: The model's response as text. 
 """ 
 output_tokens 
 = 
 result 
 . 
 inference 
 return 
 tokenizer 
 . 
 decode 
 ( 
 output_tokens 
 , 
 skip_special_tokens 
 = 
 True 
 ) 
 
  # Load the tokenizer. 
 tokenizer 
 = 
 AutoTokenizer 
 . 
 from_pretrained 
 ( 
 model_name 
 ) 
 # Create an instance of the PyTorch model handler. 
 model_handler 
 = 
 PytorchModelHandlerTensor 
 ( 
 state_dict_path 
 = 
 state_dict_path 
 , 
 model_class 
 = 
 AutoModelForSeq2SeqLM 
 . 
 from_config 
 , 
 model_params 
 = 
 { 
 "config" 
 : 
 AutoConfig 
 . 
 from_pretrained 
 ( 
 model_name 
 )}, 
 inference_fn 
 = 
 make_tensor_model_fn 
 ( 
 "generate" 
 ), 
 ) 
 

Run the Pipeline

  example 
 = 
 [ 
 "translate English to Spanish: We are in New York City." 
 ] 
 pipeline 
 = 
 beam 
 . 
 Pipeline 
 ( 
 options 
 = 
 PipelineOptions 
 ( 
 save_main_session 
 = 
 True 
 , 
 pickle_library 
 = 
 "cloudpickle" 
 )) 
 with 
 pipeline 
 as 
 p 
 : 
 _ 
 = 
 ( 
 p 
 | 
 "Create Examples" 
>> beam 
 . 
 Create 
 ( 
 example 
 ) 
 | 
 "To tensors" 
>> beam 
 . 
 Map 
 ( 
 to_tensors 
 , 
 tokenizer 
 ) 
 | 
 "RunInference" 
>> RunInference 
 ( 
 model_handler 
 , 
 inference_args 
 = 
 { 
 "max_new_tokens" 
 : 
 MAX_RESPONSE_TOKENS 
 }, 
 ) 
 | 
 "From tensors" 
>> beam 
 . 
 Map 
 ( 
 from_tensors 
 , 
 tokenizer 
 ) 
 | 
 "Print" 
>> beam 
 . 
 Map 
 ( 
 print 
 ) 
 ) 
 
Estamos en Nueva York City.
Create a Mobile Website
View Site in Mobile | Classic
Share by: