ELK-Python(一),不具有通用性,留作纪念


不具有通用性,留作纪念。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Listing shown in the post as: [[email protected] python]# cat insert_active_user.py
"""Copy yesterday's active users from Elasticsearch into MySQL.

The script (Python 2, it relies on ``urllib2``):

1. computes the local-time window covering the whole of yesterday,
2. asks Elasticsearch for a nested terms aggregation over the ``client-*``
   indices: ``visit_tenant_id`` -> ``user_id`` -> most frequent
   ``ip_address``,
3. deletes any ``active_user`` rows already stored for yesterday, so that
   running the script several times for the same day does not duplicate rows,
4. inserts one ``active_user`` row per (tenant, user) pair.
"""
from contextlib import closing
from datetime import date, timedelta
import json
import time
import urllib2

from with_conn_to_db import conn_to_mysql

# ---- Time window: yesterday 00:00:00 up to today 00:00:00 (local time) ----
today = date.today()
yesterday = today - timedelta(days=1)
# Elasticsearch is queried with the "epoch_millis" format, so convert the
# local midnights to integer milliseconds since the epoch
# (values look like 1459146210879).
start_time = int(time.mktime(yesterday.timetuple())) * 1000
end_time = int(time.mktime(today.timetuple())) * 1000

# ---- Elasticsearch endpoint and search body ----
server = 'http://elk.xkops.com:9200/'
# A narrower pattern, 'client-visit-*', was left commented out in the
# original listing.
index = 'client-*'
url = server + index + "/_search?pretty=true"

query_date = {
    "query": {
        "filtered": {
            "query": {
                "query_string": {
                    "query": "*",
                    "analyze_wildcard": True,
                },
            },
            "filter": {
                "bool": {
                    "must": [
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": start_time,
                                    # Half-open window: an event stamped
                                    # exactly at today's midnight belongs to
                                    # today's run, not to yesterday's.
                                    "lt": end_time,
                                    "format": "epoch_millis",
                                },
                            },
                        },
                    ],
                    "must_not": [],
                },
            },
        },
    },
    # Only the aggregations are needed, not the matching documents.
    "size": 0,
    "aggs": {
        # Level "2": one bucket per tenant.
        "2": {
            "terms": {
                "field": "visit_tenant_id",
                "size": 10000000,
                "order": {"_count": "desc"},
            },
            "aggs": {
                # Level "3": one bucket per user within the tenant.
                "3": {
                    "terms": {
                        "field": "user_id",
                        "size": 0,
                        "order": {"_count": "desc"},
                    },
                    "aggs": {
                        # Level "4": only the single most frequent IP address
                        # of that user is kept (size 1, ordered by count).
                        "4": {
                            "terms": {
                                "field": "ip_address",
                                "size": 1,
                                "order": {"_count": "desc"},
                            },
                        },
                    },
                },
            },
        },
    },
}

# Passing a body to Request makes urllib2 send the search as a POST.
req = urllib2.Request(url, json.dumps(query_date))
# closing() releases the HTTP connection even if reading the body fails.
with closing(urllib2.urlopen(req)) as response:
    page = response.read()
result = json.loads(page)

# ---- Store the result ----
# Avoid duplicate rows when the script runs more than once for the same day:
# delete yesterday's rows before inserting them again.
sql = "delete from active_user where create_time = '%s'" % (yesterday)
with conn_to_mysql('logstash') as db:
    db.execute(sql)

# NOTE(review): the statements below are built by string interpolation from
# values returned by Elasticsearch. If the object yielded by conn_to_mysql
# supports parameterized execute(), prefer that - confirm against
# with_conn_to_db.
# NOTE(review): a connection is opened per inserted row, as in the original;
# a single connection around the loop is probably enough - confirm the
# commit/rollback semantics of conn_to_mysql before changing it.
for tenant_bucket in result['aggregations']['2']['buckets']:
    tenant_id = tenant_bucket['key']
    # A tenant without user buckets simply contributes no rows.
    for user_bucket in tenant_bucket['3']['buckets']:
        user_id = user_bucket['key']
        # Raises IndexError if the user bucket has no ip_address sub-bucket.
        ip_address = user_bucket['4']['buckets'][0]['key']
        sql = (
            "insert into active_user(tenant_id,create_time,user_id,ip_addr) "
            "values('%s','%s','%s','%s')"
        ) % (tenant_id, yesterday, user_id, ip_address)
        with conn_to_mysql('logstash') as db:
            db.execute(sql)

ELK-Python(一)

相关内容

    暂无相关文章

评论关闭