"""Detects uses of deprecated labels in charts, scorecards, alerts, and groups.

More information about this deprecation is available at
https://cloud.google.com/stackdriver/docs/deprecations/metadata-labels
"""
import re
import sys
from google.cloud import monitoring_v3
from google.cloud.monitoring_dashboard import v1
from google.protobuf.json_format import MessageToDict
# Deprecation rules checked against every alert condition, group, and
# dashboard query.
#
# Each rule is a dict with:
#   'resource_type':    the monitored resource type the rule applies to; '*'
#                       is handled as "every resource type except aws_* ones".
#   'label':            the deprecated label, written as
#                       metadata.<system_labels|user_labels>.<key>.
#   'pubsub_lowercase': optional marker; when the key is present the rule
#                       reports the end of automatic lowercasing of label
#                       values instead of a label removal.
MIGRATION = [
    {'resource_type': 'cloud_tasks_queue', 'label': 'metadata.system_labels.state'},
    {'resource_type': 'gce_instance', 'label': 'metadata.system_labels.maintenance_mode'},
    {'resource_type': 'l7_lb_rule', 'label': 'metadata.system_labels.state'},
    {'resource_type': 'dataflow_job', 'label': 'metadata.system_labels.name'},
    {'resource_type': 'dataflow_job', 'label': 'metadata.system_labels.state'},
    {'resource_type': '*', 'label': 'metadata.system_labels.cloud_account'},
    {'resource_type': 'vpn_gateway', 'label': 'metadata.system_labels.region'},
    {'resource_type': 'gce_instance', 'label': 'metadata.user_labels.name'},
    {'resource_type': 'gce_disk', 'label': 'metadata.user_labels.name'},
    {'resource_type': 'cloud_tasks_queue', 'label': 'metadata.system_labels.name'},
    {'resource_type': 'gae_app', 'label': 'metadata.system_labels.gaeapp'},
    {'resource_type': 'gae_app', 'label': 'metadata.system_labels.gaemodule'},
    {'resource_type': 'gae_app', 'label': 'metadata.system_labels.gaeversion'},
    {
        'resource_type': 'pubsub_subscription',
        'label': 'metadata.system_labels.name',
        'pubsub_lowercase': True,
    },
    {
        'resource_type': 'pubsub_topic',
        'label': 'metadata.system_labels.name',
        'pubsub_lowercase': True,
    },
]
def filters_match_rule_type(filters, resource_type):
    """Determine whether the given filters always match the given resource type.

    Args:
        filters: List of monitoring filter strings belonging to one object.
        resource_type: Resource type named by a rule, or '*' for a rule that
            covers every resource type except the ``aws_*`` ones.

    Returns:
        For '*': True when no filter pins ``resource.type`` to an ``aws_*``
        type. Otherwise: True when at least one filter pins ``resource.type``
        to exactly ``resource_type``, or when no filter mentions
        ``resource.type`` at all (the object may then apply to any type).
    """
    def has_resource_type(filter_str):
        return re.search(r'resource.type', filter_str) is not None

    def is_aws_resource_type(filter_str):
        return re.search(r'resource.type\s*=\s*(aws_\w+|"aws_\w+")',
                         filter_str) is not None

    def match_resource_type(resource_type, filter_str):
        # Escape the type so it is always matched literally, and forbid a
        # trailing word character on the unquoted form: without the lookahead
        # 'gce_instance' would also match 'gce_instance_group'. The quoted
        # form is already delimited by its closing quote.
        escaped = re.escape(resource_type)
        regex = r'resource.type\s*=\s*' + rf'(?:{escaped}(?!\w)|"{escaped}")'
        return re.search(regex, filter_str) is not None

    if resource_type == '*':
        return all(not is_aws_resource_type(filter_str) for filter_str in filters)

    return any(
        match_resource_type(resource_type, filter_str)
        for filter_str in filters) or all(
            not has_resource_type(filter_str) for filter_str in filters)
