Advertisement
Guest User

LOG_GRABBER

a guest
Jul 24th, 2024
117
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.01 KB | None | 0 0
  import os
  import re
  import subprocess
  import time

  import boto3
  import requests
  from kubernetes import client, config, watch
  7.  
  # Load the in-cluster Kubernetes configuration, then build the core API
  # client (the client must be created after the configuration is loaded).
  config.load_incluster_config()
  v1 = client.CoreV1Api()

  # S3 client; the credentials are read from the process environment.
  _aws_key_id = os.getenv('AWS_ACCESS_KEY_ID')
  _aws_secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')
  s3 = boto3.client(
      's3',
      aws_access_key_id=_aws_key_id,
      aws_secret_access_key=_aws_secret_key,
  )

  # Bucket that receives the crash logs.
  bucket_name = 'k8-crash-logs'

  # RingCentral incoming-webhook endpoint used for crash notifications.
  ringcentral_webhook_url = 'https://hooks.ringcentral.com/'
  22.  
  # Function to get pod logs
  def get_pod_logs(pod_name, namespace='default', previous=False, container=None):
      """Return the log text of a pod as reported by the Kubernetes API.

      Args:
          pod_name: Name of the pod to read logs from.
          namespace: Namespace the pod lives in.
          previous: When True, request the logs of the previous (terminated)
              container instance instead of the running one.
          container: Optional container name. The API server needs it for
              pods that run more than one container; when omitted the
              request is sent without a container, as before.

      Returns:
          Whatever ``v1.read_namespaced_pod_log`` returns (the log text).

      Raises:
          Any exception raised by the Kubernetes client is propagated.
      """
      request_kwargs = {
          'name': pod_name,
          'namespace': namespace,
          'previous': previous,
      }
      # Only send the container parameter when the caller asked for one, so
      # existing callers produce exactly the same request as before.
      if container is not None:
          request_kwargs['container'] = container
      return v1.read_namespaced_pod_log(**request_kwargs)
  27.  
  # Function to upload logs to S3 and generate a presigned URL valid for 5 days
  def upload_to_s3_and_get_url(log_data, bucket_name, namespace, pod_name, container_name):
      """Store ``log_data`` in S3 and return a presigned download URL.

      The object key is
      ``<namespace>/<pod_name>-<container_name>-restart-log-<unix time>.txt``.

      Returns:
          The presigned ``get_object`` URL (valid for 5 days), or ``None``
          when the bucket does not exist. Other S3 errors propagate.
      """
      # URL valid for 5 days (in seconds)
      url_lifetime_seconds = 5 * 24 * 60 * 60
      uploaded_at = int(time.time())
      object_key = f"{namespace}/{pod_name}-{container_name}-restart-log-{uploaded_at}.txt"

      try:
          s3.put_object(Body=log_data, Bucket=bucket_name, Key=object_key)
          return s3.generate_presigned_url(
              'get_object',
              Params={'Bucket': bucket_name, 'Key': object_key},
              ExpiresIn=url_lifetime_seconds,
          )
      except s3.exceptions.NoSuchBucket:
          print(f"Bucket {bucket_name} does not exist. Please create it or check the bucket name.")

      return None
  43.  
  # ANSI color escape sequences (ESC [ ... m) removed from the excerpt.
  _ANSI_COLOR_RE = re.compile(r'\x1b\[[0-9;]*m')

  # Filters tried in order until one produces output. Each entry is
  # (pattern, trailing context lines, number of output lines kept) and mirrors
  # one of the former `grep -P [-A n] ... | tail -n k` shell pipelines.
  _LOG_FILTERS = (
      (re.compile(r'stack:'), 2, 3),
      (re.compile(r'(WARN|ERROR|^\tat |Exception|^Caused by: |\t... \d+ more)'), 0, 1),
      (re.compile(r'rror:'), 2, 3),
      (re.compile(r'rror'), 2, 3),
  )

  # Excerpts of this many characters or more are not returned.
  _MAX_EXCERPT_CHARS = 400


  def _grep_tail(lines, pattern, after, keep):
      """Emulate ``grep -A <after> <pattern> | tail -n <keep>`` on ``lines``.

      Matching lines and up to ``after`` lines following each match are
      selected. Like GNU grep, a ``--`` line separates non-adjacent groups
      when context is requested. Only the last ``keep`` output lines are
      returned.
      """
      selected = []
      last_emitted = None  # index of the most recently selected input line
      context_left = 0     # trailing context lines still owed to the last match
      for index, line in enumerate(lines):
          if pattern.search(line):
              context_left = after
          elif context_left > 0:
              context_left -= 1
          else:
              continue
          if after and last_emitted is not None and index != last_emitted + 1:
              selected.append('--')
          selected.append(line)
          last_emitted = index
      return selected[-keep:]


  # Function to analyze log file and remove color encoding
  def analyze_log(log_data):
      """Extract a short, color-free error excerpt from a container log.

      The filters in ``_LOG_FILTERS`` are tried in order (``stack:`` with
      context, then common warning/exception markers, then ``rror:``, then
      ``rror``); the first one that yields non-empty text wins. Filtering is
      done in-process, so the result does not depend on a shell or on a grep
      build that supports ``-P``.

      Args:
          log_data: Log text as a ``str``.

      Returns:
          The stripped excerpt (possibly the empty string when nothing
          matched), or ``None`` when the excerpt is 400 characters or longer
          or ``log_data`` is not a string.
      """
      if not isinstance(log_data, str):
          print(f"Error analyzing log: expected str, got {type(log_data).__name__}")
          return None

      # Split on newlines the way line-oriented tools do; a trailing newline
      # does not create an extra empty line.
      lines = log_data.replace('\r\n', '\n').split('\n')
      if lines and lines[-1] == '':
          lines.pop()

      filtered_logs = ""
      for pattern, after, keep in _LOG_FILTERS:
          excerpt = '\n'.join(_grep_tail(lines, pattern, after, keep))
          # Colors are removed after matching, as in the original pipelines.
          filtered_logs = _ANSI_COLOR_RE.sub('', excerpt).strip()
          if filtered_logs:
              break

      if len(filtered_logs) < _MAX_EXCERPT_CHARS:
          return filtered_logs
      return None
  78.  
  # Function to send a message to RingCentral webhook with log analysis result
  def notify_ringcentral(pod_name, obj_url, log_analysis_result, timeout=10):
      """Post an Adaptive Card about a crashed pod to the RingCentral webhook.

      The card always contains a bold ``[<pod_name> crashed](<obj_url>)`` link.
      When ``log_analysis_result`` is truthy it is added as the title of a
      single fact with an empty value.

      Args:
          pod_name: Name of the pod shown in the card.
          obj_url: URL the card links to (the uploaded log).
          log_analysis_result: Optional log excerpt; falsy values are omitted.
          timeout: Seconds to wait for the webhook before giving up, so a
              stalled endpoint cannot block the caller indefinitely.

      Returns:
          None. Success or failure is reported on stdout.
      """
      card_body = [
          {
              "type": "TextBlock",
              "text": f"[{pod_name} crashed]({obj_url})",
              "weight": "bolder",
              "size": "medium",
              "wrap": True,
          }
      ]
      if log_analysis_result:
          card_body.append(
              {
                  "type": "FactSet",
                  "facts": [
                      {
                          "title": f"{log_analysis_result}",
                          "value": "",
                      }
                  ],
              }
          )

      adaptive_card_json = {
          "attachments": [
              {
                  "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                  "type": "AdaptiveCard",
                  "version": "1.0",
                  "body": card_body,
              }
          ]
      }

      headers = {
          'Content-Type': 'application/json'
      }

      try:
          response = requests.post(
              ringcentral_webhook_url,
              json=adaptive_card_json,
              headers=headers,
              timeout=timeout,
          )
      except requests.RequestException as e:
          # Connection errors and timeouts are notification failures too.
          print(f"Failed to send notification to RingCentral: {e}")
          return

      if response.status_code == 200:
          print(f"Notification sent to RingCentral: [{pod_name} crashed]({obj_url})")
      else:
          print(f"Failed to send notification to RingCentral: {response.text}")
  139.  
  # Store previous restart counts to detect restarts.
  # Keys are (namespace, pod_name, container_name) tuples.
  pod_restart_counts = {}

  # Resource version of the most recent event; passed to the next watch call.
  last_seen_version = ""

  # Watch for pod events in all namespaces
  w = watch.Watch()


  def _forget_pod(namespace, pod_name):
      """Drop the restart counters of every container of a deleted pod."""
      stale_keys = [key for key in pod_restart_counts if key[:2] == (namespace, pod_name)]
      for key in stale_keys:
          del pod_restart_counts[key]


  def _report_restart(pod_name, namespace, container_name):
      """Upload the previous logs of a restarted container and send a notification.

      Failures are printed and swallowed so one bad pod cannot stop the watch.
      """
      try:
          # NOTE(review): no container name is passed here, so the API server
          # is expected to reject this request for multi-container pods.
          logs = get_pod_logs(pod_name, namespace, previous=True)
          obj_url = upload_to_s3_and_get_url(logs, bucket_name, namespace, pod_name, container_name)

          if obj_url:
              log_analysis_result = analyze_log(logs)
              notify_ringcentral(pod_name, obj_url, log_analysis_result)
          else:
              print(
                  f"Failed to generate presigned URL for container {container_name} "
                  f"in pod {pod_name} in namespace {namespace}"
              )
      except Exception as e:
          print(f"Failed to get or upload logs for pod {pod_name} in namespace {namespace}: {e}")


  while True:
      # The try block sits inside the loop so that an error is followed by a
      # delay and a new watch, rather than by the script falling off the end.
      try:
          # Use the last_seen_version in the stream call
          for event in w.stream(v1.list_pod_for_all_namespaces, resource_version=last_seen_version):
              pod = event['object']
              pod_name = pod.metadata.name
              namespace = pod.metadata.namespace

              # Record the version for every event, including skipped ones, so
              # a reconnect resumes from the newest event seen.
              last_seen_version = pod.metadata.resource_version

              # Skip processing if the namespace is "log-grabber" or "datastore-worker"
              if namespace in ["log-grabber", "datastore-worker"]:
                  continue

              if event['type'] == 'DELETED':
                  # Deleted pods will not restart; free their counters.
                  _forget_pod(namespace, pod_name)
                  continue

              for container_status in pod.status.container_statuses or []:
                  container_name = container_status.name
                  restart_count = container_status.restart_count
                  counter_key = (namespace, pod_name, container_name)
                  previous_restart_count = pod_restart_counts.get(counter_key, 0)

                  # Update the pod restart counts (for every event type)
                  pod_restart_counts[counter_key] = restart_count

                  if event['type'] == 'MODIFIED' and restart_count > previous_restart_count:
                      print(f"Container {container_name} in pod {pod_name} in namespace {namespace} has restarted.")
                      _report_restart(pod_name, namespace, container_name)

      except Exception as e:
          print(f"An error occurred: {e}")
          # HTTP 410 (Gone) presumably means the stored resource version is
          # too old to resume from; start the next watch from scratch.
          if getattr(e, 'status', None) == 410:
              last_seen_version = ""
          time.sleep(10)  # Add a delay before retrying
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement