Sends a batch (asynchronous) processing request to a processor.
Code sample
Java
For more information, see the Document AI Java API reference documentation.
To authenticate to Document AI, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
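For example, in a local development environment you can typically create Application Default Credentials with the gcloud CLI:

gcloud auth application-default login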
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.paging.Page;
import com.google.cloud.documentai.v1.BatchDocumentsInputConfig;
import com.google.cloud.documentai.v1.BatchProcessMetadata;
import com.google.cloud.documentai.v1.BatchProcessRequest;
import com.google.cloud.documentai.v1.BatchProcessResponse;
import com.google.cloud.documentai.v1.Document;
import com.google.cloud.documentai.v1.DocumentOutputConfig;
import com.google.cloud.documentai.v1.DocumentOutputConfig.GcsOutputConfig;
import com.google.cloud.documentai.v1.DocumentProcessorServiceClient;
import com.google.cloud.documentai.v1.DocumentProcessorServiceSettings;
import com.google.cloud.documentai.v1.GcsDocument;
import com.google.cloud.documentai.v1.GcsDocuments;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.protobuf.util.JsonFormat;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BatchProcessDocument {
  public static void batchProcessDocument()
      throws IOException, InterruptedException, TimeoutException, ExecutionException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String location = "your-project-location"; // Format is "us" or "eu".
    String processorId = "your-processor-id";
    String outputGcsBucketName = "your-gcs-bucket-name";
    String outputGcsPrefix = "PREFIX";
    String inputGcsUri = "gs://your-gcs-bucket/path/to/input/file.pdf";
    batchProcessDocument(
        projectId, location, processorId, inputGcsUri, outputGcsBucketName, outputGcsPrefix);
  }

  public static void batchProcessDocument(
      String projectId,
      String location,
      String processorId,
      String gcsInputUri,
      String gcsOutputBucketName,
      String gcsOutputUriPrefix)
      throws IOException, InterruptedException, TimeoutException, ExecutionException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    String endpoint = String.format("%s-documentai.googleapis.com:443", location);
    DocumentProcessorServiceSettings settings =
        DocumentProcessorServiceSettings.newBuilder().setEndpoint(endpoint).build();
    try (DocumentProcessorServiceClient client = DocumentProcessorServiceClient.create(settings)) {
      // The full resource name of the processor, e.g.:
      // projects/project-id/locations/location/processors/processor-id
      // You must create new processors in the Cloud Console first.
      String name =
          String.format("projects/%s/locations/%s/processors/%s", projectId, location, processorId);

      GcsDocument gcsDocument =
          GcsDocument.newBuilder().setGcsUri(gcsInputUri).setMimeType("application/pdf").build();

      GcsDocuments gcsDocuments = GcsDocuments.newBuilder().addDocuments(gcsDocument).build();

      BatchDocumentsInputConfig inputConfig =
          BatchDocumentsInputConfig.newBuilder().setGcsDocuments(gcsDocuments).build();

      String fullGcsPath = String.format("gs://%s/%s/", gcsOutputBucketName, gcsOutputUriPrefix);
      GcsOutputConfig gcsOutputConfig = GcsOutputConfig.newBuilder().setGcsUri(fullGcsPath).build();

      DocumentOutputConfig documentOutputConfig =
          DocumentOutputConfig.newBuilder().setGcsOutputConfig(gcsOutputConfig).build();

      // Configure the batch process request.
      BatchProcessRequest request =
          BatchProcessRequest.newBuilder()
              .setName(name)
              .setInputDocuments(inputConfig)
              .setDocumentOutputConfig(documentOutputConfig)
              .build();

      OperationFuture<BatchProcessResponse, BatchProcessMetadata> future =
          client.batchProcessDocumentsAsync(request);

      // Batch process document using a long-running operation.
      // You can wait for now, or get results later.
      // Note: first request to the service takes longer than subsequent requests.
      System.out.println("Waiting for operation to complete...");
      future.get();

      System.out.println("Document processing complete.");

      Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();
      Bucket bucket = storage.get(gcsOutputBucketName);

      // List all of the files in the Storage bucket.
      Page<Blob> blobs = bucket.list(Storage.BlobListOption.prefix(gcsOutputUriPrefix + "/"));

      int idx = 0;
      for (Blob blob : blobs.iterateAll()) {
        if (!blob.isDirectory()) {
          System.out.printf("Fetched file #%d\n", ++idx);
          // Read the results.
          // Download and store JSON data in a temp file.
          File tempFile = File.createTempFile("file", ".json");
          Blob fileInfo = storage.get(BlobId.of(gcsOutputBucketName, blob.getName()));
          fileInfo.downloadTo(tempFile.toPath());

          // Parse JSON file into Document.
          FileReader reader = new FileReader(tempFile);
          Document.Builder builder = Document.newBuilder();
          JsonFormat.parser().merge(reader, builder);

          Document document = builder.build();

          // Get all of the document text as one big string.
          String text = document.getText();

          // Read the text recognition output from the processor.
          System.out.println("The document contains the following paragraphs:");
          Document.Page page1 = document.getPages(0);
          List<Document.Page.Paragraph> paragraphList = page1.getParagraphsList();
          for (Document.Page.Paragraph paragraph : paragraphList) {
            String paragraphText = getText(paragraph.getLayout().getTextAnchor(), text);
            System.out.printf("Paragraph text:%s\n", paragraphText);
          }

          // Form parsing provides additional output about form-formatted PDFs. You must create
          // a form processor in the Cloud Console to see full field details.
          System.out.println("The following form key/value pairs were detected:");
          for (Document.Page.FormField field : page1.getFormFieldsList()) {
            String fieldName = getText(field.getFieldName().getTextAnchor(), text);
            String fieldValue = getText(field.getFieldValue().getTextAnchor(), text);

            System.out.println("Extracted form fields pair:");
            System.out.printf("\t(%s, %s)\n", fieldName, fieldValue);
          }

          // Clean up temp file.
          tempFile.deleteOnExit();
        }
      }
    }
  }

  // Extract shards from the text field.
  private static String getText(Document.TextAnchor textAnchor, String text) {
    if (textAnchor.getTextSegmentsList().size() > 0) {
      int startIdx = (int) textAnchor.getTextSegments(0).getStartIndex();
      int endIdx = (int) textAnchor.getTextSegments(0).getEndIndex();
      return text.substring(startIdx, endIdx);
    }
    return "[NO TEXT]";
  }
}
Node.js
For more information, see the Document AI Node.js API reference documentation.
To authenticate to Document AI, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const location = 'YOUR_PROJECT_LOCATION'; // Format is 'us' or 'eu'
// const processorId = 'YOUR_PROCESSOR_ID';
// const gcsInputUri = 'YOUR_SOURCE_PDF';
// const gcsOutputUri = 'YOUR_STORAGE_BUCKET';
// const gcsOutputUriPrefix = 'YOUR_STORAGE_PREFIX';

// Imports the Google Cloud client library
const {DocumentProcessorServiceClient} = require('@google-cloud/documentai').v1;
const {Storage} = require('@google-cloud/storage');

// Instantiates Document AI, Storage clients
const client = new DocumentProcessorServiceClient();
const storage = new Storage();

const {default: PQueue} = require('p-queue');

async function batchProcessDocument() {
  const name = `projects/${projectId}/locations/${location}/processors/${processorId}`;

  // Configure the batch process request.
  const request = {
    name,
    inputDocuments: {
      gcsDocuments: {
        documents: [
          {
            gcsUri: gcsInputUri,
            mimeType: 'application/pdf',
          },
        ],
      },
    },
    documentOutputConfig: {
      gcsOutputConfig: {
        gcsUri: `${gcsOutputUri}/${gcsOutputUriPrefix}/`,
      },
    },
  };

  // Batch process document using a long-running operation.
  // You can wait for now, or get results later.
  // Note: first request to the service takes longer than subsequent requests.
  const [operation] = await client.batchProcessDocuments(request);

  // Wait for operation to complete.
  await operation.promise();
  console.log('Document processing complete.');

  // Query Storage bucket for the results file(s).
  const query = {
    prefix: gcsOutputUriPrefix,
  };

  console.log('Fetching results ...');

  // List all of the files in the Storage bucket
  const [files] = await storage.bucket(gcsOutputUri).getFiles(query);

  // Add all asynchronous downloads to queue for execution.
  const queue = new PQueue({concurrency: 15});
  const tasks = files.map((fileInfo, index) => async () => {
    // Get the file as a buffer
    const [file] = await fileInfo.download();

    console.log(`Fetched file #${index + 1}:`);

    // The results stored in the output Storage location
    // are formatted as a document object.
    const document = JSON.parse(file.toString());
    const {text} = document;

    // Extract shards from the text field
    const getText = textAnchor => {
      if (!textAnchor.textSegments || textAnchor.textSegments.length === 0) {
        return '';
      }

      // First shard in document doesn't have startIndex property
      const startIndex = textAnchor.textSegments[0].startIndex || 0;
      const endIndex = textAnchor.textSegments[0].endIndex;

      return text.substring(startIndex, endIndex);
    };

    // Read the text recognition output from the processor
    console.log('The document contains the following paragraphs:');

    const [page1] = document.pages;
    const {paragraphs} = page1;

    for (const paragraph of paragraphs) {
      const paragraphText = getText(paragraph.layout.textAnchor);
      console.log(`Paragraph text:\n${paragraphText}`);
    }

    // Form parsing provides additional output about
    // form-formatted PDFs. You must create a form
    // processor in the Cloud Console to see full field details.
    console.log('\nThe following form key/value pairs were detected:');

    const {formFields} = page1;
    for (const field of formFields) {
      const fieldName = getText(field.fieldName.textAnchor);
      const fieldValue = getText(field.fieldValue.textAnchor);

      console.log('Extracted key value pair:');
      console.log(`\t(${fieldName}, ${fieldValue})`);
    }
  });
  await queue.addAll(tasks);
}
Python
For more information, see the Document AI Python API reference documentation.
To authenticate to Document AI, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
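Before running the sample, you can verify that credentials resolve locally. This is a minimal sketch, separate from the sample itself, and assumes the google-auth package (a dependency of the client library) is installed:

import google.auth

# Raises google.auth.exceptions.DefaultCredentialsError if no
# Application Default Credentials are found.
credentials, project_id = google.auth.default()
print(f"Application Default Credentials found (project: {project_id})")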
import re
from typing import Optional

from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import RetryError
from google.cloud import documentai  # type: ignore
from google.cloud import storage

# TODO(developer): Uncomment these variables before running the sample.
# project_id = "YOUR_PROJECT_ID"
# location = "YOUR_PROCESSOR_LOCATION"  # Format is "us" or "eu"
# processor_id = "YOUR_PROCESSOR_ID"  # Create processor before running sample
# gcs_output_uri = "YOUR_OUTPUT_URI"  # Must end with a trailing slash `/`. Format: gs://bucket/directory/subdirectory/
# processor_version_id = "YOUR_PROCESSOR_VERSION_ID"  # Optional. Example: pretrained-ocr-v1.0-2020-09-23

# TODO(developer): You must specify either `gcs_input_uri` and `mime_type` or `gcs_input_prefix`
# gcs_input_uri = "YOUR_INPUT_URI"  # Format: gs://bucket/directory/file.pdf
# input_mime_type = "application/pdf"
# gcs_input_prefix = "YOUR_INPUT_URI_PREFIX"  # Format: gs://bucket/directory/
# field_mask = "text,entities,pages.pageNumber"  # Optional. The fields to return in the Document object.


def batch_process_documents(
    project_id: str,
    location: str,
    processor_id: str,
    gcs_output_uri: str,
    processor_version_id: Optional[str] = None,
    gcs_input_uri: Optional[str] = None,
    input_mime_type: Optional[str] = None,
    gcs_input_prefix: Optional[str] = None,
    field_mask: Optional[str] = None,
    timeout: int = 400,
) -> None:
    # You must set the `api_endpoint` if you use a location other than "us".
    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")

    client = documentai.DocumentProcessorServiceClient(client_options=opts)

    if gcs_input_uri:
        # Specify specific GCS URIs to process individual documents
        gcs_document = documentai.GcsDocument(
            gcs_uri=gcs_input_uri, mime_type=input_mime_type
        )
        # Load GCS Input URI into a List of document files
        gcs_documents = documentai.GcsDocuments(documents=[gcs_document])
        input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)
    else:
        # Specify a GCS URI Prefix to process an entire directory
        gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=gcs_input_prefix)
        input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix)

    # Cloud Storage URI for the Output Directory
    gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig(
        gcs_uri=gcs_output_uri, field_mask=field_mask
    )

    # Where to write results
    output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_output_config)

    if processor_version_id:
        # The full resource name of the processor version, e.g.:
        # projects/{project_id}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}
        name = client.processor_version_path(
            project_id, location, processor_id, processor_version_id
        )
    else:
        # The full resource name of the processor, e.g.:
        # projects/{project_id}/locations/{location}/processors/{processor_id}
        name = client.processor_path(project_id, location, processor_id)

    request = documentai.BatchProcessRequest(
        name=name,
        input_documents=input_config,
        document_output_config=output_config,
    )

    # BatchProcess returns a Long Running Operation (LRO)
    operation = client.batch_process_documents(request)

    # Continually polls the operation until it is complete.
    # This could take some time for larger files.
    # Format: projects/{project_id}/locations/{location}/operations/{operation_id}
    try:
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result(timeout=timeout)
    # Catch exception when operation doesn't finish before timeout
    except (RetryError, InternalServerError) as e:
        print(e.message)

    # NOTE: Can also use callbacks for asynchronous processing
    #
    # def my_callback(future):
    #     result = future.result()
    #
    # operation.add_done_callback(my_callback)

    # After the operation is complete,
    # get output document information from operation metadata
    metadata = documentai.BatchProcessMetadata(operation.metadata)

    if metadata.state != documentai.BatchProcessMetadata.State.SUCCEEDED:
        raise ValueError(f"Batch Process Failed: {metadata.state_message}")

    storage_client = storage.Client()

    print("Output files:")
    # One process per Input Document
    for process in list(metadata.individual_process_statuses):
        # output_gcs_destination format: gs://BUCKET/PREFIX/OPERATION_NUMBER/INPUT_FILE_NUMBER/
        # The Cloud Storage API requires the bucket name and URI prefix separately
        matches = re.match(r"gs://(.*?)/(.*)", process.output_gcs_destination)
        if not matches:
            print(
                "Could not parse output GCS destination:",
                process.output_gcs_destination,
            )
            continue

        output_bucket, output_prefix = matches.groups()

        # Get List of Document Objects from the Output Bucket
        output_blobs = storage_client.list_blobs(output_bucket, prefix=output_prefix)

        # Document AI may output multiple JSON files per source file
        for blob in output_blobs:
            # Document AI should only output JSON files to GCS
            if blob.content_type != "application/json":
                print(
                    f"Skipping non-supported file: {blob.name} - Mimetype: {blob.content_type}"
                )
                continue

            # Download JSON File as bytes object and convert to Document Object
            print(f"Fetching {blob.name}")
            document = documentai.Document.from_json(
                blob.download_as_bytes(), ignore_unknown_fields=True
            )

            # For a full list of Document object attributes, please reference this page:
            # https://cloud.google.com/python/docs/reference/documentai/latest/google.cloud.documentai_v1.types.Document

            # Read the text recognition output from the processor
            print("The document contains the following text:")
            print(document.text)
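For illustration, once the variables above are filled in, the function can be called for a single file or for an entire Cloud Storage prefix. All values below are hypothetical placeholders:

# Process a single document (hypothetical bucket and processor values).
batch_process_documents(
    project_id="my-project-id",
    location="us",
    processor_id="my-processor-id",
    gcs_output_uri="gs://my-bucket/output/",  # Must end with a trailing slash.
    gcs_input_uri="gs://my-bucket/input/file.pdf",
    input_mime_type="application/pdf",
)

# Or process every file under a prefix instead of a single document.
batch_process_documents(
    project_id="my-project-id",
    location="us",
    processor_id="my-processor-id",
    gcs_output_uri="gs://my-bucket/output/",
    gcs_input_prefix="gs://my-bucket/input/",
)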
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.