Package dataflux provides an easy way to parallelize listing in Google Cloud Storage.
More information about Google Cloud Storage is available at https://cloud.google.com/storage/docs .
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package.
NOTE: This package is in preview. It is not stable, and is likely to change.
Lister
type Lister struct {
	// contains filtered or unexported fields
}
Lister is used for interacting with Dataflux fast-listing. The caller should initialize it with NewLister() instead of creating it directly.
Example
package main

import (
	"context"
	"log"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/dataflux"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	// Pass in any client opts or set retry policy here.
	client, err := storage.NewClient(ctx)
	if err != nil {
		// handle error
	}

	// Create dataflux fast-list input and provide desired options,
	// including number of workers, batch size, query to filter objects, etc.
	in := &dataflux.ListerInput{
		BucketName: "mybucket",
		// Optionally specify params to apply to lister.
		Parallelism:          100,
		BatchSize:            500000,
		Query:                storage.Query{},
		SkipDirectoryObjects: false,
	}

	// Create Lister with fast-list input.
	df := dataflux.NewLister(client, in)
	defer df.Close()

	var numOfObjects int
	for {
		objects, err := df.NextBatch(ctx)
		// Check for iterator.Done before the generic error check,
		// because the final batch is returned together with it.
		if err == iterator.Done {
			numOfObjects += len(objects)
			// No more objects in the bucket to list.
			break
		}
		if err != nil {
			// handle error
		}
		numOfObjects += len(objects)
	}
	log.Printf("listing %d objects in bucket %q is complete.", numOfObjects, in.BucketName)
}
func NewLister
func NewLister(c *storage.Client, in *ListerInput) *Lister
NewLister creates a new [Lister] that can be used to list objects in the given bucket.
func (*Lister) Close
func (c *Lister) Close()
Close is used to close the Lister.
func (*Lister) NextBatch
NextBatch returns the next N objects in the bucket, where N is [ListerInput.BatchSize]. In case of failure, all processes are stopped and an error is returned immediately; create a new Lister to retry. For the first batch, worksteal listing and sequential listing run in parallel to quickly list N objects in the bucket. For subsequent batches, only the method that returned objects faster in the first batch is used. For smaller datasets, sequential listing is expected to be faster; for larger datasets, worksteal listing is expected to be faster.
The worksteal algorithm lists objects in a GCS bucket using multiple parallel workers. Each worker can steal work from its siblings once it has finished all of its currently slated listing work.
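As a rough illustration of work stealing in general, the sketch below splits a slice of object names among goroutines. It assumes a much simpler model than the real lister: work is a range of slice indexes instead of GCS name ranges, and `span`, `pool`, and `listParallel` are hypothetical names that do not appear in the package.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// span is a half-open range [lo, hi) of indexes that still need listing.
type span struct{ lo, hi int }

// pool holds one span per worker; mu guards all of them.
type pool struct {
	mu    sync.Mutex
	spans []span
}

// next returns the next index for worker id. When the worker's own span is
// empty, it steals the upper half of the largest remaining sibling span.
func (p *pool) next(id int) (int, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	s := &p.spans[id]
	if s.lo >= s.hi {
		victim, best := -1, 0
		for i, v := range p.spans {
			if n := v.hi - v.lo; n > best {
				victim, best = i, n
			}
		}
		if victim < 0 {
			return 0, false // no work left anywhere
		}
		v := &p.spans[victim]
		mid := v.lo + (v.hi-v.lo)/2
		*s = span{mid, v.hi} // thief takes the upper half
		v.hi = mid           // victim keeps the lower half
	}
	i := s.lo
	s.lo++
	return i, true
}

// listParallel "lists" names with the given number of workers and returns
// the names sorted, so the result does not depend on scheduling.
func listParallel(names []string, workers int) []string {
	if workers < 1 {
		workers = 1
	}
	p := &pool{spans: make([]span, workers)}
	// Worker 0 starts with all the work, so every other worker must steal.
	p.spans[0] = span{0, len(names)}

	out := make([][]string, workers) // one result slice per worker, no sharing
	var wg sync.WaitGroup
	for id := 0; id < workers; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				i, ok := p.next(id)
				if !ok {
					return
				}
				out[id] = append(out[id], names[i])
			}
		}(id)
	}
	wg.Wait()

	var all []string
	for _, o := range out {
		all = append(all, o...)
	}
	sort.Strings(all)
	return all
}

func main() {
	names := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	fmt.Printf("listed %d objects\n", len(listParallel(names, 4)))
}
```

A single mutex keeps the sketch short; it only shows the stealing rule, not a design tuned for contention.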
ListerInput
type ListerInput struct {
	// BucketName is the name of the bucket to list objects from. Required.
	BucketName string

	// Parallelism is number of parallel workers to use for listing.
	// Default value is 10x number of available CPU. Optional.
	Parallelism int

	// BatchSize is the minimum number of objects to list in each batch.
	// The number of objects returned in a batch will be rounded up to
	// include all the objects received in the last request to GCS.
	// By default, the Lister returns all objects in one batch.
	// Optional.
	BatchSize int

	// Query is the query to filter objects for listing. Default value is nil.
	// Use ProjectionNoACL for faster listing. Including ACLs increases
	// latency while fetching objects. Optional.
	Query storage.Query

	// SkipDirectoryObjects is to indicate whether to list directory objects.
	// Note: Even if directory objects are excluded, they contribute to the
	// [ListerInput.BatchSize] count. Default value is false. Optional.
	SkipDirectoryObjects bool
}
ListerInput contains options for listing objects.
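The documented defaults and the batch rounding rule can be modelled in a few lines. The sketch below is an assumption-laden illustration, not the package's code: `effectiveParallelism` and `nextBatch` are hypothetical helpers, an unset field is assumed to be zero, and each inner slice stands in for the objects returned by one request to GCS.

```go
package main

import (
	"fmt"
	"runtime"
)

// effectiveParallelism models the documented default of 10x the number of
// available CPUs. Treating any value <= 0 as "unset" is an assumption.
func effectiveParallelism(p int) int {
	if p <= 0 {
		return 10 * runtime.NumCPU()
	}
	return p
}

// nextBatch consumes whole pages until at least batchSize objects have been
// collected, so a batch can exceed batchSize by part of the last page.
// A batchSize <= 0 is assumed to mean "return everything in one batch".
// It returns the batch and the pages that were not consumed.
func nextBatch(pages [][]string, batchSize int) (batch []string, rest [][]string) {
	for i, page := range pages {
		batch = append(batch, page...)
		if batchSize > 0 && len(batch) >= batchSize {
			return batch, pages[i+1:]
		}
	}
	return batch, nil
}

func main() {
	pages := [][]string{{"a", "b", "c"}, {"d", "e", "f"}, {"g"}}

	// BatchSize 4 is reached partway through the second page, so the whole
	// second page is included in the batch.
	batch, rest := nextBatch(pages, 4)
	fmt.Println(len(batch), len(rest)) // prints "6 1"

	fmt.Println(effectiveParallelism(0) == 10*runtime.NumCPU()) // prints "true"
}
```

In this model a caller should treat BatchSize as a lower bound on each batch except the last, and size any buffers accordingly.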