Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import os
import re
import subprocess
import time

import boto3
import requests
from kubernetes import client, config, watch
# Load Kubernetes configuration from the pod's service account, so this script
# is expected to run inside the cluster it watches.
config.load_incluster_config()
v1 = client.CoreV1Api()

# S3 client. Credentials are taken from AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY;
# when both are unset, None is passed and boto3 uses its default credential chain.
s3 = boto3.client(
    's3',
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
)

# Bucket that receives crash logs; override with the S3_BUCKET_NAME env var.
bucket_name = os.getenv('S3_BUCKET_NAME', 'k8-crash-logs')

# RingCentral incoming-webhook URL. The built-in default has no webhook path and
# looks like a placeholder -- set RINGCENTRAL_WEBHOOK_URL to the real webhook.
ringcentral_webhook_url = os.getenv('RINGCENTRAL_WEBHOOK_URL', 'https://hooks.ringcentral.com/')
def get_pod_logs(pod_name, namespace='default', previous=False, container=None):
    """Return the log text of a pod (optionally of one specific container).

    Args:
        pod_name: name of the pod.
        namespace: namespace the pod lives in.
        previous: if True, read the logs of the previous (terminated) instance
            of the container, i.e. the run before the most recent restart.
        container: name of the container whose logs to read. The API server
            rejects requests without it when the pod has more than one
            container; None omits it, which only works for single-container pods.

    Returns:
        The log text as returned by ``CoreV1Api.read_namespaced_pod_log``.
    """
    # The client drops parameters whose value is None, so container=None keeps
    # the original single-container behavior.
    return v1.read_namespaced_pod_log(
        name=pod_name,
        namespace=namespace,
        container=container,
        previous=previous,
    )
def upload_to_s3_and_get_url(log_data, bucket_name, namespace, pod_name, container_name,
                             expires_in=5 * 24 * 60 * 60):
    """Upload a container's log text to S3 and return a presigned download URL.

    The object key is
    ``"<namespace>/<pod_name>-<container_name>-restart-log-<unix seconds>.txt"``.

    Args:
        log_data: log text to store.
        bucket_name: destination S3 bucket.
        namespace: pod namespace; used as the key prefix.
        pod_name: pod name; part of the key.
        container_name: container name; part of the key.
        expires_in: lifetime of the presigned URL in seconds (default 5 days).
            SigV4 presigned URLs cannot be valid for more than 7 days.

    Returns:
        The presigned GET URL, or None when the bucket does not exist.
        Any other S3 error propagates to the caller.
    """
    timestamp = int(time.time())
    file_name = f"{namespace}/{pod_name}-{container_name}-restart-log-{timestamp}.txt"
    try:
        # Without an explicit content type S3 serves the object as
        # binary/octet-stream, so browsers download it instead of showing the log.
        s3.put_object(
            Body=log_data,
            Bucket=bucket_name,
            Key=file_name,
            ContentType='text/plain; charset=utf-8',
        )
        return s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_name, 'Key': file_name},
            ExpiresIn=expires_in,
        )
    except s3.exceptions.NoSuchBucket:
        print(f"Bucket {bucket_name} does not exist. Please create it or check the bucket name.")
        return None
# ANSI SGR ("color") escape sequences such as "\x1b[31m" and "\x1b[0m".
_ANSI_COLOR_RE = re.compile(r"\x1b\[[0-9;]*m")

# Search stages tried in order by analyze_log; the first non-empty result wins.
# Each entry is (pattern, trailing context lines, lines kept from the end) and
# behaves like:  grep -P [-A context] PATTERN | sed 's/<color codes>//g' | tail -n keep
# (context == 0 stands for "no -A option").
_LOG_ANALYSIS_STAGES = (
    (re.compile(r"stack:"), 2, 3),
    (re.compile(r"(WARN|ERROR|^\tat |Exception|^Caused by: |\t... \d+ more)", re.ASCII), 0, 1),
    (re.compile(r"rror:"), 2, 3),
    (re.compile(r"rror"), 2, 3),
)


def _grep_tail(lines, pattern, after, keep):
    """Return the last `keep` lines grep would print for `pattern`, colors removed.

    Each matching line is emitted with up to `after` following lines of context.
    When after > 0, a "--" separator is inserted between non-adjacent groups,
    as `grep -A` does. Matching is done on the raw line; color codes are
    stripped only from the emitted text. The joined result is whitespace-stripped.
    """
    out = []
    last_emitted = -2   # index of the most recently emitted line
    emit_until = -1     # emit every line up to this index (match + context)
    for i, line in enumerate(lines):
        if pattern.search(line):
            emit_until = i + after
        if i <= emit_until:
            if after and out and i != last_emitted + 1:
                out.append("--")
            out.append(_ANSI_COLOR_RE.sub("", line))
            last_emitted = i
    return "\n".join(out[-keep:]).strip()


def analyze_log(log_data, max_length=400):
    """Extract a short, color-free excerpt from a crashed container's log.

    The stages in _LOG_ANALYSIS_STAGES are tried in order:
      1. the last 3 lines of "stack:" matches plus 2 lines of context;
      2. the last line matching WARN/ERROR/Exception or a Java stack-trace line;
      3. the last 3 lines of "rror:" matches plus 2 lines of context;
      4. the same for "rror".

    Args:
        log_data: the log text; None or "" yields "".
        max_length: excerpts of this many characters or more are discarded.

    Returns:
        The excerpt ("" when no stage matched) if it is shorter than
        max_length; None when it is too long or analysis fails.
    """
    try:
        if not log_data:
            return ""
        lines = log_data.split("\n")
        if lines[-1] == "":
            lines.pop()  # a trailing newline does not start another line
        excerpt = ""
        for pattern, after, keep in _LOG_ANALYSIS_STAGES:
            excerpt = _grep_tail(lines, pattern, after, keep)
            if excerpt:
                break
        return excerpt if len(excerpt) < max_length else None
    except Exception as e:
        print(f"Error analyzing log: {e}")
        return None
