Stream from Pub/Sub to BigQuery
This tutorial uses the Pub/Sub Subscription to BigQuery template to create and
run a Dataflow template job using the Google Cloud console or Google Cloud CLI.
The tutorial walks you through a streaming pipeline example that reads
JSON-encoded messages from Pub/Sub and writes them to a BigQuery table.
Streaming analytics and data integration
pipelines use Pub/Sub to ingest and distribute data.
Pub/Sub enables you to create systems of event producers and
consumers, called publishers and subscribers. Publishers send events to
the Pub/Sub service asynchronously, and Pub/Sub delivers the events to all
services that need to react to them.
Dataflow is a fully managed service for transforming and
enriching data in stream (real-time) and batch modes. It provides a simplified pipeline
development environment that uses the Apache Beam SDK to transform incoming data and
then output the transformed data.
The benefit of this workflow is that you can use user-defined functions (UDFs)
to transform the message data before it is written to BigQuery.
Before running a Dataflow pipeline for this scenario, consider whether a
Pub/Sub BigQuery subscription with a UDF meets your requirements.
Objectives
Create a Pub/Sub topic.
Create a BigQuery dataset with a table and schema.
Use a Google-provided streaming template to stream data from your
Pub/Sub subscription to BigQuery by using Dataflow.
Costs
In this document, you use the following billable components of Google Cloud:
Dataflow
Pub/Sub
Cloud Storage
BigQuery
To generate a cost estimate based on your projected usage, use the pricing calculator.
New Google Cloud users might be eligible for a free trial.
When you finish the tasks that are described in this document, you can avoid
continued billing by deleting the resources that you created. For more information, see Clean up.
Before you begin
This section shows you how to select a project, enable APIs, and grant the
appropriate roles to your user account and to the worker service account.
Console
Sign in to your Google Cloud account. If you're new to
Google Cloud, create an account
to evaluate how our products perform in
real-world scenarios. New customers also get $300 in free credits to
run, test, and deploy workloads.
In the Google Cloud console, on the project selector page,
select or create a Google Cloud project.
Roles required to select or create a project:
Select a project: Selecting a project doesn't require a specific IAM role. You
can select any project that you've been granted a role on.
Create a project: To create a project, you need the Project Creator role
(roles/resourcemanager.projectCreator), which contains the
resourcemanager.projects.create permission. Learn how to grant roles.
Note: If you don't plan to keep the resources that you create in this
procedure, create a project instead of selecting an existing project. After you
finish these steps, you can delete the project, removing all resources
associated with the project.
Verify that billing is enabled for your Google Cloud project.
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, and Resource Manager APIs.
Roles required to enable APIs: To enable APIs, you need the Service Usage Admin
IAM role (roles/serviceusage.serviceUsageAdmin), which contains the
serviceusage.services.enable permission. Learn how to grant roles.
To complete the steps in this tutorial, your user account must have the
Service Account User role.
The Compute Engine default service account must have the following roles:
Dataflow Worker, Dataflow Admin, Pub/Sub Editor, Storage Object Admin, and
BigQuery Data Editor.
To add the required roles in the Google Cloud console:
In the Google Cloud console, go to the IAM page.
Select your project.
In the row containing your user account, click Edit principal, and then click
Add another role.
In the drop-down list, select the role Service Account User.
In the row containing the Compute Engine default service account, click
Edit principal, and then click Add another role.
In the drop-down list, select the role Dataflow Worker.
Repeat for the Dataflow Admin, the Pub/Sub Editor, the Storage Object Admin,
and the BigQuery Data Editor roles, and then click Save.
For more information about granting roles, see Grant an IAM role by using the console.
gcloud
Sign in to your Google Cloud account. If you're new to
Google Cloud, create an account
to evaluate how our products perform in
real-world scenarios. New customers also get $300 in free credits to
run, test, and deploy workloads.
Install the Google Cloud CLI.
If you're using an external identity provider (IdP), you must first sign in to
the gcloud CLI with your federated identity.
To initialize the gcloud CLI, run the following command:
    gcloud init
Create or select a Google Cloud project.
Roles required to select or create a project:
Select a project: Selecting a project doesn't require a specific IAM role. You
can select any project that you've been granted a role on.
Create a project: To create a project, you need the Project Creator role
(roles/resourcemanager.projectCreator), which contains the
resourcemanager.projects.create permission. Learn how to grant roles.
Note: If you don't plan to keep the resources that you create in this
procedure, create a project instead of selecting an existing project. After you
finish these steps, you can delete the project, removing all resources
associated with the project.
Create a Google Cloud project:
    gcloud projects create PROJECT_ID
Replace PROJECT_ID with a name for the Google Cloud project you are creating.
Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
Replace PROJECT_ID with your Google Cloud project name.
Verify that billing is enabled for your Google Cloud project.
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, and Resource Manager APIs.
Roles required to enable APIs: To enable APIs, you need the Service Usage Admin
IAM role (roles/serviceusage.serviceUsageAdmin), which contains the
serviceusage.services.enable permission. Learn how to grant roles.
    gcloud services enable compute.googleapis.com dataflow.googleapis.com \
        logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com \
        storage.googleapis.com cloudresourcemanager.googleapis.com
If you're using a local shell, then create local authentication credentials for
your user account:
    gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity
provider (IdP), confirm that you have signed in to the gcloud CLI with your
federated identity.
Grant the roles/iam.serviceAccountUser IAM role to your user account by running
the following command:
    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID: your project ID.
USER_IDENTIFIER: the identifier for your user account. For example, myemail@example.com.
ROLE: the IAM role that you grant to your user account, here roles/iam.serviceAccountUser.
Grant roles to your Compute Engine default service account. Run the
following command once for each of the following IAM roles:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Replace the following:
PROJECT_ID: your project ID.
PROJECT_NUMBER: your project number. To find your project number, use the
gcloud projects describe command.
SERVICE_ACCOUNT_ROLE: each individual role.
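The per-role repetition described above can be scripted. The following is a minimal sketch rather than part of the original procedure: the function name grant_worker_roles is hypothetical, and it assumes the gcloud CLI is installed and authenticated. It looks up the project number with gcloud projects describe and then binds each role from the list above.

```shell
# Hypothetical helper (not from the tutorial): grants the listed roles to the
# Compute Engine default service account of the given project.
grant_worker_roles() {
  local project_id="$1"
  local project_number
  local role
  # Look up the numeric project number that is part of the account's email.
  project_number="$(gcloud projects describe "$project_id" \
    --format='value(projectNumber)')" || return 1
  for role in roles/dataflow.admin roles/dataflow.worker roles/storage.admin \
      roles/pubsub.editor roles/bigquery.dataEditor; do
    gcloud projects add-iam-policy-binding "$project_id" \
      --member="serviceAccount:${project_number}-compute@developer.gserviceaccount.com" \
      --role="$role" || return 1
  done
}
```

To use it, run grant_worker_roles PROJECT_ID. The loop stops at the first binding that fails, so a permissions problem surfaces immediately.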
Create a Cloud Storage bucket
Begin by creating a Cloud Storage bucket using the Google Cloud console or
Google Cloud CLI. The Dataflow pipeline uses this bucket as a
temporary storage location.
Console
In the Google Cloud console, go to the Cloud Storage Buckets page.
Click Create.
On the Create a bucket page, for Name your bucket, enter a name that meets the
bucket naming requirements. Cloud Storage bucket names must be globally unique.
Don't select the other options.
Click Create.
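Only the console steps are shown here. As a hedged command-line sketch, the bucket can also be created with gcloud storage buckets create. The function name create_temp_bucket and the choice to pass an explicit location are assumptions for illustration, not part of the original tutorial.

```shell
# Hypothetical helper: creates the bucket that the pipeline uses for
# temporary files. The bucket name must be globally unique.
create_temp_bucket() {
  local bucket_name="$1"
  local location="$2"
  gcloud storage buckets create "gs://${bucket_name}" --location="${location}"
}
```

For example, create_temp_bucket BUCKET_NAME us-central1 creates the bucket in a single region; pick a location that matches the region where you plan to run the Dataflow job.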
Create a Pub/Sub topic and subscription
Create a Pub/Sub topic and then create a subscription to that topic.
Console
To create a topic, complete the following steps.
In the Google Cloud console, go to the Pub/Sub Topics page.
Click Create topic.
In the Topic ID field, enter an ID for your topic. For information about how to
name a topic, see Guidelines to name a topic or a subscription.
Retain the option Add a default subscription. Don't select the other options.
Click Create.
In the topic details page, the name of the subscription that was created
is listed under Subscription ID. Note this value for later steps.
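The console flow above creates the topic and a default subscription together. A rough command-line equivalent, sketched under the assumption that the gcloud CLI is authenticated, creates the two resources explicitly. The function name create_topic_and_subscription is hypothetical.

```shell
# Hypothetical helper: creates a topic, then a pull subscription attached to it.
create_topic_and_subscription() {
  local topic_id="$1"
  local subscription_id="$2"
  gcloud pubsub topics create "$topic_id" || return 1
  gcloud pubsub subscriptions create "$subscription_id" --topic="$topic_id"
}
```

Run create_topic_and_subscription TOPIC_ID SUBSCRIPTION_ID and note the subscription ID, because the pipeline reads from the subscription rather than from the topic.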
Create a BigQuery table
In this step, you create a BigQuery table with the following schema:

Column name    Data type
name           STRING
customer_id    INTEGER

If you don't already have a BigQuery dataset, first create one. For more
information, see Create datasets. Then create a new empty table:
Console
Go to the BigQuery page.
In the Explorer pane, expand your project, and then select a dataset.
In the Dataset info section, click Create table.
In the Create table from list, select Empty table.
In the Table box, enter the name of the table.
In the Schema section, click Edit as text.
Paste in the following schema definition:
    name:STRING,
    customer_id:INTEGER
Click Create table.
gcloud
Use the bq mk command.
    bq mk --table \
        PROJECT_ID:DATASET_NAME.TABLE_NAME \
        name:STRING,customer_id:INTEGER
Replace the following:
PROJECT_ID: your project ID
DATASET_NAME: the name of the dataset
TABLE_NAME: the name of the table to create
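The bq mk --table command above assumes that the dataset already exists. As a minimal sketch, the dataset and the table can be created in one step. The function name create_dataset_and_table is hypothetical, and the sketch assumes that the bq tool is installed and authenticated.

```shell
# Hypothetical helper: creates the dataset first, then the table with the
# two-column schema used in this tutorial.
create_dataset_and_table() {
  local project_id="$1"
  local dataset="$2"
  local table="$3"
  bq mk --dataset "${project_id}:${dataset}" || return 1
  bq mk --table "${project_id}:${dataset}.${table}" \
    name:STRING,customer_id:INTEGER
}
```

Call it as create_dataset_and_table PROJECT_ID DATASET_NAME TABLE_NAME. If the dataset already exists, the first command fails and the function stops, so in that case run only the table command.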
Run the pipeline
Run a streaming pipeline using the Google-provided
Pub/Sub Subscription to BigQuery template.
The pipeline gets incoming data from the Pub/Sub topic and outputs the data to
your BigQuery dataset.
Console
In the Google Cloud console, go to the Dataflow Jobs page.
Click Create job from template.
Enter a Job name for your Dataflow job.
For Regional endpoint, select a region for your Dataflow job.
For Dataflow template, select the Pub/Sub Subscription to BigQuery template.
For BigQuery output table, select Browse and select your BigQuery table.
In the Pub/Sub input subscription list, select the Pub/Sub subscription.
For Temporary location, enter the following:
    gs://BUCKET_NAME/temp/
Replace BUCKET_NAME with the name of your Cloud Storage bucket. The temp folder
stores temporary files for the Dataflow jobs.
Click Run job.
gcloud
Note: To use the Google Cloud CLI to run classic templates, you must have gcloud CLI
version 138.0.0 or later.
To run the template in your shell or terminal, use the gcloud dataflow jobs run
command.
    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
        --region DATAFLOW_REGION \
        --staging-location gs://BUCKET_NAME/temp \
        --parameters inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME
Replace the following variables:
JOB_NAME: a name for the job
DATAFLOW_REGION: a region for the job
BUCKET_NAME: the name of your Cloud Storage bucket
PROJECT_ID: the ID of your Google Cloud project
SUBSCRIPTION_ID: the name of your Pub/Sub subscription
DATASET_NAME: the name of your BigQuery dataset
TABLE_NAME: the name of your BigQuery table
Publish messages to Pub/Sub
After the Dataflow job starts, you can publish messages to
Pub/Sub, and the pipeline writes them to BigQuery.
Console
In the Google Cloud console, go to the Pub/Sub > Topics page.
In the topic list, click the name of your topic.
Click Messages.
Click Publish messages.
For Number of messages, enter 10.
For Message body, enter {"name": "Alice", "customer_id": 1}.
Click Publish.
gcloud
To publish messages to your topic, use the gcloud pubsub topics publish command.
    for run in {1..10}; do
      gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
    done
Replace TOPIC_ID with the name of your topic.
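The loop above publishes the same record ten times, which makes the resulting rows indistinguishable. As an optional variation, the following sketch publishes records with distinct customer_id values so that each row can be told apart in the table. The function name publish_sample_messages and the "Customer N" naming are assumptions for illustration.

```shell
# Hypothetical helper: publishes COUNT JSON messages whose fields match the
# tutorial's table schema (name:STRING, customer_id:INTEGER).
publish_sample_messages() {
  local topic_id="$1"
  local count="$2"
  local i=1
  while [ "$i" -le "$count" ]; do
    gcloud pubsub topics publish "$topic_id" \
      --message="{\"name\": \"Customer ${i}\", \"customer_id\": ${i}}" || return 1
    i=$((i + 1))
  done
}
```

For example, publish_sample_messages TOPIC_ID 10 sends ten messages with customer_id values 1 through 10.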
View your results
View the data written to your BigQuery table. It can take up to a
minute for data to start appearing in your table.
Console
In the Google Cloud console, go to the BigQuery page.
In the query editor, run the following query:
    SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`
    LIMIT 1000
Replace the following variables:
PROJECT_ID: the name of your Google Cloud project
DATASET_NAME: the name of your BigQuery dataset
TABLE_NAME: the name of your BigQuery table
gcloud
Check the results in BigQuery by running the following query:
    bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'
Replace the following variables:
PROJECT_ID: the name of your Google Cloud project
DATASET_NAME: the name of your BigQuery dataset
TABLE_NAME: the name of your BigQuery table
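To confirm that every published message arrived, a row count is often quicker to read than the full result set. The following is a small sketch; the function name count_rows is hypothetical, and it assumes that the bq tool is installed and authenticated.

```shell
# Hypothetical helper: prints the number of rows in the given table as CSV.
# The argument has the form PROJECT_ID.DATASET_NAME.TABLE_NAME.
count_rows() {
  local table_ref="$1"
  bq --format=csv query --use_legacy_sql=false \
    "SELECT COUNT(*) AS row_count FROM \`${table_ref}\`"
}
```

Run count_rows PROJECT_ID.DATASET_NAME.TABLE_NAME after publishing. Because rows can take up to a minute to appear, a lower count than expected might only mean that the pipeline is still catching up.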
This tutorial assumes that the Pub/Sub messages are formatted as
JSON, and that the BigQuery table schema matches the JSON data.
Optionally, you can provide a JavaScript user-defined function (UDF) that
transforms the data before it is written to BigQuery.
The UDF can perform additional processing, such as filtering, removing
personally identifiable information (PII), or enriching the data with
additional fields.
For more information, see Create user-defined functions for Dataflow templates.
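As an illustration of the UDF option described above, the following sketch writes a small JavaScript UDF to a local file and uploads it to Cloud Storage. Everything in it is an assumption rather than part of this tutorial: the function name write_and_upload_udf, the file name udf.js, the JavaScript function name process, and the trimming logic are hypothetical. The sketch assumes that a template UDF receives each message payload as a JSON string and returns a JSON string; check the UDF documentation for the exact contract and for the template parameters that point the job at the uploaded file.

```shell
# Hypothetical helper: writes a sample UDF and copies it to the bucket.
write_and_upload_udf() {
  local bucket_name="$1"
  local udf_file="${2:-udf.js}"
  # The quoted heredoc delimiter keeps the JavaScript text unmodified.
  cat > "$udf_file" <<'EOF'
/**
 * Assumed contract: receives one message payload as a JSON string and
 * returns the JSON string to write to BigQuery.
 */
function process(inJson) {
  var row = JSON.parse(inJson);
  // Example transformation: strip surrounding whitespace from the name.
  row.name = String(row.name).trim();
  return JSON.stringify(row);
}
EOF
  gcloud storage cp "$udf_file" "gs://${bucket_name}/udf.js"
}
```

Run write_and_upload_udf BUCKET_NAME to create udf.js in the current directory and upload it. Keeping the UDF in the same bucket as the temporary files is a convenience for this sketch, not a requirement.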
Use a dead-letter table
While the job is running, the pipeline might fail to write individual messages
to BigQuery. Possible errors include:
Serialization errors, including badly formatted JSON.
Type conversion errors, caused by a mismatch between the table schema and the JSON data.
Extra fields in the JSON data that are not present in the table schema.
The pipeline writes these errors to a dead-letter table in BigQuery. By
default, the pipeline automatically creates a dead-letter table named
TABLE_NAME_error_records, where TABLE_NAME is the name of the output table.
To use a different name, set the outputDeadletterTable template parameter.
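One way to see the dead-letter behavior described above is to publish a message that doesn't match the table schema and then inspect the default dead-letter table. The following is a sketch under stated assumptions: the function names publish_bad_message and query_dead_letter are hypothetical, the sample payload is invented for illustration, and the query uses SELECT * because the dead-letter table's column layout isn't described in this tutorial.

```shell
# Hypothetical helper: publishes a record whose customer_id is a string, which
# should not convert to the INTEGER column in the output table.
publish_bad_message() {
  local topic_id="$1"
  gcloud pubsub topics publish "$topic_id" \
    --message='{"name": "Bob", "customer_id": "not-a-number"}'
}

# Hypothetical helper: reads the default dead-letter table for an output table
# given as PROJECT_ID.DATASET_NAME.TABLE_NAME.
query_dead_letter() {
  local table_ref="$1"
  bq query --use_legacy_sql=false \
    "SELECT * FROM \`${table_ref}_error_records\` LIMIT 10"
}
```

Run publish_bad_message TOPIC_ID, wait about a minute, and then run query_dead_letter PROJECT_ID.DATASET_NAME.TABLE_NAME. If you set outputDeadletterTable, query that table instead.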
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this
tutorial, either delete the project that contains the resources, or keep the project and
delete the individual resources.
Delete the project
The easiest way to eliminate billing is to delete the Google Cloud project that you created
for the tutorial.
Console
Caution : Deleting a project has the following effects: Everything in the project is deleted. If you used an existing project for
the tasks in this document, when you delete it, you also delete any other work you've
done in the project.
Custom project IDs are lost. When you created this project, you might have created a custom project ID that you want to use in
the future. To preserve the URLs that use the project ID, such as an appspot.com
URL, delete selected resources inside the project instead of deleting the whole project.
If you plan to explore multiple architectures, tutorials, or quickstarts, reusing projects
can help you avoid exceeding project quota limits.
In the Google Cloud console, go to the Manage resources page.
In the project list, select the project that you want to delete, and then click Delete.
In the dialog, type the project ID, and then click Shut down to delete the project.
gcloud
Caution : Deleting a project has the following effects: Everything in the project is deleted. If you used an existing project for
the tasks in this document, when you delete it, you also delete any other work you've
done in the project.
Custom project IDs are lost. When you created this project, you might have created a custom project ID that you want to use in
the future. To preserve the URLs that use the project ID, such as an appspot.com
URL, delete selected resources inside the project instead of deleting the whole project.
If you plan to explore multiple architectures, tutorials, or quickstarts, reusing projects
can help you avoid exceeding project quota limits.
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Delete the individual resources
If you want to reuse the project later, you can keep the project but delete
the resources that you created during the tutorial.
Stop the Dataflow pipeline
Console
In the Google Cloud console, go to the Dataflow Jobs page.
Click the job that you want to stop.
To stop a job, the status of the job must be running.
In the job details page, click Stop.
Click Cancel.
To confirm your choice, click Stop Job.
gcloud
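To cancel your Dataflow job, use the gcloud dataflow jobs cancel command. The
following example looks up the running job by name with gcloud dataflow jobs
list and passes its job ID to the cancel command.
    gcloud dataflow jobs list \
        --filter 'NAME=JOB_NAME AND STATE=Running' \
        --format 'value(JOB_ID)' \
        --region "DATAFLOW_REGION" \
        | xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"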
Clean up Google Cloud project resources
Console
Delete the Pub/Sub topic and subscription.
Go to the Pub/Sub Topics page in the Google Cloud console.
Select the topic that you created.
Click Delete to permanently delete the topic.
Go to the Pub/Sub Subscriptions page in the Google Cloud console.
Select the subscription created with your topic.
Click Delete to permanently delete the subscription.
Delete the BigQuery table and dataset.
In the Google Cloud console, go to the BigQuery page.
In the Explorer panel, expand your project.
Next to the dataset you want to delete, click View actions, and then click Delete.
Delete the Cloud Storage bucket.
In the Google Cloud console, go to the Cloud Storage Buckets page.
Select the bucket that you want to delete, click Delete, and then follow the instructions.
gcloud
To delete the Pub/Sub subscription and topic, use the gcloud pubsub
subscriptions delete and the gcloud pubsub topics delete commands.
    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
To delete the BigQuery table, use the bq rm command.
    bq rm -f -t PROJECT_ID:DATASET_NAME.TABLE_NAME
Delete the BigQuery dataset. The dataset alone does not incur any charges.
Caution: The following command also deletes all tables in the dataset.
The tables and data cannot be recovered.
    bq rm -r -f -d PROJECT_ID:DATASET_NAME
To delete the Cloud Storage bucket and its objects, use the gcloud storage rm
command. The bucket alone does not incur any charges.
    gcloud storage rm gs://BUCKET_NAME --recursive
Revoke credentials
Console
If you keep your project, revoke the roles that you granted to the Compute Engine default service account.
In the Google Cloud console, go to the IAM page.
Select a project, folder, or organization.
Find the row containing the principal whose access you want to revoke.
In that row, click Edit principal.
Click the Delete button for each role you want to revoke, and then click Save.
gcloud
If you keep your project, revoke the roles that you granted to the
Compute Engine default service account. Run the following command one
time for each of the following IAM roles:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=ROLE
Optional: Revoke the authentication credentials that you created, and delete the local
credential file.
    gcloud auth application-default revoke
Optional: Revoke credentials from the gcloud CLI.
    gcloud auth revoke
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2026-05-29 UTC.