def filters_match_rule_label(filters, label):
    """Determine whether the given filters always match the given label.

    Args:
        filters: List of strings to search (filter strings and, when the
            caller adds them, group-by field names).
        label: Deprecated label as written in a rule, in the form
            ``metadata.<system_labels|user_labels>.<key>``. Only the second
            and third dot-separated parts are used.

    Returns:
        True when at least one string references ``<key>`` through one of the
        recognised spellings: ``metadata.system_labels`` /
        ``metadata.systemLabels`` / ``resource.metadata`` for system labels,
        or ``metadata.user_labels`` / ``metadata.userLabels`` /
        ``resource.metadata.tag`` for user labels. The key may be bare or
        double-quoted.
    """
    def label_pattern(label):
        # Match the key literally. The unquoted form must not be followed by
        # another word character, otherwise key 'name' would also match an
        # unrelated label such as 'namespace' or 'name_suffix'. The quoted
        # form is already delimited by its closing quote.
        escaped = re.escape(label)
        return rf'(?:{escaped}(?!\w)|"{escaped}")'

    def has_metadata_user_label(label, input_str):
        key = label_pattern(label)
        regex = f'metadata.(user_labels|userLabels).{key}'
        regex += '|' + f'resource.metadata.tag.{key}'
        return re.search(regex, input_str) is not None

    def has_metadata_system_label(label, input_str):
        key = label_pattern(label)
        regex = f'metadata.(system_labels|systemLabels).{key}'
        regex += '|' + f'resource.metadata.{key}'
        return re.search(regex, input_str) is not None

    parts = label.split('.')
    if parts[1] == 'system_labels':
        return any(
            has_metadata_system_label(parts[2], input_str)
            for input_str in filters)
    return any(
        has_metadata_user_label(parts[2], input_str) for input_str in filters)
def rule_to_message(rule, object_type, has_resource_type):
    """Build the human-readable advice printed when a rule is triggered.

    Args:
        rule: Rule dict with 'resource_type' and 'label', and optionally
            'pubsub_lowercase' and/or 'migrate_to'.
        object_type: Kind of object being reported; interpolated into the
            text as-is.
        has_resource_type: Whether the object's filters mention a resource
            type. When they do not, the advice is made conditional on the
            kind of resources the object is used for.

    Returns:
        The message string.
    """
    rule_type = rule['resource_type']
    label = rule['label']

    # Lowercasing rules have their own wording and ignore everything else.
    if 'pubsub_lowercase' in rule:
        return (
            f'Values for {label} for {rule_type} will no '
            f'longer be automatically converted to lowercase. Please manually '
            f'update this {object_type} if it applies to {rule_type} '
            f'resources with uppercase letters.')

    if rule_type == '*':
        described_type = 'non-AWS resource types'
        condition = f'if this {object_type} is used for non-AWS resources'
    else:
        described_type = rule_type
        condition = f'if this {object_type} is used for {rule_type} resources'

    if 'migrate_to' in rule:
        target = rule['migrate_to']
        if has_resource_type:
            action = f'This reference will be auto-migrated to {target}'
        else:
            action = f'Please manually migrate to {target} {condition}'
    elif has_resource_type:
        action = 'Please manually remove the reference'
    else:
        action = f'Please manually remove the reference {condition}'

    return f'{label} is deprecated for {described_type}. {action}.'
def apply_rule(rule, obj):
    """Check one rule against an object with filters and group-by fields.

    Args:
        rule: Rule dict ('resource_type', 'label', optional markers).
        obj: Object dict with 'filters' and 'object_type', and optionally
            'group_by_fields'.

    Returns:
        A ``(affected, message)`` tuple. The message is built for every call,
        whether or not the object is affected.
    """
    filters = obj['filters']

    # Labels are searched in the filters and, except for lowercasing rules,
    # in the group-by fields as well. Work on a copy so obj is not modified.
    searchable = list(filters)
    if 'group_by_fields' in obj and 'pubsub_lowercase' not in rule:
        searchable.extend(obj['group_by_fields'])

    type_matches = filters_match_rule_type(filters, rule['resource_type'])
    affected = type_matches and filters_match_rule_label(
        searchable, rule['label'])

    mentions_resource_type = any(
        re.search(r'resource.type', filter_str) is not None
        for filter_str in filters)
    message = rule_to_message(rule, obj['object_type'], mentions_resource_type)
    return affected, message
def apply_rules(obj):
    """Check every rule in MIGRATION against an object.

    Args:
        obj: Object dict as accepted by apply_rule.

    Returns:
        A ``(affected, msgs)`` tuple: whether any rule matched, and the
        messages of the matching rules in MIGRATION order.
    """
    outcomes = [apply_rule(rule, obj) for rule in MIGRATION]
    msgs = [msg for matched, msg in outcomes if matched]
    affected = any(matched for matched, _ in outcomes)
    return affected, msgs
def get_filters(cond):
    """Collect the filter strings of an alert policy condition.

    Args:
        cond: Condition message exposing ``HasField``, ``condition_threshold``
            and ``condition_absent``.

    Returns:
        A list holding the threshold filter (followed by the denominator
        filter when that one is non-empty) and/or the absence filter,
        depending on which fields are set.
    """
    collected = []
    if cond.HasField('condition_threshold'):
        threshold = cond.condition_threshold
        collected.append(threshold.filter)
        # The denominator filter is only meaningful when it is non-empty.
        if threshold.denominator_filter:
            collected.append(threshold.denominator_filter)
    if cond.HasField('condition_absent'):
        collected.append(cond.condition_absent.filter)
    return collected
def get_group_by_fields(cond):
    """Collect the group-by fields of an alert policy condition.

    Args:
        cond: Condition message exposing ``HasField``, ``condition_threshold``
            and ``condition_absent``.

    Returns:
        A flat list with the ``group_by_fields`` of every aggregation found
        on the set condition fields: threshold aggregations, then threshold
        denominator aggregations, then absence aggregations.
    """
    aggregations = []
    if cond.HasField('condition_threshold'):
        threshold = cond.condition_threshold
        aggregations.extend(threshold.aggregations)
        aggregations.extend(threshold.denominator_aggregations)
    if cond.HasField('condition_absent'):
        aggregations.extend(cond.condition_absent.aggregations)

    return [
        field
        for aggregation in aggregations
        for field in aggregation.group_by_fields
    ]
def get_alert_policy_conditions_for_project(name):
    """List the conditions of every alert policy in a project.

    Args:
        name: Project identifier, passed to the client's ``project_path`` and
            appended to the console URL.

    Returns:
        A list with one dict per condition, holding the policy 'name', the
        condition's 'filters' and 'group_by_fields', a 'display_name'
        combining condition and policy names, a console 'url' and the
        'object_type' ('condition').
    """
    client = monitoring_v3.AlertPolicyServiceClient()
    policies = client.list_alert_policies(client.project_path(name))
    return [
        {
            'name': policy.name,
            'filters': get_filters(condition),
            'group_by_fields': get_group_by_fields(condition),
            # The surrounding quotes are added by the report formatter, hence
            # the unbalanced quotes inside this string.
            'display_name':
                f'{condition.display_name}" in alert policy "{policy.display_name}',
            # The last path segment of the policy name is its id.
            'url': ('https://console.cloud.google.com/monitoring/alerting/policies/'
                    f'{policy.name.split("/")[-1]}?project={name}'),
            'object_type': 'condition',
        }
        for policy in policies
        for condition in policy.conditions
    ]