def build_ringcentral_card(pod_name, obj_url, log_analysis_result=None):
    """Build the RingCentral adaptive-card payload announcing a crashed pod.

    The card always contains a bold "<pod_name> crashed" title linked to
    obj_url. When log_analysis_result is truthy, it is shown as the title of a
    single-entry FactSet below the heading.
    """
    body = [
        {
            "type": "TextBlock",
            "text": f"[{pod_name} crashed]({obj_url})",
            "weight": "bolder",
            "size": "medium",
            "wrap": True,
        }
    ]
    if log_analysis_result:
        body.append(
            {
                "type": "FactSet",
                "facts": [{"title": f"{log_analysis_result}", "value": ""}],
            }
        )
    return {
        "attachments": [
            {
                "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                "type": "AdaptiveCard",
                "version": "1.0",
                "body": body,
            }
        ]
    }


def notify_ringcentral(pod_name, obj_url, log_analysis_result, timeout=10):
    """Post a "<pod> crashed" adaptive card to the RingCentral webhook.

    Args:
        pod_name: name of the crashed pod, shown in the card title.
        obj_url: link attached to the title (the watcher passes the presigned
            S3 URL of the uploaded log).
        log_analysis_result: optional log excerpt, added to the card when truthy.
        timeout: seconds to wait for the webhook before giving up, so a hung
            endpoint cannot block the caller indefinitely.

    Delivery failures (network errors or a non-200 reply) are printed, not raised.
    """
    card = build_ringcentral_card(pod_name, obj_url, log_analysis_result)
    headers = {
        'Content-Type': 'application/json'
    }
    try:
        response = requests.post(ringcentral_webhook_url, json=card, headers=headers, timeout=timeout)
    except requests.RequestException as e:
        print(f"Failed to send notification to RingCentral: {e}")
        return
    if response.status_code == 200:
        print(f"Notification sent to RingCentral: [{pod_name} crashed]({obj_url})")
    else:
        print(f"Failed to send notification to RingCentral: {response.text}")
# Namespaces whose pods are ignored entirely.
IGNORED_NAMESPACES = frozenset({"log-grabber", "datastore-worker"})

# Last observed restart count per (namespace, pod, container). A higher count
# on a MODIFIED event means the container restarted since the previous event.
pod_restart_counts = {}
# resourceVersion of the last event seen, used to resume the watch. "" asks the
# API server for a fresh listing (ADDED events for every existing pod).
last_seen_version = ""


def _report_container_restart(namespace, pod_name, container_name):
    """Upload the previous instance's logs of a restarted container and notify RingCentral.

    Errors are printed, not raised, so one failing pod does not stop the watch.
    """
    try:
        # The pod-log API requires a container name for multi-container pods;
        # previous=True reads the instance that just terminated.
        logs = v1.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            container=container_name,
            previous=True,
        )
        obj_url = upload_to_s3_and_get_url(logs, bucket_name, namespace, pod_name, container_name)
        if not obj_url:
            print(f"Failed to generate presigned URL for {namespace}/{pod_name}/{container_name}")
            return
        notify_ringcentral(pod_name, obj_url, analyze_log(logs))
    except Exception as e:
        print(f"Failed to get or upload logs for pod {pod_name} in namespace {namespace}: {e}")


# Watch pod events in all namespaces forever, reconnecting after errors and
# after the server closes the stream.
w = watch.Watch()
while True:
    try:
        for event in w.stream(v1.list_pod_for_all_namespaces, resource_version=last_seen_version):
            pod = event['object']
            pod_name = pod.metadata.name
            namespace = pod.metadata.namespace
            # Advance the resume point even for ignored pods so a reconnect does
            # not replay their events.
            last_seen_version = pod.metadata.resource_version
            if namespace in IGNORED_NAMESPACES:
                continue

            if event['type'] == 'DELETED':
                # Forget deleted pods so the counts dict does not grow without bound.
                for key in [k for k in pod_restart_counts if k[:2] == (namespace, pod_name)]:
                    del pod_restart_counts[key]
                continue

            for container_status in pod.status.container_statuses or []:
                container_name = container_status.name
                restart_count = container_status.restart_count
                key = (namespace, pod_name, container_name)
                # Only MODIFIED events report restarts; ADDED events (including the
                # initial listing) just record the current count as the baseline.
                if event['type'] == 'MODIFIED' and restart_count > pod_restart_counts.get(key, 0):
                    print(f"Container {container_name} in pod {pod_name} in namespace {namespace} has restarted.")
                    _report_container_restart(namespace, pod_name, container_name)
                pod_restart_counts[key] = restart_count
    except Exception as e:
        print(f"An error occurred: {e}")
        if getattr(e, 'status', None) == 410:
            # 410 Gone: the saved resourceVersion has expired; re-list from scratch.
            last_seen_version = ""
        time.sleep(10)  # Add a delay before retrying
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement