Filebeat 7 + Kafka: Log Collection for a Gunicorn/Flask Web Application
What this article covers
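- How to build an easy-to-use, easy-to-manage log collection system with Filebeat, Kafka and Elasticsearch
- Dropping Logstash in favour of Elasticsearch ingest pipelines
- The Gunicorn log format and its Filebeat/Elasticsearch configuration
- The Flask log format, exception-log collection, and their Filebeat/Elasticsearch configuration
- An overview of the configuration above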
An HTTP request in my setup travels along this path:
Gateway(kong)-->WebContainer(gunicorn)-->WebApp(flask)
My logs flow through the following pipeline:
file --> filebeat --> kafka topic --> filebeat --> elastic pipeline --> elasticsearch
                          |
                          | ----------> HBase
Why do it this way?
Where did Logstash go?
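- Logstash is too heavy. That in itself is not a real problem: it only means a few more machines and a bit more money, and what matters is that it gets the job done.
- Logstash is not elegant. Its configuration is managed centrally, yet a single Logstash instance never seems to be enough. You can split the configuration up, but you never know which settings belong in one file and which should be kept apart. Delete a config? Impossible: how would I know what is safe to delete?
- Use Logstash and you become the "poor Ops guy having to understand and keep up with all the crazy input possibilities". ^_^
Filebeat's pain points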
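- See this issue: a huge number of users have begged for grok support in Filebeat, and it is still not supported. Instead, we were given two alternatives: write the logs as JSON, or parse them with an ingest pipeline.
- Filebeat used to have no good Kafka input, so you had to write your own Kafka-to-Elasticsearch forwarder.
Keep it simple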
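What I want from log collection is simplicity or, to borrow the microservice idea, cohesion: one log collection line should not be mixed up with anything else. The ideal state looks like this: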
one file -> filebeat_config -> kafka_topic -> filebeat_config -> elastic pipeline -> es index
Gunicorn logs
The Gunicorn access log captures the following information:
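- time
- client_ip
- HTTP method
- HTTP scheme (protocol version, e.g. HTTP/1.1)
- URL path
- URL query string
- response status code
- client name (user agent)
- rt: request time (Gunicorn's %(D)s, which is in microseconds)
- trace id (Kong's request ID)
- remote ips (X-Forwarded-For)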
Log format
%(t)s [%(h)s] [%(m)s] [%(H)s] [%(U)s] [%(q)s] [%(s)s] [%(a)s] [%(D)s] [%({Kong-Request-ID}i)s] [%({X-Forwarded-For}i)s]
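You set this format through Gunicorn's `access_log_format` setting. Below is a minimal `gunicorn.conf.py` sketch. It assumes the placeholder log path that the Filebeat config further down tails (`/yourpath/gunicorn-access.log`). According to the Gunicorn documentation, `%(t)s` renders its own square brackets and `%(D)s` is the request time in microseconds.

```python
# gunicorn.conf.py -- minimal sketch; the log path is a placeholder.
# Gunicorn config files are plain Python modules; accesslog and
# access_log_format are standard Gunicorn settings.

# File that the first Filebeat instance tails.
accesslog = "/yourpath/gunicorn-access.log"

# Access-log format from above. %(t)s already includes its own [...],
# %(D)s is the request time in microseconds, and %({Header}i)s reads a
# request header (here Kong's request ID and X-Forwarded-For).
access_log_format = (
    "%(t)s [%(h)s] [%(m)s] [%(H)s] [%(U)s] [%(q)s] [%(s)s] [%(a)s] [%(D)s] "
    "[%({Kong-Request-ID}i)s] [%({X-Forwarded-For}i)s]"
)
```

The app would then be started with something like `gunicorn -c gunicorn.conf.py app:application`, where `app:application` is a hypothetical module and WSGI object name.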
Sample log line
[15/Nov/2019:10:23:37 +0000] [172.31.37.123] [GET] [HTTP/1.1] [/api/v1/_instance/json_schema/Team/list] [a=1] [200] [Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36] [936] [9cbf6a3b-9c3a-4835-a2ef-02e03ee826d7#16] [137.59.103.3, 172.30.17.253, 172.30.18.12]
Parsing with an Elasticsearch ingest pipeline
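Ingest pipelines (run on ingest nodes) have been available since Elasticsearch 5.0. They amount to a Logstash built into Elasticsearch that runs before documents are indexed. Complex logs can be parsed with several processors, such as grok or dissect, and in some cases dissect is faster.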
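Because events pass through Kafka and are then shipped to Elasticsearch by a second Filebeat, the redundant metadata fields have to be removed: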
PUT _ingest/pipeline/gunicorn
{
  "description": "devops gunicorn pipeline",
  "processors": [
    { "remove": { "field": ["agent", "ecs", "host", "input", "kafka"] } },
    { "json": { "field": "message", "add_to_root": true } },
    { "remove": { "field": ["@metadata", "ecs", "agent", "input"] } },
    {
      "dissect": {
        "field": "message",
        "pattern": "[%{@timestamp}] [%{client_ip}] [%{method}] [%{scheme}] [%{path}] [%{query_string}] [%{status}] [%{client}] [%{rt_millo}] [%{trace_id}] [%{remote_ips}]"
      }
    }
  ],
  "on_failure": [
    { "set": { "field": "_index", "value": "failed-{{ _index }}" } }
  ]
}
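To see what each processor does, here is a rough Python approximation of the pipeline. It runs on one event in the shape the Kafka-consuming Filebeat would deliver. It is only a sketch:

- The sample event is hypothetical and trimmed.
- The `remove` step here silently skips missing fields; the real processor only does that with `ignore_missing: true`.
- The dissect step relies on every Gunicorn field being wrapped in `[...]` and separated by a single space.

```python
import json

# Field names in the order they appear in the dissect pattern.
GUNICORN_FIELDS = [
    "@timestamp", "client_ip", "method", "scheme", "path", "query_string",
    "status", "client", "rt_millo", "trace_id", "remote_ips",
]

# The sample Gunicorn line from above.
SAMPLE_LINE = (
    "[15/Nov/2019:10:23:37 +0000] [172.31.37.123] [GET] [HTTP/1.1] "
    "[/api/v1/_instance/json_schema/Team/list] [a=1] [200] "
    "[Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36] [936] "
    "[9cbf6a3b-9c3a-4835-a2ef-02e03ee826d7#16] [137.59.103.3, 172.30.17.253, 172.30.18.12]"
)


def sample_event():
    """Hypothetical, trimmed event as the Kafka-consuming Filebeat might emit it."""
    inner = {  # the first Filebeat's event, JSON-encoded into Kafka
        "@timestamp": "2019-11-15T10:23:37.000Z",
        "@metadata": {"beat": "filebeat"},
        "agent": {},
        "input": {},
        "message": SAMPLE_LINE,
    }
    return {"message": json.dumps(inner), "kafka": {}, "agent": {},
            "ecs": {}, "host": {}, "input": {}}


def remove(doc, fields):
    """Approximate the "remove" processor (missing fields are ignored here)."""
    for field in fields:
        doc.pop(field, None)


def dissect_brackets(line):
    """Approximate "[%{a}] [%{b}] ...": strip the outer brackets, split on "] [". """
    if not (line.startswith("[") and line.endswith("]")):
        raise ValueError("line does not start with '[' and end with ']'")
    values = line[1:-1].split("] [")
    if len(values) != len(GUNICORN_FIELDS):
        raise ValueError(f"expected {len(GUNICORN_FIELDS)} fields, got {len(values)}")
    return dict(zip(GUNICORN_FIELDS, values))


def run_gunicorn_pipeline(event):
    doc = dict(event)
    # 1. Drop the metadata added by the Kafka-consuming Filebeat.
    remove(doc, ["agent", "ecs", "host", "input", "kafka"])
    # 2. json + add_to_root: merge the first Filebeat's event into the root.
    #    This replaces "message" with the raw Gunicorn line.
    doc.update(json.loads(doc["message"]))
    # 3. Drop the metadata that came from the first Filebeat.
    remove(doc, ["@metadata", "ecs", "agent", "input"])
    # 4. Split the raw line into named fields; the dissected @timestamp
    #    replaces the ISO timestamp merged in step 2.
    doc.update(dissect_brackets(doc["message"]))
    return doc


if __name__ == "__main__":
    doc = run_gunicorn_pipeline(sample_event())
    for key in GUNICORN_FIELDS:
        print(f"{key}: {doc[key]}")
```

Splitting on `"] ["` only matches what dissect does as long as no field value contains that sequence itself. That holds for the sample line: the user agent contains parentheses and commas but no `"] ["`.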
Elasticsearch mapping
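The key point here is defining the date format of @timestamp in the mapping. Fields that need full-text analysis (tokenization) are mapped as text; everything else is mapped as keyword, which makes aggregating and querying the log data much easier. _source is enabled to make ad-hoc statistics easier.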
PUT _template/gunicorn
{
  "index_patterns": ["*gunicorn*"],
  "settings": { "number_of_shards": 1 },
  "version": 1,
  "mappings": {
    "_source": { "enabled": true },
    "properties": {
      "@timestamp": { "type": "date", "format": "dd/LLL/yyyy:HH:mm:ss Z" },
      "client_ip": { "type": "ip" },
      "method": { "type": "keyword" },
      "scheme": { "type": "keyword" },
      "path": { "type": "text" },
      "query_string": { "type": "text" },
      "status": { "type": "integer" },
      "client": { "type": "text" },
      "rt_millo": { "type": "long" },
      "trace_id": { "type": "keyword" },
      "remote_ips": { "type": "text" }
    }
  }
}
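The @timestamp format is the part of the mapping most likely to go wrong. Elasticsearch uses Java date patterns, so the mapping itself cannot be tested from Python. A rough `strptime` analogue is still handy for checking sample lines before loading the template. It rests on an assumption: that `dd/LLL/yyyy:HH:mm:ss Z` roughly corresponds to `%d/%b/%Y:%H:%M:%S %z`.

```python
from datetime import datetime

# Rough Python analogue of the Java pattern "dd/LLL/yyyy:HH:mm:ss Z"
# (an assumption, not what Elasticsearch runs). %b expects English month
# abbreviations, which holds as long as LC_TIME is the default C locale.
GUNICORN_TS_FORMAT = "%d/%b/%Y:%H:%M:%S %z"


def parse_gunicorn_ts(value):
    """Parse a dissected Gunicorn timestamp such as "15/Nov/2019:10:23:37 +0000"."""
    return datetime.strptime(value, GUNICORN_TS_FORMAT)


if __name__ == "__main__":
    ts = parse_gunicorn_ts("15/Nov/2019:10:23:37 +0000")
    print(ts.isoformat())  # 2019-11-15T10:23:37+00:00
```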
Filebeat config: ship logs to Kafka
filebeat.inputs:
  - type: log
    paths:
      - /yourpath/gunicorn-access.log
    multiline.pattern: '^\['
    multiline.negate: true
    multiline.match: after
    tail_files: true

queue.mem:
  events: 4096
  flush.min_events: 512
  flush.timeout: 5s

output.kafka:
  hosts: ["kafka-01", "kafka-02", "kafka-03"]
  topic: 'gunicron_access'
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
Filebeat config: consume from Kafka and ship to Elasticsearch
filebeat.inputs:
  - type: kafka
    hosts: ["kafka-01", "kafka-02", "kafka-03"]
    topics: ["gunicron_access"]
    group_id: "filebeat_gunicron"

output.elasticsearch:
  hosts: ["es-url"]
  pipeline: "gunicorn"
  index: "gunicorn-%{+yyyy.MM.dd}"

setup.template.name: "gunicorn"
setup.template.pattern: "gunicorn-*"
setup.ilm.enabled: false
setup.template.enabled: false
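The `index` setting, `setup.template.pattern` and the template's `index_patterns` have to agree. Below is a small sketch of how a daily index name is derived and which patterns it matches. It assumes `%{+yyyy.MM.dd}` is formatted from the event's @timestamp in UTC. `daily_index` is a hypothetical helper, not a Filebeat API. The sketch also shows that the `failed-gunicorn-*` indices produced by the pipeline's `on_failure` handler still match `*gunicorn*`, so they receive the same template.

```python
from datetime import datetime, timezone
from fnmatch import fnmatch


def daily_index(prefix, timestamp):
    """Hypothetical helper mirroring index: "<prefix>-%{+yyyy.MM.dd}".

    Assumes the date part comes from the event timestamp in UTC.
    """
    return f"{prefix}-{timestamp.astimezone(timezone.utc):%Y.%m.%d}"


if __name__ == "__main__":
    ts = datetime(2019, 11, 15, 10, 23, 37, tzinfo=timezone.utc)
    index = daily_index("gunicorn", ts)
    failed_index = "failed-" + index  # what "failed-{{ _index }}" produces
    print(index)                                # gunicorn-2019.11.15
    print(fnmatch(index, "*gunicorn*"))         # True: template index_patterns
    print(fnmatch(index, "gunicorn-*"))         # True: setup.template.pattern
    print(fnmatch(failed_index, "*gunicorn*"))  # True: failed docs get the same template
    print(fnmatch(failed_index, "gunicorn-*"))  # False
```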
Flask logs
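Flask logs are written by our own code and are used to inspect exceptions and errors. In the early days after a release, the info log may also include DEBUG-level messages, which makes debugging easier. Once things are stable, the log level should be raised. The info log is not meant for statistics; its purpose is to let us pinpoint the cause quickly when something goes wrong. Exceptions should be written to the info log as well.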
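For the INFO log you can use the format I suggest. The fields we care about are:
- time
- levelname: the log level
- host, process, thread: identify a specific thread of a specific process on a specific machine (needed for some complex bugs, or when asynchronous processes are used)
- name, funcName, filename, lineno: locate the code that emitted the log
- message: the log content
Log format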
{ "format": "[%(asctime)s.%(msecs)03d] [%(levelname)s] [{}:%(process)d:%(thread)d] [%(name)s:%(funcName)s] [%(filename)s:%(lineno)d] %(message)s".format(HOST), "datefmt": "%Y-%m-%d %H:%M:%S"}
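Here is one way to plug this format into Python's standard `logging` module, as a minimal sketch:

- `HOST` is assumed to be the machine's hostname; the original does not show where it comes from.
- `build_logger` is a hypothetical helper.
- In a real deployment the handler would write to the file Filebeat tails (e.g. `/you_path/app.log` from the config below). A stream is used here so the output is easy to inspect.

`logger.exception()` appends the traceback on the lines after the message. That is how exceptions end up in the info log as multi-line entries.

```python
import io
import logging
import socket

# Assumption: HOST is the machine's hostname.
HOST = socket.gethostname()

LOG_FORMAT = {
    "format": (
        "[%(asctime)s.%(msecs)03d] [%(levelname)s] [{}:%(process)d:%(thread)d] "
        "[%(name)s:%(funcName)s] [%(filename)s:%(lineno)d] %(message)s"
    ).format(HOST),
    "datefmt": "%Y-%m-%d %H:%M:%S",
}


def build_logger(name, stream, level=logging.INFO):
    """Hypothetical helper: a logger that writes LOG_FORMAT lines to `stream`."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(LOG_FORMAT["format"], LOG_FORMAT["datefmt"]))
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.addHandler(handler)  # calling this twice for one name adds a second handler
    return logger


if __name__ == "__main__":
    buf = io.StringIO()
    log = build_logger("cmdb", buf)
    log.info("RUN_INFO: job starting")
    try:
        1 / 0
    except ZeroDivisionError:
        # exception() logs at ERROR level and appends the traceback
        log.exception("division failed")
    print(buf.getvalue())
```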
Sample log lines
[2019-11-18 08:47:49.424] [INFO] [cmdb-008069:5990:140482161399552] [cmdb:execute_global_worker] [standalone_scheduler.py:116] RUN_INFO: tiny_collector_ali starting at 2019-11-18 08:47:49, next run will be at approximately 2019-11-18 09:47:49
[2019-11-18 08:11:27.715] [ERROR] [cmdb-008069:5985:140184204932928] [cmdb:common_handler] [error.py:48] 404 Not Found: The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.
Traceback (most recent call last):
  File "/home/server/venv3/lib/python3.6/site-packages/flask/app.py", line 1805, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/server/venv3/lib/python3.6/site-packages/flask/app.py", line 1783, in dispatch_request
    self.raise_routing_exception(req)
  File "/home/server/venv3/lib/python3.6/site-packages/flask/app.py", line 1766, in raise_routing_exception
    raise request.routing_exception
  File "/home/server/venv3/lib/python3.6/site-packages/flask/ctx.py", line 336, in match_request
    self.url_adapter.match(return_rule=True)
  File "/home/server/venv3/lib/python3.6/site-packages/werkzeug/routing.py", line 1799, in match
    raise NotFound()
werkzeug.exceptions.NotFound: 404 Not Found: The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.
Parsing with an Elasticsearch ingest pipeline
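Because events pass through Kafka and are then shipped to Elasticsearch by a second Filebeat, the redundant metadata fields have to be removed: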
PUT _ingest/pipeline/info
{
  "description": "devops info pipeline",
  "processors": [
    { "remove": { "field": ["agent", "ecs", "host", "input", "kafka"] } },
    { "json": { "field": "message", "add_to_root": true } },
    { "remove": { "field": ["@metadata", "ecs", "agent", "input"] } },
    {
      "dissect": {
        "field": "message",
        "pattern": "[%{@timestamp}] [%{level}] [%{host}:%{process_id}:%{thread_id}] [%{name}:%{func_name}] [%{file}:%{line_no}] %{content}"
      }
    }
  ],
  "on_failure": [
    { "set": { "field": "_index", "value": "failed-{{ _index }}" } }
  ]
}
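Below is a rough Python approximation of this dissect pattern, useful for checking that a sample event splits as expected. It is a regular expression, not Elasticsearch's dissect implementation. It assumes:

- no field contains `]`;
- the nested `host`, `name` and `file` parts contain no `:`.

The final `content` field takes everything that remains, including an appended traceback.

```python
import re

# Regex stand-in for:
# [%{@timestamp}] [%{level}] [%{host}:%{process_id}:%{thread_id}]
#   [%{name}:%{func_name}] [%{file}:%{line_no}] %{content}
INFO_LINE = re.compile(
    r"\[(?P<timestamp>[^\]]*)\] "
    r"\[(?P<level>[^\]]*)\] "
    r"\[(?P<host>[^:\]]*):(?P<process_id>[^:\]]*):(?P<thread_id>[^\]]*)\] "
    r"\[(?P<name>[^:\]]*):(?P<func_name>[^\]]*)\] "
    r"\[(?P<file>[^:\]]*):(?P<line_no>[^\]]*)\] "
    r"(?P<content>.*)",
    re.DOTALL,  # let content span the traceback lines
)


def parse_info_event(event):
    """Split one (possibly multi-line) info-log event into named fields."""
    match = INFO_LINE.match(event)
    if match is None:
        raise ValueError("event does not match the info pattern")
    fields = match.groupdict()
    fields["@timestamp"] = fields.pop("timestamp")  # group names cannot contain '@'
    return fields


if __name__ == "__main__":
    # Trimmed version of the ERROR sample above.
    event = (
        "[2019-11-18 08:11:27.715] [ERROR] [cmdb-008069:5985:140184204932928] "
        "[cmdb:common_handler] [error.py:48] 404 Not Found\n"
        "Traceback (most recent call last):\n"
        "werkzeug.exceptions.NotFound: 404 Not Found"
    )
    for key, value in parse_info_event(event).items():
        print(f"{key}: {value!r}")
```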
Elasticsearch mapping
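thread_id must be mapped as long. Python's thread identifier (threading.get_ident(), which logging records as %(thread)d) is typically a large 64-bit value on Linux, such as 140482161399552 in the sample above. That is outside the range of an integer field.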
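A quick check of the numbers behind this choice. Elasticsearch's `integer` is a signed 32-bit type and `long` is a signed 64-bit type. The thread id below is taken from the sample INFO line.

```python
import threading

ES_INTEGER_MAX = 2**31 - 1  # Elasticsearch "integer": signed 32-bit
ES_LONG_MAX = 2**63 - 1     # Elasticsearch "long": signed 64-bit

# Thread id from the sample INFO line above.
sample_thread_id = 140482161399552

print(sample_thread_id > ES_INTEGER_MAX)  # True: out of range for an integer field
print(sample_thread_id <= ES_LONG_MAX)    # True: fits in a long field
print(threading.get_ident())              # current thread's id, as logged by %(thread)d
```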
PUT _template/info
{
  "index_patterns": ["*info*"],
  "settings": { "number_of_shards": 1 },
  "version": 1,
  "mappings": {
    "_source": { "enabled": true },
    "properties": {
      "@timestamp": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSS" },
      "level": { "type": "keyword" },
      "host": { "type": "keyword" },
      "process_id": { "type": "integer" },
      "thread_id": { "type": "long" },
      "name": { "type": "keyword" },
      "func_name": { "type": "keyword" },
      "file": { "type": "keyword" },
      "line_no": { "type": "integer" },
      "content": { "type": "text" }
    }
  }
}
Filebeat config: ship logs to Kafka
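Here ^\[20\d{2} marks the first line of each event (a timestamp such as [2019-...]). Every line that does not match, such as a traceback line, is appended to the preceding event.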
filebeat.inputs:
  - type: log
    paths:
      - /you_path/app.log
    multiline.pattern: '^\[20\d{2}'
    multiline.negate: true
    multiline.match: after
    tail_files: true

queue.mem:
  events: 4096
  flush.min_events: 512
  flush.timeout: 5s

output.kafka:
  hosts: ["kafka-01", "kafka-02", "kafka-03"]
  topic: 'devops_app'
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
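With `multiline.negate: true` and `multiline.match: after`, any line that does not match the pattern is appended to the previous event. Below is a small Python emulation, a sketch rather than Filebeat's implementation. It shows one plausible reason to prefer `^\[20\d{2}` over the looser `^\[` for application logs.

The sample uses a hypothetical continuation line, `[1, 2, 3]` (e.g. a printed list), that starts with `[`. With `^\[` that line would be split off into an event of its own.

```python
import re


def group_events(lines, pattern=r"^\[20\d{2}"):
    """Emulate multiline.negate: true + multiline.match: after.

    A line matching `pattern` starts a new event; any other line is
    appended to the previous event.
    """
    start = re.compile(pattern)
    events = []
    for line in lines:
        if start.match(line) or not events:
            events.append(line)
        else:
            events[-1] += "\n" + line
    return events


SAMPLE_LINES = [
    "[2019-11-18 08:11:27.715] [ERROR] [cmdb-008069:5985:140184204932928] "
    "[cmdb:common_handler] [error.py:48] 404 Not Found",
    "Traceback (most recent call last):",
    '  File "/home/server/venv3/lib/python3.6/site-packages/flask/app.py", line 1805, in full_dispatch_request',
    "[1, 2, 3]",  # hypothetical continuation line that starts with "["
    "werkzeug.exceptions.NotFound: 404 Not Found",
    "[2019-11-18 08:47:49.424] [INFO] [cmdb-008069:5990:140482161399552] "
    "[cmdb:execute_global_worker] [standalone_scheduler.py:116] RUN_INFO: tiny_collector_ali starting",
]

if __name__ == "__main__":
    print(len(group_events(SAMPLE_LINES)))          # 2 events with ^\[20\d{2}
    print(len(group_events(SAMPLE_LINES, r"^\[")))  # 3 events: "[1, 2, 3]" starts its own
```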
Filebeat config: consume from Kafka and ship to Elasticsearch
filebeat.inputs:
  - type: kafka
    hosts: ["kafka-01", "kafka-02", "kafka-03"]
    topics: ["devops_app"]
    group_id: "filebeat_app"

output.elasticsearch:
  hosts: ["es_url"]
  pipeline: "info"
  index: "app-info-%{+yyyy.MM.dd}"

setup.template.name: "info"
setup.template.pattern: "app-info-*"
setup.ilm.enabled: false
setup.template.enabled: false