def get_groups_for_project(name):
    """List the monitoring groups of a project.

    Args:
        name: Project identifier, passed to the client's ``project_path`` and
            appended to the console URL.

    Returns:
        A list with one dict per group, holding its 'name', a one-element
        'filters' list, its 'display_name', a console 'url' and the
        'object_type' ('group').
    """
    client = monitoring_v3.GroupServiceClient()
    return [
        {
            'name': group.name,
            'filters': [group.filter],
            'display_name': group.display_name,
            # The last path segment of the group name is its id.
            'url': ('https://console.cloud.google.com/monitoring/groups/'
                    f'{group.name.split("/")[-1]}?project={name}'),
            'object_type': 'group',
        }
        for group in client.list_groups(client.project_path(name))
    ]
def get_dashboard_widgets(dashboard):
    """Return the widgets of a dashboard, whatever its layout.

    Args:
        dashboard: Dashboard message exposing ``HasField`` and the
            ``grid_layout``, ``row_layout`` and ``column_layout`` fields.

    Returns:
        The grid layout's own widget container, or a new flat list of the
        widgets of every row / column. Any layout that is neither grid nor
        row is read as a column layout.
    """
    if dashboard.HasField('grid_layout'):
        return dashboard.grid_layout.widgets

    if dashboard.HasField('row_layout'):
        containers = dashboard.row_layout.rows
    else:  # must be column_layout
        containers = dashboard.column_layout.columns

    return [widget for container in containers for widget in container.widgets]
def get_dataset_queries(dataset, display_name, name):
    """Extract the filter-based time series queries of a chart dataset.

    Args:
        dataset: Dataset message exposing ``time_series_query``.
        display_name: Dashboard display name, used only in the warning.
        name: Dashboard resource name, used only in the warning.

    Returns:
        A one-element list for a plain filter query, a
        ``[numerator, denominator]`` list for a ratio query, or an empty list
        (after printing a warning) for anything else.
    """
    query = dataset.time_series_query

    if query.HasField('time_series_filter'):
        return [query.time_series_filter]

    if query.HasField('time_series_filter_ratio'):
        ratio = query.time_series_filter_ratio
        return [ratio.numerator, ratio.denominator]

    # must be time_series_query_language
    print(f'WARNING: Dashboard {display_name} ({name}) uses an MQL query. This '
          f'script does not detect labels used in MQL queries.\n')
    return []
def get_widget_queries(widget, dashboard_display_name, dashboard_name):
    """Return the time series queries used by a dashboard widget.

    Args:
        widget: Widget message exposing ``HasField``, ``xy_chart`` and
            ``scorecard``.
        dashboard_display_name: Forwarded to get_dataset_queries.
        dashboard_name: Forwarded to get_dataset_queries.

    Returns:
        The queries of every dataset for an XY chart, the single query of a
        scorecard, or an empty list for any other widget.
    """
    if widget.HasField('xy_chart'):
        return [
            query
            for dataset in widget.xy_chart.data_sets
            for query in get_dataset_queries(
                dataset, dashboard_display_name, dashboard_name)
        ]

    if widget.HasField('scorecard'):
        return [widget.scorecard.time_series_query]

    return []
def get_fields_from_dashboard_query_helper(field, results, dictionary):
    """Append every value stored under ``field`` to ``results``, recursively.

    Args:
        field: Key to look for.
        results: List extended in place; nothing is returned.
        dictionary: Dict to walk. Nested dicts, and dicts inside lists, are
            searched too; the value found under a matching key is collected
            as-is and not searched any further.
    """
    for key, value in dictionary.items():
        if key == field:
            # A list value contributes its items, anything else itself.
            results.extend(value if isinstance(value, list) else [value])
            continue

        children = value if isinstance(value, list) else [value]
        for child in children:
            if isinstance(child, dict):
                get_fields_from_dashboard_query_helper(field, results, child)
def get_fields_from_dashboard_query(field, query):
    """Return every value stored under ``field`` anywhere in a query.

    Args:
        field: Key to look for in the dict produced by MessageToDict.
        query: Message to convert with MessageToDict and search.

    Returns:
        A new list with the collected values.
    """
    collected = []
    get_fields_from_dashboard_query_helper(field, collected, MessageToDict(query))
    return collected
