This notebook shows how to use the Apache Beam RunInference transform for TensorFlow.
Apache Beam has built-in support for two TensorFlow model handlers: TFModelHandlerNumpy and TFModelHandlerTensor.

- Use TFModelHandlerNumpy to run inference on models that use a numpy array as an input.
- Use TFModelHandlerTensor to run inference on models that use a tf.Tensor as an input.

If your model uses tf.Example as an input, see the Apache Beam RunInference with tfx-bsl notebook.
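The difference between the two handlers is the element type of the input PCollection. The following is a minimal sketch of constructing each one; the model path gs://your-bucket/saved_model/ is a hypothetical placeholder, not a model used elsewhere in this notebook.

```python
# A minimal sketch of constructing each built-in TensorFlow model handler.
# The model path is a hypothetical placeholder.
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerNumpy
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor

# Use when elements of the input PCollection are numpy arrays.
numpy_handler = TFModelHandlerNumpy('gs://your-bucket/saved_model/')

# Use when elements of the input PCollection are tf.Tensor values.
tensor_handler = TFModelHandlerTensor('gs://your-bucket/saved_model/')
```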
There are three ways to load a TensorFlow model:

- Provide a path to the saved model.
- Provide a path to the saved weights of the model.
- Provide a URL for a pretrained model on TensorFlow Hub. For an example workflow, see Apache Beam RunInference with TensorFlow and TensorFlow Hub.
This notebook demonstrates the following steps:

- Build a simple TensorFlow model.
- Set up example data.
- Run those examples with the built-in model handlers using one of the following methods, and then get a prediction inside an Apache Beam pipeline:
  - a saved model
  - saved weights
For more information about using RunInference, see Get started with AI/ML pipelines in the Apache Beam documentation.
Before you begin
Set up your environment and download dependencies.
Install Apache Beam
To use RunInference with the built-in TensorFlow model handler, install Apache Beam version 2.46.0 or later.
```python
!pip install protobuf --quiet
!pip install apache_beam==2.46.0 --quiet

# To use the newly installed versions, restart the runtime.
exit()
```
Authenticate with Google Cloud
This notebook relies on saving your model to Google Cloud. To use your Google Cloud account, authenticate this notebook.
```python
from google.colab import auth
auth.authenticate_user()
```
Import dependencies and set up your bucket
Use the following code to import dependencies and to set up your Google Cloud Storage bucket.
Replace PROJECT_ID and BUCKET_NAME with the ID of your project and the name of your bucket.
```python
import argparse
from typing import Dict, Text, Any, Tuple, List

import numpy
from google.protobuf import text_format
import tensorflow as tf
from tensorflow import keras

import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerNumpy
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions

project = "PROJECT_ID"  # @param {type:'string'}
bucket = "BUCKET_NAME"  # @param {type:'string'}

save_model_dir_multiply = f'gs://{bucket}/tf-inference/model/multiply_five/v1/'
save_weights_dir_multiply = f'gs://{bucket}/tf-inference/weights/multiply_five/v1/'
```
Create and test a simple model
This step creates and tests a model that predicts the 5 times table.
Create the model
Create training data and build a linear regression model.
```python
# Create training data that represents the 5 times multiplication table for the numbers 0 to 99.
# x is the data and y is the labels.
x = numpy.arange(0, 100)   # Examples
y = x * 5                  # Labels

# Use create_model to build a simple linear regression model.
# Note that the model has a shape of (1) for its input layer and expects a single float32 value.
def create_model():
  input_layer = keras.layers.Input(shape=(1), dtype=tf.float32, name='x')
  output_layer = keras.layers.Dense(1)(input_layer)
  model = keras.Model(input_layer, output_layer)
  model.compile(optimizer=tf.optimizers.Adam(), loss='mean_absolute_error')
  return model

model = create_model()
model.summary()
```
```
Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #
=================================================================
 x (InputLayer)              [(None, 1)]               0
 dense_1 (Dense)             (None, 1)                 2
=================================================================
Total params: 2
Trainable params: 2
Non-trainable params: 0
_________________________________________________________________
```
Test the model
This step tests the model that you created.
```python
model.fit(x, y, epochs=500, verbose=0)
test_examples = [20, 40, 60, 90]
value_to_predict = numpy.array(test_examples, dtype=numpy.float32)
predictions = model.predict(value_to_predict)

print('Test Examples ' + str(test_examples))
print('Predictions ' + str(predictions))
```
```
1/1 [==============================] - 0s 38ms/step
Test Examples [20, 40, 60, 90]
Predictions [[21.896107]
 [41.795692]
 [61.69528 ]
 [91.544655]]
```
Save the model
This step shows how to save your model.
```python
model.save(save_model_dir_multiply)
```
Instead of saving the entire model, you can save the model weights for inference. You can use this method when you need the model for inference but don't need any compilation information or optimizer state. In addition, in transfer learning applications, you can use this method to load the weights into new models.
With this approach, you need to pass the function that builds the TensorFlow model to the TFModelHandler class that you're using, either TFModelHandlerNumpy or TFModelHandlerTensor. You also need to pass model_type=ModelType.SAVED_WEIGHTS to the class.
```python
model_handler = TFModelHandlerNumpy(path_to_weights,
                                    model_type=ModelType.SAVED_WEIGHTS,
                                    create_model_fn=build_tensorflow_model)
```
```python
model.save_weights(save_weights_dir_multiply)
```
Run the pipeline
Use the following code to run the pipeline by specifying the path to the trained TensorFlow model.
```python
class FormatOutput(beam.DoFn):
  def process(self, element, *args, **kwargs):
    yield "example is {example} prediction is {prediction}".format(
        example=element.example, prediction=element.inference)

examples = numpy.array([20, 40, 60, 90], dtype=numpy.float32)
model_handler = TFModelHandlerNumpy(save_model_dir_multiply)
with beam.Pipeline() as p:
  _ = (p
       | beam.Create(examples)
       | RunInference(model_handler)
       | beam.ParDo(FormatOutput())
       | beam.Map(print)
      )
```
```
example is 20.0 prediction is [21.896107]
example is 40.0 prediction is [41.795692]
example is 60.0 prediction is [61.69528]
example is 90.0 prediction is [91.544655]
```
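RunInference emits PredictionResult values, which carry the input element in the example field and the model output in the inference field; the FormatOutput DoFn above reads those two fields. As a minimal alternative sketch, you could format the results with a beam.Map instead of a DoFn:

```python
# A minimal sketch: format PredictionResult values with beam.Map instead of a DoFn.
# `result.example` is the input element; `result.inference` is the model output.
format_result = lambda result: 'example is {} prediction is {}'.format(
    result.example, result.inference)

# Usage inside the pipeline above:
#   ... | RunInference(model_handler) | beam.Map(format_result) | beam.Map(print)
```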
Use the following code to run the pipeline with the saved weights of a TensorFlow model.

To load the model with saved weights, the TFModelHandlerNumpy class requires a create_model function that builds and returns a TensorFlow model that is compatible with the saved weights.
```python
from apache_beam.ml.inference.tensorflow_inference import ModelType

examples = numpy.array([20, 40, 60, 90], dtype=numpy.float32)
model_handler = TFModelHandlerNumpy(save_weights_dir_multiply,
                                    model_type=ModelType.SAVED_WEIGHTS,
                                    create_model_fn=create_model)
with beam.Pipeline() as p:
  _ = (p
       | beam.Create(examples)
       | RunInference(model_handler)
       | beam.ParDo(FormatOutput())
       | beam.Map(print)
      )
```
```
example is 20.0 prediction is [21.896107]
example is 40.0 prediction is [41.795692]
example is 60.0 prediction is [61.69528]
example is 90.0 prediction is [91.544655]
```
Use a keyed model handler
To use a keyed model handler, use KeyedModelHandler with TensorFlow by using TFModelHandlerNumpy.

By default, the ModelHandler does not expect a key.

- If you know that keys are associated with your examples, use beam.KeyedModelHandler to wrap the model handler.
- If you don't know whether keys are associated with your examples, use beam.MaybeKeyedModelHandler, sketched after this list.
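The following is a minimal sketch of the second case, assuming the imports and save_model_dir_multiply defined earlier in this notebook; MaybeKeyedModelHandler accepts both keyed and unkeyed examples.

```python
# A minimal sketch: wrap the TensorFlow handler so the pipeline accepts
# both keyed and unkeyed examples.
from apache_beam.ml.inference.base import MaybeKeyedModelHandler

maybe_keyed_model_handler = MaybeKeyedModelHandler(
    TFModelHandlerNumpy(save_model_dir_multiply))
```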
```python
class FormatOutputKeyed(FormatOutput):
  # To simplify, inherit from FormatOutput.
  def process(self, tuple_in: Tuple):
    key, element = tuple_in
    output = super().process(element)
    yield "{} : {}".format(key, [op for op in output])

examples = numpy.array([(1, 20), (2, 40), (3, 60), (4, 90)], dtype=numpy.float32)
keyed_model_handler = KeyedModelHandler(TFModelHandlerNumpy(save_model_dir_multiply))
with beam.Pipeline() as p:
  _ = (p
       | 'CreateExamples' >> beam.Create(examples)
       | RunInference(keyed_model_handler)
       | beam.ParDo(FormatOutputKeyed())
       | beam.Map(print)
      )
```
```
1.0 : ['example is 20.0 prediction is [51.815357]']
2.0 : ['example is 40.0 prediction is [101.63492]']
3.0 : ['example is 60.0 prediction is [151.45448]']
4.0 : ['example is 90.0 prediction is [226.18384]']
```