Bring your own ML model to Beam RunInference

This notebook demonstrates how to run inference with a custom ML framework by implementing your own ModelHandler class.

Named-entity recognition (NER) is one of the most common tasks in natural language processing (NLP). NER locates named entities in unstructured text and classifies them using predefined labels, such as person name, organization, and date.

This example illustrates how to use the popular spaCy package to load a machine learning (ML) model and perform inference in an Apache Beam pipeline by using the RunInference PTransform. For more information about the RunInference API, see About Beam ML in the Apache Beam documentation.

Install package dependencies

The RunInference library is available in Apache Beam versions 2.40 and later.

For this example, you need to install spaCy and pandas. A small NER model, en_core_web_sm, is also installed, but you can use any valid spaCy model.

```python
# Uncomment the following lines to install the required packages.
# %pip install spacy pandas
# %pip install "apache-beam[gcp,dataframe,interactive]"
# !python -m spacy download en_core_web_sm
```
 

Learn about spaCy

To learn more about spaCy, create a spaCy language object in memory by loading one of spaCy's trained models. You can install these models as Python packages. For more information, see spaCy's Models and Languages documentation.

```python
import spacy

nlp = spacy.load("en_core_web_sm")

# Add text strings.
text_strings = [
    "The New York Times is an American daily newspaper based in New York City with a worldwide readership.",
    "It was founded in 1851 by Henry Jarvis Raymond and George Jones, and was initially published by Raymond, Jones & Company."
]

# Check which entities spaCy can recognize.
doc = nlp(text_strings[0])
for ent in doc.ents:
    print(ent.text, ent.start_char, ent.end_char, ent.label_)
```
 
The New York Times 0 18 ORG
American 25 33 NORP
daily 34 39 DATE
New York City 59 72 GPE
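The two numbers printed for each entity are character offsets into the input string (`start_char` inclusive, `end_char` exclusive), so slicing the text with them recovers the entity. The following check, which needs only the Python standard library, reuses the entities printed above for the first sentence.

```python
# Entities for the first sentence, copied from the output above:
# (text, start_char, end_char, label)
text = ("The New York Times is an American daily newspaper based in "
        "New York City with a worldwide readership.")
entities = [
    ("The New York Times", 0, 18, "ORG"),
    ("American", 25, 33, "NORP"),
    ("daily", 34, 39, "DATE"),
    ("New York City", 59, 72, "GPE"),
]

# Slicing with [start_char:end_char] returns the entity text exactly.
for ent_text, start, end, label in entities:
    assert text[start:end] == ent_text, (ent_text, text[start:end])
    print(f"{label:5} {text[start:end]!r}")
```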
```python
# Visualize the results.
from spacy import displacy

displacy.render(doc, style="ent")

# Visualize another example.
displacy.render(nlp(text_strings[1]), style="ent")
```
 

Create a model handler

This section demonstrates how to create your own ModelHandler so that you can use spaCy for inference.
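A ModelHandler supplies two main pieces to RunInference: `load_model`, which creates the model, and `run_inference`, which turns a batch of examples into one PredictionResult per example. The following pure-Python simulation mimics that control flow so you can see the contract without installing Beam. Everything in it is hypothetical: `PredictionResult` is a local stand-in, `ToyModelHandler` uses a trivial "model" that returns capitalized words, and `simulate_run_inference` uses a fixed batch size, whereas Beam batches elements dynamically and shares the loaded model across a worker.

```python
from typing import Any, Callable, List, NamedTuple, Sequence


class PredictionResult(NamedTuple):
    """Hypothetical stand-in for Beam's PredictionResult (example, inference)."""
    example: Any
    inference: Any


class ToyModelHandler:
    """Hypothetical handler exposing the same two methods as ModelHandler."""

    def __init__(self) -> None:
        self.load_count = 0  # Counts how often load_model is called.

    def load_model(self) -> Callable[[str], List[str]]:
        self.load_count += 1
        # Toy "model": returns the capitalized words of a sentence.
        return lambda text: [w for w in text.split() if w[:1].isupper()]

    def run_inference(self, batch: Sequence[str], model) -> List[PredictionResult]:
        # One PredictionResult per input example, in input order.
        return [PredictionResult(x, model(x)) for x in batch]


def simulate_run_inference(examples: Sequence[str], handler: ToyModelHandler,
                           batch_size: int = 2) -> List[PredictionResult]:
    """Loads the model once, then runs inference on fixed-size batches."""
    model = handler.load_model()
    results: List[PredictionResult] = []
    for i in range(0, len(examples), batch_size):
        results.extend(handler.run_inference(examples[i:i + batch_size], model))
    return results


handler = ToyModelHandler()
results = simulate_run_inference(["Ada met Alan", "no names here", "Grace Hopper"], handler)
for r in results:
    print(r)
```

The key design point carried over from Beam is that the model is loaded once and reused for every batch, which matters when loading is far more expensive than a single prediction.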

```python
import warnings

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

warnings.filterwarnings("ignore")

pipeline = beam.Pipeline()

# Print the results for verification.
with pipeline as p:
    (
        p
        | "CreateSentences" >> beam.Create(text_strings)
        | beam.Map(print)
    )
```
 
The New York Times is an American daily newspaper based in New York City with a worldwide readership.
It was founded in 1851 by Henry Jarvis Raymond and George Jones, and was initially published by Raymond, Jones & Company.
```python
# Define `SpacyModelHandler` to load the model and perform the inference.
from typing import Any, Dict, Iterable, Optional, Sequence

import spacy
from spacy import Language

from apache_beam.ml.inference.base import ModelHandler, PredictionResult, RunInference


class SpacyModelHandler(ModelHandler[str, PredictionResult, Language]):
    def __init__(self, model_name: str = "en_core_web_sm"):
        """Implementation of the ModelHandler interface for spaCy using text as input.

        Example Usage::

          pcoll | RunInference(SpacyModelHandler())

        Args:
          model_name: The spaCy model name. Default is en_core_web_sm.
        """
        self._model_name = model_name
        # No extra environment variables are needed to run spaCy.
        self._env_vars = {}

    def load_model(self) -> Language:
        """Loads and initializes a model for processing."""
        return spacy.load(self._model_name)

    def run_inference(
        self,
        batch: Sequence[str],
        model: Language,
        inference_args: Optional[Dict[str, Any]] = None,
    ) -> Iterable[PredictionResult]:
        """Runs inferences on a batch of text strings.

        Args:
          batch: A sequence of examples as text strings.
          model: A spaCy language model.
          inference_args: Any additional arguments for an inference.

        Returns:
          An Iterable of type PredictionResult.
        """
        # Loop over each text string and store its entities as a list of tuples.
        predictions = []
        for one_text in batch:
            doc = model(one_text)
            predictions.append(
                [(ent.text, ent.start_char, ent.end_char, ent.label_) for ent in doc.ents])
        # Pair each input string with its predicted entities.
        return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
```
 
```python
# Verify that the inference results are correct.
# Use a new Pipeline object so that the transforms from the previous cell do not run again.
with beam.Pipeline() as p:
    (
        p
        | "CreateSentences" >> beam.Create(text_strings)
        | "RunInferenceSpacy" >> RunInference(SpacyModelHandler("en_core_web_sm"))
        | beam.Map(print)
    )
```

PredictionResult(example='The New York Times is an American daily newspaper based in New York City with a worldwide readership.', inference=[('The New York Times', 0, 18, 'ORG'), ('American', 25, 33, 'NORP'), ('daily', 34, 39, 'DATE'), ('New York City', 59, 72, 'GPE')])
PredictionResult(example='It was founded in 1851 by Henry Jarvis Raymond and George Jones, and was initially published by Raymond, Jones & Company.', inference=[('1851', 18, 22, 'DATE'), ('Henry Jarvis', 26, 38, 'PERSON'), ('Raymond', 39, 46, 'PERSON'), ('George Jones', 51, 63, 'PERSON'), ('Raymond, Jones & Company', 96, 120, 'ORG')])
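Each PredictionResult carries the input in `example` and the entity tuples in `inference`, so downstream steps can post-process them with ordinary Python. As a sketch, the hypothetical function `person_names` keeps only PERSON entities; it runs here against a local NamedTuple stand-in built from the second output line above. In the pipeline, you could apply the same function with `beam.Map(person_names)` after the RunInference step.

```python
from typing import Any, List, NamedTuple


class PredictionResult(NamedTuple):
    """Hypothetical local stand-in with the two fields shown in the output."""
    example: str
    inference: Any


def person_names(result: PredictionResult) -> List[str]:
    """Returns the text of every PERSON entity in one prediction."""
    return [text for text, _start, _end, label in result.inference if label == "PERSON"]


result = PredictionResult(
    example=("It was founded in 1851 by Henry Jarvis Raymond and George Jones, "
             "and was initially published by Raymond, Jones & Company."),
    inference=[("1851", 18, 22, "DATE"), ("Henry Jarvis", 26, 38, "PERSON"),
               ("Raymond", 39, 46, "PERSON"), ("George Jones", 51, 63, "PERSON"),
               ("Raymond, Jones & Company", 96, 120, "ORG")],
)
print(person_names(result))  # ['Henry Jarvis', 'Raymond', 'George Jones']
```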

Use KeyedModelHandler to handle keyed data

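If your data is keyed, wrap the model handler in KeyedModelHandler. It accepts (key, example) tuples and returns (key, PredictionResult) tuples, so each prediction stays associated with its key.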
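Conceptually, keyed inference strips the keys, runs the unkeyed inference on the examples, and reattaches each key to its prediction. The following pure-Python sketch illustrates that idea only; `keyed_run_inference` and the toy `count_words` function are hypothetical and are not Beam's implementation of KeyedModelHandler.

```python
from typing import Any, Callable, List, Sequence, Tuple


def keyed_run_inference(
    batch: Sequence[Tuple[str, str]],
    run_inference: Callable[[List[str]], List[Any]],
) -> List[Tuple[str, Any]]:
    """Hypothetical helper: strips keys, runs unkeyed inference, reattaches keys."""
    keys = [key for key, _ in batch]
    examples = [example for _, example in batch]
    predictions = run_inference(examples)
    # The unkeyed step returns one prediction per example, in order,
    # so zipping restores the key-to-prediction pairing.
    return list(zip(keys, predictions))


def count_words(examples: List[str]) -> List[int]:
    """Toy unkeyed inference function: counts the words in each example."""
    return [len(text.split()) for text in examples]


keyed_batch = [("example_0", "one two three"), ("example_1", "four five")]
print(keyed_run_inference(keyed_batch, count_words))
# [('example_0', 3), ('example_1', 2)]
```

The sketch relies on the unkeyed step returning predictions in the same order as its inputs, which is also what the `zip(batch, predictions)` line in `SpacyModelHandler.run_inference` assumes.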

```python
# You can use these text strings with keys to distinguish examples.
text_strings_with_keys = [
    ("example_0", "The New York Times is an American daily newspaper based in New York City with a worldwide readership."),
    ("example_1", "It was founded in 1851 by Henry Jarvis Raymond and George Jones, and was initially published by Raymond, Jones & Company.")
]
```
 
```python
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

pipeline = beam.Pipeline(InteractiveRunner())

keyed_spacy_model_handler = KeyedModelHandler(SpacyModelHandler("en_core_web_sm"))

# Verify that the inference results are correct.
with pipeline as p:
    results = (
        p
        | "CreateSentences" >> beam.Create(text_strings_with_keys)
        | "RunInferenceSpacy" >> RunInference(keyed_spacy_model_handler)
        # Map each (key, PredictionResult) pair to a beam.Row so that the
        # PCollection has a schema and can be converted to a dataframe.
        | "ToRows" >> beam.Map(lambda row: beam.Row(
            key=row[0], text=row[1][0], predictions=row[1][1]))
    )
```
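In the `ToRows` step, `row[1][0]` and `row[1][1]` work because Beam's PredictionResult is a NamedTuple whose first two fields are `example` and `inference`. The following sketch shows the same mapping as a named function with attribute access, which can be easier to read; `to_row_fields` is a hypothetical helper and `PredictionResult` is a local stand-in so the example runs without Beam.

```python
from typing import Any, Dict, NamedTuple, Tuple


class PredictionResult(NamedTuple):
    """Hypothetical stand-in for Beam's PredictionResult (example, inference)."""
    example: str
    inference: Any


def to_row_fields(keyed_result: Tuple[str, PredictionResult]) -> Dict[str, Any]:
    """Unpacks a (key, PredictionResult) pair into named fields."""
    key, result = keyed_result
    return {"key": key, "text": result.example, "predictions": result.inference}


keyed_result = (
    "example_0",
    PredictionResult(
        "The New York Times is an American daily newspaper based in New York City with a worldwide readership.",
        [("The New York Times", 0, 18, "ORG"), ("American", 25, 33, "NORP"),
         ("daily", 34, 39, "DATE"), ("New York City", 59, 72, "GPE")],
    ),
)

# Positional indexing, as in the lambda, and attribute access give the same values.
assert keyed_result[1][0] == keyed_result[1].example
assert keyed_result[1][1] == keyed_result[1].inference
print(to_row_fields(keyed_result)["key"])  # example_0
```

Inside the pipeline, the step could then read `beam.Map(lambda kv: beam.Row(**to_row_fields(kv)))`, producing the same `key`, `text`, and `predictions` fields.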
 
```python
# Convert the results to a pandas dataframe.
import apache_beam.runners.interactive.interactive_beam as ib

beam_df = to_dataframe(results)
df = ib.collect(beam_df)
```
 
```python
df
```
 