def widget_name(widget):
    """Return the widget's title, or '(unnamed)' when it has none.

    Args:
        widget: Object exposing a ``title`` attribute.

    Returns:
        The title when it is non-empty, otherwise the '(unnamed)' placeholder.
    """
    # An unset protobuf string field reads as '' rather than None, so the
    # check has to be on truthiness for the placeholder to ever be used.
    return widget.title or '(unnamed)'
def get_dashboard_queries_for_project(name):
    """List the time series queries of every dashboard widget in a project.

    Args:
        name: Project identifier, appended to 'projects/' for the list call
            and to the console URL.

    Returns:
        A list with one dict per query, holding the dashboard 'name', the
        query's 'filters' and 'group_by_fields', a 'display_name' combining
        widget and dashboard names, a console 'url' and the 'object_type'
        (the widget's ``content`` oneof name).
    """
    client = v1.DashboardsServiceClient()
    return [
        {
            'name': dashboard.name,
            'filters': get_fields_from_dashboard_query('filter', query),
            'group_by_fields':
                get_fields_from_dashboard_query('groupByFields', query),
            # The surrounding quotes are added by the report formatter, hence
            # the unbalanced quotes inside this string.
            'display_name':
                f'{widget_name(widget)}" in dashboard "{dashboard.display_name}',
            # The last path segment of the dashboard name is its id.
            'url': ('https://console.cloud.google.com/monitoring/dashboards/custom/'
                    f'{dashboard.name.split("/")[-1]}?project={name}'),
            'object_type': widget.WhichOneof('content'),
        }
        for dashboard in client.list_dashboards('projects/' + name)
        for widget in get_dashboard_widgets(dashboard)
        for query in get_widget_queries(widget, dashboard.display_name,
                                        dashboard.name)
    ]
def report(resource_name, resource_obj, msgs):
    """Format the report printed for one affected object.

    Args:
        resource_name: Resource name shown in parentheses.
        resource_obj: Object dict with 'filters', 'object_type',
            'display_name' and 'url'.
        msgs: Rule messages; each is printed on its own tab-indented line.

    Returns:
        The multi-line report. The headline says the object "is affected"
        when some filter mentions a resource type and "might be affected"
        otherwise.
    """
    mentions_resource_type = any(
        re.search(r'resource.type', filter_str) is not None
        for filter_str in resource_obj['filters'])
    details = '\n'.join('\t' + msg for msg in msgs)

    if mentions_resource_type:
        summary = 'is affected'
    else:
        # Without a resource type the rules matched on the label alone.
        summary = ('might be affected, depending on the type of resources it is '
                   'used for')

    headline = (f'{resource_obj["object_type"].capitalize()} '
                f'"{resource_obj["display_name"]}" ({resource_name}) {summary}.')
    return f'{headline}\nURL: {resource_obj["url"]}\n{details}\n'
def detect(name):
    """Print a report for every object in the project hit by a rule.

    Alert policy conditions, groups and dashboard queries are all fetched
    before anything is checked. When nothing is affected a single
    confirmation line is printed instead.

    Args:
        name: Project identifier handed to the three fetch functions.
    """
    items = (get_alert_policy_conditions_for_project(name)
             + get_groups_for_project(name)
             + get_dashboard_queries_for_project(name))

    found_any = False
    for item in items:
        affected, msgs = apply_rules(item)
        if not affected:
            continue
        found_any = True
        print(report(item['name'], item, msgs))

    if not found_any:
        print('This project does not use any deprecated labels.')
if __name__ == '__main__':
    # Exactly one argument is expected: the project to inspect.
    if len(sys.argv) != 2:
        print('usage: python3 metadata_label_detection.py [project id or number]')
        # sys.exit() instead of the bare exit(): the latter is a helper
        # injected by the site module and is not guaranteed to exist (for
        # example when the interpreter is started with -S).
        sys.exit(1)
    detect(sys.argv[1])