"""Detects uses of deprecated labels in charts, scorecards, alerts, and groups. More information about this deprecation at https://cloud.google.com/stackdriver/docs/deprecations/metadata-labels. """ import re import sys from google.cloud import monitoring_v3 from google.cloud.monitoring_dashboard import v1 from google.protobuf.json_format import MessageToDict MIGRATION = [ { 'resource_type': 'cloud_tasks_queue', 'label': 'metadata.system_labels.state' }, { 'resource_type': 'gce_instance', 'label': 'metadata.system_labels.maintenance_mode' }, { 'resource_type': 'l7_lb_rule', 'label': 'metadata.system_labels.state' }, { 'resource_type': 'dataflow_job', 'label': 'metadata.system_labels.name' }, { 'resource_type': 'dataflow_job', 'label': 'metadata.system_labels.state' }, { 'resource_type': '*', 'label': 'metadata.system_labels.cloud_account' }, { 'resource_type': 'vpn_gateway', 'label': 'metadata.system_labels.region' }, { 'resource_type': 'gce_instance', 'label': 'metadata.user_labels.name' }, { 'resource_type': 'gce_disk', 'label': 'metadata.user_labels.name' }, { 'resource_type': 'cloud_tasks_queue', 'label': 'metadata.system_labels.name' }, { 'resource_type': 'gae_app', 'label': 'metadata.system_labels.gaeapp' }, { 'resource_type': 'gae_app', 'label': 'metadata.system_labels.gaemodule' }, { 'resource_type': 'gae_app', 'label': 'metadata.system_labels.gaeversion' }, { 'resource_type': 'pubsub_subscription', 'label': 'metadata.system_labels.name', 'pubsub_lowercase': True }, { 'resource_type': 'pubsub_topic', 'label': 'metadata.system_labels.name', 'pubsub_lowercase': True }, ] def filters_match_rule_type(filters, resource_type): """Determine whether the given filters always match the given resource type.""" def has_resource_type(filter_str): return re.search(r'resource.type', filter_str) is not None def is_aws_resource_type(filter_str): return re.search(r'resource.type\s*=\s*(aws_\w+|"aws_\w+")', filter_str) is not None def match_resource_type(resource_type, filter_str): regex = r'resource.type\s*=\s*' + f'({resource_type}|"{resource_type}")' return re.search(regex, filter_str) is not None if resource_type == '*': return all(not is_aws_resource_type(filter_str) for filter_str in filters) return any( match_resource_type(resource_type, filter_str) for filter_str in filters) or all( not has_resource_type(filter_str) for filter_str in filters) def filters_match_rule_label(filters, label): """Determine whether the given filters always match the given label.""" def has_metadata_user_label(label, input_str): regex = f'metadata.(user_labels|userLabels).({label}|"{label}")' regex += '|' + f'resource.metadata.tag.({label}|"{label}")' return re.search(regex, input_str) is not None def has_metadata_system_label(label, input_str): regex = f'metadata.(system_labels|systemLabels).({label}|"{label}")' regex += '|' + f'resource.metadata.({label}|"{label}")' return re.search(regex, input_str) is not None parts = label.split('.') if parts[1] == 'system_labels': return any( has_metadata_system_label(parts[2], input_str) for input_str in filters) else: return any( has_metadata_user_label(parts[2], input_str) for input_str in filters) def rule_to_message(rule, object_type, has_resource_type): """Define the message to print when a rule is triggered.""" resource_type = '' condition = '' if rule['resource_type'] == '*': resource_type = 'non-AWS resource types' condition = f'if this {object_type} is used for non-AWS resources' else: resource_type = rule['resource_type'] condition = f'if this {object_type} is used for {resource_type} resources' if 'pubsub_lowercase' in rule: return ( f'Values for {rule["label"]} for {rule["resource_type"]} will no ' f'longer be automatically converted to lowercase. Please manually ' f'update this {object_type} if it applies to {rule["resource_type"]} ' f'resources with uppercase letters.') action = '' if 'migrate_to' in rule: if has_resource_type: action = f'This reference will be auto-migrated to {rule["migrate_to"]}' else: action = f'Please manually migrate to {rule["migrate_to"]} {condition}' else: if has_resource_type: action = 'Please manually remove the reference' else: action = f'Please manually remove the reference {condition}' return f'{rule["label"]} is deprecated for {resource_type}. {action}.' def apply_rule(rule, obj): """Applies a single rule to an object with filters and group-by fields.""" labels_input = obj['filters'].copy() if 'group_by_fields' in obj and 'pubsub_lowercase' not in rule: labels_input += obj['group_by_fields'] def has_resource_type(filters): return any( re.search(r'resource.type', filter_str) is not None for filter_str in filters) affected = filters_match_rule_type( obj['filters'], rule['resource_type']) and filters_match_rule_label( labels_input, rule['label']) return affected, rule_to_message(rule, obj['object_type'], has_resource_type(obj['filters'])) def apply_rules(obj): """Applies all the rules to an object.""" msgs = [] affected = False for rule in MIGRATION: result, msg = apply_rule(rule, obj) if result: msgs.append(msg) affected |= result return affected, msgs def get_filters(cond): """Get the list of filters from an alert policy condition.""" filters = [] if cond.HasField('condition_threshold'): filters.append(cond.condition_threshold.filter) if cond.condition_threshold.denominator_filter: filters.append(cond.condition_threshold.denominator_filter) if cond.HasField('condition_absent'): filters.append(cond.condition_absent.filter) return filters def get_group_by_fields(cond): """Get the list of group-by fields from an alert policy condition.""" aggs = [] if cond.HasField('condition_threshold'): aggs += cond.condition_threshold.aggregations aggs += cond.condition_threshold.denominator_aggregations if cond.HasField('condition_absent'): aggs += cond.condition_absent.aggregations group_by_fields = [] for agg in aggs: group_by_fields += agg.group_by_fields return group_by_fields def get_alert_policy_conditions_for_project(name): """Get a map from alert policy names to their filters and group-by fields.""" client = monitoring_v3.AlertPolicyServiceClient() conditions = [] for policy in client.list_alert_policies(client.project_path(name)): for condition in policy.conditions: conditions.append({ 'name': policy.name, 'filters': get_filters(condition), 'group_by_fields': get_group_by_fields(condition), 'display_name': f'{condition.display_name}" in alert policy "{policy.display_name}', 'url': f'https://console.cloud.google.com/monitoring/alerting/policies/{policy.name.split("/")[-1]}?project={name}', 'object_type': 'condition', }) return conditions def get_groups_for_project(name): """Get a map from group names to their filters.""" client = monitoring_v3.GroupServiceClient() groups = [] for group in client.list_groups(client.project_path(name)): groups.append({ 'name': group.name, 'filters': [group.filter], 'display_name': group.display_name, 'url': f'https://console.cloud.google.com/monitoring/groups/{group.name.split("/")[-1]}?project={name}', 'object_type': 'group', }) return groups def get_dashboard_widgets(dashboard): """Get the list of widgets in a dashboard.""" if dashboard.HasField('grid_layout'): return dashboard.grid_layout.widgets elif dashboard.HasField('row_layout'): widgets = [] for row in dashboard.row_layout.rows: widgets.extend(row.widgets) return widgets else: # must be column_layout widgets = [] for column in dashboard.column_layout.columns: widgets.extend(column.widgets) return widgets def get_dataset_queries(dataset, display_name, name): """Get the time series queries from a dataset.""" if dataset.time_series_query.HasField('time_series_filter'): return [dataset.time_series_query.time_series_filter] elif dataset.time_series_query.HasField('time_series_filter_ratio'): return [dataset.time_series_query.time_series_filter_ratio.numerator, dataset.time_series_query.time_series_filter_ratio.denominator] else: # must be time_series_query_language print(f'WARNING: Dashboard {display_name} ({name}) uses an MQL query. This ' f'script does not detect labels used in MQL queries.\n') return [] def get_widget_queries(widget, dashboard_display_name, dashboard_name): """Get the list of time series queries in a dashboard widget.""" if widget.HasField('xy_chart'): queries = [] for dataset in widget.xy_chart.data_sets: queries.extend( get_dataset_queries(dataset, dashboard_display_name, dashboard_name)) return queries elif widget.HasField('scorecard'): return [widget.scorecard.time_series_query] else: return [] def get_fields_from_dashboard_query_helper(field, results, dictionary): """Add all instances of the given field to the set of results.""" for k, v in dictionary.items(): if k == field: if isinstance(v, list): results += v else: results.append(v) elif isinstance(v, dict): get_fields_from_dashboard_query_helper(field, results, v) elif isinstance(v, list): for i in v: if isinstance(i, dict): get_fields_from_dashboard_query_helper(field, results, i) def get_fields_from_dashboard_query(field, query): """Get all instances of the given field in the given widget.""" dictionary = MessageToDict(query) results = [] get_fields_from_dashboard_query_helper(field, results, dictionary) return results def widget_name(widget): if widget.title is not None: return widget.title else: return '(unnamed)' def get_dashboard_queries_for_project(name): """Get a map from dashboard names to their filters.""" client = v1.DashboardsServiceClient() queries = [] for dashboard in client.list_dashboards('projects/' + name): for widget in get_dashboard_widgets(dashboard): for query in get_widget_queries(widget, dashboard.display_name, dashboard.name): queries.append({ 'name': dashboard.name, 'filters': get_fields_from_dashboard_query('filter', query), 'group_by_fields': get_fields_from_dashboard_query('groupByFields', query), 'display_name': f'{widget_name(widget)}" in dashboard "{dashboard.display_name}', 'url': f'https://console.cloud.google.com/monitoring/dashboards/custom/{dashboard.name.split("/")[-1]}?project={name}', 'object_type': widget.WhichOneof('content'), }) return queries def report(resource_name, resource_obj, msgs): """Report migration message.""" def has_resource_type(filter_str): return re.search(r'resource.type', filter_str) is not None missing_resource_type = all(not has_resource_type(filter_str) for filter_str in resource_obj['filters']) migration = '\n'.join('\t' + msg for msg in msgs) summary = 'is affected' if missing_resource_type: summary = ('might be affected, depending on the type of resources it is ' 'used for') return (f'{resource_obj["object_type"].capitalize()} ' f'"{resource_obj["display_name"]}" ({resource_name}) {summary}.' f'\nURL: {resource_obj["url"]}\n{migration}\n') def detect(name): """Detect if resources in the given project are potentially affected by the migration.""" items = [ *get_alert_policy_conditions_for_project(name), *get_groups_for_project(name), *get_dashboard_queries_for_project(name) ] any_affected = False for item in items: affected, msgs = apply_rules(item) if affected: print(report(item['name'], item, msgs)) any_affected = any_affected or affected if not any_affected: print('This project does not use any deprecated labels.') if __name__ == '__main__': if len(sys.argv) != 2: print('usage: python3 metadata_label_detection.py [project id or number]') exit(1) detect(sys.argv[1])
Design a Mobile Site
View Site in Mobile | Classic
Share by: