python实现skywalking的trace模块过滤和报警(实例代码)

  skywalking本身的报警功能,用起来视乎不是特别好用,目前想实现对skywalking的trace中的错误接口进行过滤并报警通知管理员和开发。所以自己就用python对skywalking做了二次数据清洗实现。项目方在了自己了github(https://github.com/shygit-dev/skywalking-cli-python)上了,有兴趣的同学可以做二次改造,共同学习。下面简单列出了代码内容:

sw-trace.py

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# Tile:
# Author:shy
import requests
import time
import smtplib
from email.mime.text import MIMEText
import re
def interface_content_filter(trace_id):
    \'\'\'
    对详细日志内容(业务逻辑报错)进行过滤
    :param trace_id:
    :return: 【1|0】
    \'\'\'
    url = \"http://172.16.53.232:50001/query\"
    params = {
        \"trace_id\": trace_id
    }
    detail_trace_id_log = requests.request(method=\"GET\",url=url,params=params)
    detail_trace_id_log = detail_trace_id_log.text
    print(detail_trace_id_log)
    print(type(detail_trace_id_log))
    with open(\"blackname_keyword_list\",\"r\",encoding=\"utf-8\") as f:
        for line in f:
            print(line)
            result = re.search(line.strip(),detail_trace_id_log)
            print(result)
            if result != None:
                print(\"哥们匹配到日志黑名单关键字了:%s\" % line)
                return 0
    print(\"提示:%s不在关键字黑名单中\" % trace_id)
    return 1
def interface_filter(endpointName):
    \"\"\"
    设置接口黑名单
    :param endpointName:
    :return: 【1|0】
    \"\"\"
    endpointName = re.sub(\"\\(|\\)\",\".\",endpointName)
    with open(\"blackname_list\",\"r\",encoding=\"utf-8\") as f:
        bn_list = f.read()
    match_result = re.search(endpointName.strip(),bn_list)
    if match_result == None:
        print(\"提示:接口不存在黑名单中\")
        return 1
    print(\"提示:接口在黑名单中\")
    return 0
def trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr):
    \"\"\"
    skywalking trace功能对错误接口进行过滤,默认最大一次获取2000条数据,每分钟执行一次
    :param start_time:
    :param end_time:
    :return:
    \"\"\"
    url = sw_url
    data = {
          \"query\": \"query queryTraces($condition: TraceQueryCondition) {\\n  data: queryBasicTraces(condition: $condition) {\\n    traces {\\n      key: segmentId\\n      endpointNames\\n      duration\\n      start\\n      isError\\n      traceIds\\n    }\\n    total\\n  }}\",
          \"variables\": {
            \"condition\": {
              \"queryDuration\": {
                \"start\": start_time, #\"2021-12-07 1734\"
                \"end\": end_time,
                \"step\": \"MINUTE\"
              },
              \"traceState\": \"ERROR\",
              \"paging\": {
                \"pageNum\": 1,
                \"pageSize\": per_page_size,
                \"needTotal\": \"true\"
              },
              \"queryOrder\": \"BY_START_TIME\"
              # \"traceId\": \"b669d0069be84fce82261901de412e7c.430.16388637511348105\"
            }
          }
        }
    result = requests.request(method=\"post\",url=url,json=data)
    i = 0
    # print(result.content)
    # print(time.strftime(\"%Y-%m-%d %H:%M:%S\",time.localtime(float(\"%s.%s\" % (trace[\"start\"][0:10],trace[\"start\"][10:])))))
    with open(\"mail.html\",\"w\",encoding=\"utf-8\") as f:
        f.write(\'<head><meta charset=\"UTF-8\"><title>Title</title><style>.t {border-right: 2px solid black;border-bottom: 2px solid black;}.t th,td {border-top: 2px solid black;border-left: 2px solid black;font-size: 10px;}</style></head><body><div style=\"color:red;font-size=15px;\">最近15分钟统计:</div><table class=\"t\" border=\"0\" cellspacing=\"0\" cellpadding=\"10px\"><thead><tr style=\"<th style=\"width: 100px;\">时间</th><th>持续时长</th><th>接口名称</th><th>追踪ID</th></tr></thead><tbody>\')
    for trace in result.json()[\"data\"][\"data\"][\"traces\"]:
        # print(trace[\"endpointNames\"])
        print(\"时间:%s\\n\" % time.strftime(\"%Y-%m-%d %H:%M:%S\",time.localtime(float(\"%s.%s\" % (trace[\"start\"][0:10],trace[\"start\"][10:])))),
              \"持续时长:%s\\n\" % trace[\"duration\"],
              \"接口名称:%s\\n\" % trace[\"endpointNames\"][0],
              \"跟踪ID:%s\" % trace[\"traceIds\"][0])
        # print(time.localtime(1638869640.194))
        i+=1
        print(i)
        s_time = time.strftime(\"%Y-%m-%d %H:%M:%S\",time.localtime(float(\"%s.%s\" % (trace[\"start\"][0:10],trace[\"start\"][10:]))))
        dur_time =  trace[\"duration\"]
        endpointName = trace[\"endpointNames\"][0]
        trace_id = trace[\"traceIds\"][0]
        # 调用接口黑名单过滤功能
        result = interface_filter(endpointName)
        if result == 0:
            print(\"哥们进入黑名单了!\",endpointName)
            continue
        # 调用关键字黑名单过滤功能
        keyword_result = interface_content_filter(trace_id)
        if keyword_result == 0:
            print(\"哥们进入关键字黑名单了!\", trace_id)
            continue
        with open(\"mail.html\",\"a\",encoding=\"utf-8\") as f:
            f.write(\'<tr><td>%s</td><td>%s</td><td>%s</td><td><a href=\"http://%s/query?trace_id=%s\" rel=\"external nofollow\" >%s</a></td></tr>\' %(s_time,dur_time,endpointName,trace_detail_addr,trace_id,trace_id))
    with open(\"mail.html\",\"a\",encoding=\"utf-8\") as f:
        f.write(\'</tbody></table></body>\')
def send_mail(receiver):
    \"\"\"
    发送报错接口邮件
    :return:
    \"\"\"
    server = \"mail.test.com\"
    sender = \"sa@test.com\"
    sender_pwd = \"1qaz@WSX\"
    send_addr = \"sa@test.com\"
    receiver = receiver
    with open(\"mail.html\",\"r\",encoding=\"utf-8\") as f:
        content = f.read()
    if re.search(\"<td>\",content) == None:
        print(\"无报错接口!\",content)
        return 0
    print(\"邮件前\",content)
    msg_mail = MIMEText(content,\"html\",\"utf-8\")
    msg_mail[\"Subject\"] = \"Skywalking报错接口统计\"
    msg_mail[\"From\"] = sender
    msg_mail[\"To\"] = receiver
    server_obj = smtplib.SMTP_SSL(server)
    server_obj.connect(server,465)
    server_obj.login(sender,sender_pwd)
    server_obj.sendmail(send_addr,receiver,msg_mail.as_string())
if __name__ == \"__main__\":
    # 设定查询时间间隔,默认900s(15min)
    end_time = time.time()
    start_time = end_time - 900
    start_time=time.strftime(\"%Y-%m-%d %H%M\",time.localtime(start_time))
    end_time = time.strftime(\"%Y-%m-%d %H%M\", time.localtime(end_time))
    print(start_time)
    print(end_time)
    sw_url = \"http://172.16.53.232:9412/graphql\" # skywalking的前端服务的地址和端口
    per_page_size = 5000  #指定一次获取endpoint接口的数目
    trace_detail_addr = \"127.0.0.1:5000\" #指定查询指定trace_id详细日志
    receiver = \"shy@test.com\"  #报警邮件接收人地址
    trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr)
    send_mail(receiver)
    # interface_filter()
    # interface_content_filter(\"3c4212dd2dd548d394ba312c4619405d.104.16390380592724487\")

sw-trace-id.py

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# Tile:
# Author:shy
import requests
import time
from flask import Flask,request
app = Flask(__name__)
@app.route(\"/query\",methods=[\"get\"])
def trace_id_query():
    \"\"\"
    查询指定trace_id详细日志信息
    :return: f.read()
    \"\"\"
    trace_id = request.args.get(\"trace_id\")
    url=\"http://172.16.53.232:9412/graphql\"
    # url=\"http://skywalking.roulw.com/graphql\"
    data = {
      \"query\": \"query queryTrace($traceId: ID!) {\\n  trace: queryTrace(traceId: $traceId) {\\n    spans {\\n      traceId\\n      segmentId\\n      spanId\\n      parentSpanId\\n      refs {\\n        traceId\\n        parentSegmentId\\n        parentSpanId\\n        type\\n      }\\n      serviceCode\\n      serviceInstanceName\\n      startTime\\n      endTime\\n      endpointName\\n      type\\n      peer\\n      component\\n      isError\\n      layer\\n      tags {\\n        key\\n        value\\n      }\\n      logs {\\n        time\\n        data {\\n          key\\n          value\\n        }\\n      }\\n    }\\n  }\\n  }\",
      \"variables\": {
        \"traceId\": trace_id
      }
    }
    result = requests.request(method=\"post\",url=url,json=data)
    with open(\"detail_log\", \"w\", encoding=\"utf-8\") as f:
        f.write(\"<div style=\'color: red;font-size: 30px;\'>生产Skywalking报错接口跟踪日志日志:<br /></div>\")
    for trace_id in result.json()[\"data\"][\"trace\"][\"spans\"]:
        if trace_id[\"isError\"]:
            # print(trace_id)
            print(\"服务名称:%s\\n\" % trace_id[\"serviceCode\"],
                  \"开始时间:%s\\n\" % trace_id[\"startTime\"],
                  \"接口名称:%s\\n\" % trace_id[\"endpointName\"],
                  \"peer名称:%s\\n\" % trace_id[\"peer\"],
                  \"tags名称:%s\\n\" % trace_id[\"tags\"],
                  \"详细日志:%s\" % trace_id[\"logs\"])
            content = \"服务名称:%s<br />开始时间:%s<br />接口名称:%s<br />peer名称:%s<br />tags名称:%s\" % (trace_id[\"serviceCode\"],trace_id[\"startTime\"],trace_id[\"endpointName\"],trace_id[\"peer\"],trace_id[\"tags\"])
            with open(\"detail_log\",\"a\",encoding=\"utf-8\") as f:
                f.write(content)
                f.write(\"<br />********详细日志**********<br />\")
            for logs in trace_id[\"logs\"]:
                for log in logs[\"data\"]:
                    if log[\"key\"] == \"message\":
                        print(log[\"value\"])
                        with open(\"detail_log\",\"a\",encoding=\"utf-8\") as f:
                            f.write(log[\"value\"])
                        # return log[\"value\"]
                    elif log[\"key\"] == \"stack\":
                        print(log[\"value\"])
                        with open(\"detail_log\",\"a\",encoding=\"utf-8\") as f:
                            f.write(log[\"value\"])
            with open(\"detail_log\", \"a\", encoding=\"utf-8\") as f:
                f.write(\"<div style=\'color: red;font-size: 20px;\'><br />========下一个接口信息=========<br /></div>\")
    with open(\"detail_log\",\"r\",encoding=\"utf-8\") as f:
        return f.read()
if __name__ == \"__main__\":
    # trace_id = \"14447ae7199c40a2b9862411daba180b.2142.16388920322367785\"
    # trace_id_query(trace_id)
    app.run()
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容