In a Databricks operations script, you need to process exported log lines that may be either JSON or CSV, keep only valid records for a target service, and compute summary metrics.
Write a function that reads a list of log lines and returns aggregated metrics for records that match the requested service and have status >= min_status.
Implement aggregate_log_metrics(lines, service, min_status).
lines: list of strings, where each string is either:
- a JSON object with the keys timestamp, service, status, latency_ms, or
- a CSV record in the form timestamp,service,status,latency_ms
service: string target service name
min_status: integer threshold
Return a dictionary with:
- count: number of matching valid records
- total_latency: sum of latency_ms over matching records
- avg_latency: floor-division average latency (total_latency // count), or 0 if no records match
- status_counts: dictionary mapping each matching status code (as a string, per the examples) to its frequency
- latest_timestamp: lexicographically largest timestamp among matching records, or "" if none
Ignore malformed lines, records with missing fields, non-integer status or latency_ms values, and records with negative latency.
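The rules above fit in a single pass over lines. A minimal sketch, assuming JSON parsing is attempted first and a comma-split fallback handles CSV; the _to_int helper is illustrative, not part of the required interface:

```python
import json


def _to_int(value):
    # Illustrative helper: accept ints and integer-valued strings,
    # reject bools, floats, None, and anything unparsable.
    if isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, str):
        try:
            return int(value)
        except ValueError:
            return None
    return None


def aggregate_log_metrics(lines, service, min_status):
    count = 0
    total_latency = 0
    status_counts = {}
    latest_timestamp = ""

    for line in lines:
        record = None
        # Try JSON first; anything that does not parse to an object
        # falls through to the CSV path.
        try:
            parsed = json.loads(line)
            if isinstance(parsed, dict):
                record = parsed
        except ValueError:
            pass
        if record is None:
            parts = line.split(",")
            if len(parts) != 4:
                continue  # malformed line: wrong field count
            record = dict(zip(("timestamp", "service", "status", "latency_ms"),
                              (p.strip() for p in parts)))

        # Skip records with missing fields.
        if any(k not in record for k in ("timestamp", "service", "status", "latency_ms")):
            continue

        status = _to_int(record["status"])
        latency = _to_int(record["latency_ms"])
        if status is None or latency is None or latency < 0:
            continue  # non-integer status/latency or negative latency

        ts = record["timestamp"]
        if not isinstance(ts, str):
            continue  # timestamps compare lexicographically, so require str

        if record["service"] != service or status < min_status:
            continue

        count += 1
        total_latency += latency
        key = str(status)  # the examples key status_counts by string
        status_counts[key] = status_counts.get(key, 0) + 1
        latest_timestamp = max(latest_timestamp, ts)

    return {
        "count": count,
        "total_latency": total_latency,
        "avg_latency": total_latency // count if count else 0,
        "status_counts": status_counts,
        "latest_timestamp": latest_timestamp,
    }
```

This runs in O(total input size) and constant extra state beyond status_counts, which satisfies the constraints below.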
Example 1
Input: lines = ['{"timestamp":"2024-01-01T00:00:00Z","service":"jobs","status":200,"latency_ms":120}', '2024-01-01T00:01:00Z,jobs,500,300', '2024-01-01T00:02:00Z,sql,200,50'], service = 'jobs', min_status = 400
Output: {'count': 1, 'total_latency': 300, 'avg_latency': 300, 'status_counts': {'500': 1}, 'latest_timestamp': '2024-01-01T00:01:00Z'}
Explanation: Only the CSV jobs record with status 500 matches the filter.
Example 2
Input: lines = ['bad line', '2024-01-01T00:00:00Z,jobs,200,-5', '{"timestamp":"2024-01-01T00:03:00Z","service":"jobs","status":404,"latency_ms":80}'], service = 'jobs', min_status = 200
Output: {'count': 1, 'total_latency': 80, 'avg_latency': 80, 'status_counts': {'404': 1}, 'latest_timestamp': '2024-01-01T00:03:00Z'}
Explanation: Malformed input and negative latency are ignored.
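As a quick sanity check, the sketch above reproduces both expected outputs (hypothetical test snippet):

```python
lines1 = ['{"timestamp":"2024-01-01T00:00:00Z","service":"jobs","status":200,"latency_ms":120}',
          '2024-01-01T00:01:00Z,jobs,500,300',
          '2024-01-01T00:02:00Z,sql,200,50']
assert aggregate_log_metrics(lines1, 'jobs', 400) == {
    'count': 1, 'total_latency': 300, 'avg_latency': 300,
    'status_counts': {'500': 1}, 'latest_timestamp': '2024-01-01T00:01:00Z'}

lines2 = ['bad line',
          '2024-01-01T00:00:00Z,jobs,200,-5',
          '{"timestamp":"2024-01-01T00:03:00Z","service":"jobs","status":404,"latency_ms":80}']
assert aggregate_log_metrics(lines2, 'jobs', 200) == {
    'count': 1, 'total_latency': 80, 'avg_latency': 80,
    'status_counts': {'404': 1}, 'latest_timestamp': '2024-01-01T00:03:00Z'}
```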
Constraints
1 <= len(lines) <= 10^5
0 <= len(lines[i]) <= 10^4
100 <= min_status <= 599