Workflows connector that defines the built-in function used to access BigQuery Data Transfer within a workflow.
Explore further
For detailed documentation that includes this code sample, see the following:
Code sample
YAML
# This workflow creates a new dataset and a new table inside that dataset, which are required
# for the BigQuery Data Transfer Job to run. It creates a new TransferJob configuration and starts
# a manual run of the transfer (30 seconds after the config is created).
# The transferRun is a blocking LRO.
# All resources get deleted once the transfer run completes.
#
# On success, it returns "SUCCESS".
#
# Features included in this test:
# - BigQuery Data Transfer connector
# - Waiting for long-running transfer run to complete
#
# This workflow expects following items to be provided through input argument for execution:
# - projectID (string)
# - The user project ID.
# - datasetID (string)
# - The dataset name, expected to have an unique value to avoid the
# instance being referred by multiple tests.
# - tableID (string)
# - The table name, expected to have an unique value to avoid the
# instance being referred by multiple tests.
# - runConfigDisplayName (string)
# - The transfer run configuration display name.
#
# Expected successful output: "SUCCESS"
main
:
params
:
[
args
]
steps
:
-
init
:
assign
:
-
project_id
:
${args.projectID}
-
destination_dataset
:
${args.datasetID}
-
destination_table
:
${args.tableID}
-
run_config_display_name
:
${args.runConfigDisplayName}
-
run_config_data_source_id
:
"google_cloud_storage"
-
location
:
"us"
-
data_path_template
:
"gs://xxxxxx-bucket/xxxxx/xxxx"
-
create_dataset
:
call
:
googleapis.bigquery.v2.datasets.insert
args
:
projectId
:
${project_id}
body
:
datasetReference
:
datasetId
:
${destination_dataset}
projectId
:
${project_id}
-
create_table
:
call
:
googleapis.bigquery.v2.tables.insert
args
:
datasetId
:
${destination_dataset}
projectId
:
${project_id}
body
:
tableReference
:
datasetId
:
${destination_dataset}
projectId
:
${project_id}
tableId
:
${destination_table}
schema
:
fields
:
-
name
:
"column1"
type
:
"STRING"
-
name
:
"column2"
type
:
"STRING"
-
list_config
:
call
:
googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.list
args
:
parent
:
${"projects/" + project_id + "/locations/us"}
-
create_run_config
:
call
:
googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.create
args
:
parent
:
${"projects/" + project_id + "/locations/" + location}
body
:
displayName
:
${run_config_display_name}
schedule
:
"every
day
19:22"
scheduleOptions
:
disableAutoScheduling
:
true
destinationDatasetId
:
${destination_dataset}
dataSourceId
:
${run_config_data_source_id}
params
:
destination_table_name_template
:
${destination_table}
file_format
:
"CSV"
data_path_template
:
${data_path_template}
result
:
config
-
get_time_in_30s
:
assign
:
-
now_plus_30s
:
${time.format(sys.now() + 30)}
-
start_run
:
call
:
googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
args
:
parent
:
${config.name}
body
:
requestedRunTime
:
${now_plus_30s}
result
:
runsResp
-
remove_run_config
:
call
:
googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.delete
args
:
name
:
${config.name}
-
delete_table
:
call
:
googleapis.bigquery.v2.tables.delete
args
:
datasetId
:
${destination_dataset}
projectId
:
${project_id}
tableId
:
${destination_table}
-
delete_dataset
:
call
:
googleapis.bigquery.v2.datasets.delete
args
:
projectId
:
${project_id}
datasetId
:
${destination_dataset}
-
the_end
:
return
:
"SUCCESS"
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser .

