This notebook demonstrates the use of the RunInference transform for scikit-learn , also called sklearn. Apache Beam RunInference has implementations of the ModelHandler class prebuilt for scikit-learn. For more information about using RunInference, see Get started with AI/ML pipelines in the Apache Beam documentation.
You can choose the appropriate model handler based on your input data type:
With RunInference, these model handlers manage batching, vectorization, and prediction optimization for your scikit-learn pipeline or model.
This notebook demonstrates the following common RunInference patterns:
- Generate predictions.
- Postprocess results after RunInference.
- Run inference with multiple models in the same pipeline.
The linear regression models used in these samples are trained on data that correspondes to the 5 and 10 times tables; that is, y = 5x
and y = 10x
respectively.
Before you begin
Complete the following setup steps:
- Install dependencies for Apache Beam.
- Authenticate with Google Cloud.
- Specify your project and bucket. You use the project and bucket to save and load models.
pip install google-api-core --quiet
pip install google-cloud-pubsub google-cloud-bigquery-storage --quiet
pip install apache-beam [ gcp,dataframe ] --quiet
About scikit-learn versions
scikit-learn
is a build-dependency of Apache Beam. If you need to install a different version of sklearn , use %pip install scikit-learn==<version>
from
google.colab
import
auth
auth
.
authenticate_user
()
import
pickle
from
sklearn
import
linear_model
from
typing
import
Tuple
import
numpy
as
np
import
apache_beam
as
beam
from
apache_beam.ml.inference.sklearn_inference
import
ModelFileType
from
apache_beam.ml.inference.sklearn_inference
import
SklearnModelHandlerNumpy
from
apache_beam.ml.inference.base
import
KeyedModelHandler
from
apache_beam.ml.inference.base
import
PredictionResult
from
apache_beam.ml.inference.base
import
RunInference
from
apache_beam.options.pipeline_options
import
PipelineOptions
# NOTE: If an error occurs, restart your runtime.
import
os
# Constants
project
=
"<PROJECT_ID>"
# @param {type:'string'}
bucket
=
"<BUCKET_NAME>"
# @param {type:'string'}
# To avoid warnings, set the project.
os
.
environ
[
'GOOGLE_CLOUD_PROJECT'
]
=
project
Create the data and the scikit-learn model
This section demonstrates the following steps:
- Create the data to train the scikit-learn linear regression model.
- Train the linear regression model.
- Save the scikit-learn model using
pickle
.
In this example, you create two models, one with the 5 times model and a second with the 10 times model.
# Input data to train the sklearn model for the 5 times table.
x
=
np
.
arange
(
0
,
100
,
dtype
=
np
.
float32
)
.
reshape
(
-
1
,
1
)
y
=
(
x
*
5
)
.
reshape
(
-
1
,
1
)
def
train_and_save_model
(
x
,
y
,
model_file_name
):
regression
=
linear_model
.
LinearRegression
()
regression
.
fit
(
x
,
y
)
with
open
(
model_file_name
,
'wb'
)
as
f
:
pickle
.
dump
(
regression
,
f
)
five_times_model_filename
=
'sklearn_5x_model.pkl'
train_and_save_model
(
x
,
y
,
five_times_model_filename
)
# Change y to be 10 times, and output a 10 times table.
ten_times_model_filename
=
'sklearn_10x_model.pkl'
train_and_save_model
(
x
,
y
,
ten_times_model_filename
)
y
=
(
x
*
10
)
.
reshape
(
-
1
,
1
)
train_and_save_model
(
x
,
y
,
'sklearn_10x_model.pkl'
)
Create a scikit-learn RunInference pipeline
This section demonstrates how to do the following:
- Define a scikit-learn model handler that accepts an
array_like
object as input. - Read the data from BigQuery.
- Use the scikit-learn trained model and the scikit-learn RunInference transform on unkeyed data.
%
pip
install
--
upgrade
google
-
cloud
-
bigquery
--
quiet
gcloud
config
set
project
$project
Updated property [core/project].
# Populated BigQuery table
from
google.cloud
import
bigquery
client
=
bigquery
.
Client
(
project
=
project
)
# Make sure the dataset_id is unique in your project.
dataset_id
=
'
{project}
.maths'
.
format
(
project
=
project
)
dataset
=
bigquery
.
Dataset
(
dataset_id
)
# Modify the location based on your project configuration.
dataset
.
location
=
'US'
dataset
=
client
.
create_dataset
(
dataset
,
exists_ok
=
True
)
# Table name in the BigQuery dataset.
table_name
=
'maths_problems_1'
query
=
"""
CREATE OR REPLACE TABLE
{project}
.maths.
{table}
( key STRING OPTIONS(description="A unique key for the maths problem"),
value FLOAT64 OPTIONS(description="Our maths problem" ) );
INSERT INTO maths.
{table}
VALUES
("first_example", 105.00),
("second_example", 108.00),
("third_example", 1000.00),
("fourth_example", 1013.00)
"""
.
format
(
project
=
project
,
table
=
table_name
)
create_job
=
client
.
query
(
query
)
create_job
.
result
()
<google.cloud.bigquery.table._EmptyRowIterator at 0x7f97abb4e850>
sklearn_model_handler
=
SklearnModelHandlerNumpy
(
model_uri
=
five_times_model_filename
)
pipeline_options
=
PipelineOptions
()
.
from_dictionary
(
{
'temp_location'
:
f
'gs://
{
bucket
}
/tmp'
})
# Define the BigQuery table specification.
table_name
=
'maths_problems_1'
table_spec
=
f
'
{
project
}
:maths.
{
table_name
}
'
with
beam
.
Pipeline
(
options
=
pipeline_options
)
as
p
:
(
p
|
"ReadFromBQ"
>> beam
.
io
.
ReadFromBigQuery
(
table
=
table_spec
)
|
"ExtractInputs"
>> beam
.
Map
(
lambda
x
:
[
x
[
'value'
]])
|
"RunInferenceSklearn"
>> RunInference
(
model_handler
=
sklearn_model_handler
)
|
beam
.
Map
(
print
)
)
PredictionResult(example=[1000.0], inference=array([5000.])) PredictionResult(example=[1013.0], inference=array([5065.])) PredictionResult(example=[108.0], inference=array([540.])) PredictionResult(example=[105.0], inference=array([525.]))
Use sklearn RunInference on keyed inputs
This section demonstrates how to do the following:
- Wrap the
SklearnModelHandlerNumpy
object aroundKeyedModelHandler
to handle keyed data. - Read the data from BigQuery.
- Use the sklearn trained model and the sklearn RunInference transform on a keyed data.
sklearn_model_handler
=
SklearnModelHandlerNumpy
(
model_uri
=
five_times_model_filename
)
keyed_sklearn_model_handler
=
KeyedModelHandler
(
sklearn_model_handler
)
pipeline_options
=
PipelineOptions
()
.
from_dictionary
(
{
'temp_location'
:
f
'gs://
{
bucket
}
/tmp'
})
with
beam
.
Pipeline
(
options
=
pipeline_options
)
as
p
:
(
p
|
"ReadFromBQ"
>> beam
.
io
.
ReadFromBigQuery
(
table
=
table_spec
)
|
"ExtractInputs"
>> beam
.
Map
(
lambda
x
:
(
x
[
'key'
],
[
x
[
'value'
]]))
|
"RunInferenceSklearn"
>> RunInference
(
model_handler
=
keyed_sklearn_model_handler
)
|
beam
.
Map
(
print
)
)
('third_example', PredictionResult(example=[1000.0], inference=array([5000.]))) ('fourth_example', PredictionResult(example=[1013.0], inference=array([5065.]))) ('second_example', PredictionResult(example=[108.0], inference=array([540.]))) ('first_example', PredictionResult(example=[105.0], inference=array([525.])))
Run multiple models
This code creates a pipeline that takes two RunInference transforms with different models and then combines the output.
from
typing
import
Tuple
def
format_output
(
run_inference_output
)
-
> str
:
"""Takes input from RunInference for scikit-learn and extracts the output."""
key
,
prediction_result
=
run_inference_output
example
=
prediction_result
.
example
[
0
]
prediction
=
prediction_result
.
inference
[
0
]
return
f
"key =
{
key
}
, example =
{
example
}
-> predictions
{
prediction
}
"
five_times_model_handler
=
KeyedModelHandler
(
SklearnModelHandlerNumpy
(
model_uri
=
five_times_model_filename
))
ten_times_model_handler
=
KeyedModelHandler
(
SklearnModelHandlerNumpy
(
model_uri
=
ten_times_model_filename
))
pipeline_options
=
PipelineOptions
()
.
from_dictionary
(
{
'temp_location'
:
f
'gs://
{
bucket
}
/tmp'
})
with
beam
.
Pipeline
(
options
=
pipeline_options
)
as
p
:
inputs
=
(
p
|
"ReadFromBQ"
>> beam
.
io
.
ReadFromBigQuery
(
table
=
table_spec
))
five_times
=
(
inputs
|
"Extract For 5"
>> beam
.
Map
(
lambda
x
:
(
'
{}
{}
'
.
format
(
x
[
'key'
],
'* 5'
),
[
x
[
'value'
]]))
|
"5 times"
>> RunInference
(
model_handler
=
five_times_model_handler
))
ten_times
=
(
inputs
|
"Extract For 10"
>> beam
.
Map
(
lambda
x
:
(
'
{}
{}
'
.
format
(
x
[
'key'
],
'* 10'
),
[
x
[
'value'
]]))
|
"10 times"
>> RunInference
(
model_handler
=
ten_times_model_handler
))
_
=
((
five_times
,
ten_times
)
|
"Flattened"
>> beam
.
Flatten
()
|
"format output"
>> beam
.
Map
(
format_output
)
|
"Print"
>> beam
.
Map
(
print
))
key = third_example * 10, example = 1000.0 -> predictions 10000.0 key = fourth_example * 10, example = 1013.0 -> predictions 10130.0 key = second_example * 10, example = 108.0 -> predictions 1080.0 key = first_example * 10, example = 105.0 -> predictions 1050.0 key = third_example * 5, example = 1000.0 -> predictions 5000.0 key = fourth_example * 5, example = 1013.0 -> predictions 5065.0 key = second_example * 5, example = 108.0 -> predictions 540.0 key = first_example * 5, example = 105.0 -> predictions 525.0