重点URL访问监控-当天和前一天相同时间段内的访问对比,url当天,该脚本实现当天的重点URL访问量对比
重点URL访问监控-当天和前一天相同时间段内的访问对比,url当天,该脚本实现当天的重点URL访问量对比
该脚本实现当天的重点URL访问量和前一天相同小时时间段内的访问量对比,超过指定的阈值则报警通知并记录日志。
#! /usr/local/bin/python3
# Hourly monitor for the hit counts of important URLs.
#
# For the hour that has just ended, count how many access-log lines match each
# watched URL pattern, store the counts in a shelve database, and compare them
# with the counts stored for the same hour of the previous day.  When a count
# exceeds the alarm thresholds, log an error and send a notification.
import bz2
import re
import contextlib
import shelve
import datetime
import sys

# enviroment.py is a library of shared helper functions.  The helpers used by
# this script are reproduced here for reference:
#
#   def get_log(log_filename,
#               log_format='[%(asctime)-15s] [%(levelname)s] [%(filename)s] [%(funcName)s] [%(lineno)d] %(message)s',
#               log_level='debug'):
#       if log_level == 'debug':
#           logging.basicConfig(format=log_format, filename=log_filename, level=logging.DEBUG)
#
#   def get_local_ip_tag():
#       short_ip = subprocess.check_output('/sbin/ifconfig | grep eth0 -A1 | tail -1 | awk \'{print $2}\' | awk -F "." \'{print $4}\'', shell=True).decode().strip()
#       return short_ip + 'log:'
#
#   # Every eo.sms_XX(message) call below is a wrapper around this function
#   # that supplies the receivers.
#   def send_message(receivers, message):
#       for receiver in receivers:
#           warn_url = 'http_sendmessage_interface?'
#           query_args = {'username': receivers[receiver], 'message': get_local_ip_tag() + message}
#           encoded_args = urllib.parse.urlencode(query_args)
#           warn_url = warn_url + encoded_args
#           response = urllib.request.urlopen(warn_url)
import enviroment as eo

# Timestamp prefix (day/month/year:hour) used to select one hour of log lines.
_HOUR_FORMAT = '%d/%b/%Y:%H'
# Length of a timestamp rendered with _HOUR_FORMAT, e.g. '01/Jan/2020:13'.
_HOUR_STAMP_LENGTH = 14
# An alarm fires when today's count is above ALARM_MIN_VISITS and also more
# than ALARM_GROWTH_FACTOR times yesterday's count for the same hour.
ALARM_MIN_VISITS = 400
ALARM_GROWTH_FACTOR = 2

YESTERDAY_KEY = 'yesterday'
TODAY_KEY = 'today'


def generate_previous_hour():
    """Return ``(hour, stamp)`` describing the hour before the current one.

    ``hour`` is the hour of day (0-23) and ``stamp`` is the same moment
    formatted as ``dd/Mon/yyyy:HH``.  Both values come from a single clock
    reading, so shortly after midnight the result is hour 23 of the previous
    day rather than -1.
    """
    previous = datetime.datetime.today() - datetime.timedelta(hours=1)
    return previous.hour, previous.strftime(_HOUR_FORMAT)


def _parse_hour(stamp):
    """Parse the leading ``dd/Mon/yyyy:HH`` of *stamp*; return None if it does not fit."""
    try:
        return datetime.datetime.strptime(stamp[:_HOUR_STAMP_LENGTH], _HOUR_FORMAT)
    except ValueError:
        return None


def _probe_reached_hour(stamp, examine_hour, target_hour):
    """Tell whether a probed timestamp lies in or after the examined hour.

    Returns True or False when that can be decided, and None when *stamp*
    cannot be parsed.  If *examine_hour* itself could not be parsed
    (*target_hour* is None) the strings are compared lexically, which is only
    reliable within a single month.
    """
    if target_hour is None:
        return stamp > examine_hour
    probe_hour = _parse_hour(stamp)
    if probe_hour is None:
        return None
    return probe_hour >= target_hour


def _seek_to_scan_start(file, examine_hour, step, margin=10240):
    """Position binary *file* shortly before the first line of *examine_hour*.

    The log is assumed to be in chronological order.  It is probed every
    *step* bytes until a line at or after the examined hour is found (or the
    end of the file is reached).  Scanning then starts *margin* bytes before
    the last probe that was still earlier than the examined hour, so the
    computed offset can never be negative and nothing is skipped when the
    examined hour is the last one in the file.
    """
    target_hour = _parse_hour(examine_hour)
    safe_offset = 0
    offset = 0
    while True:
        file.seek(offset)
        if offset:
            # We most likely landed in the middle of a line; drop the fragment.
            file.readline()
        line_start = file.tell()
        line = file.readline()
        if not line:
            break
        fields = line.decode('utf-8', errors='replace').split(' ')
        if len(fields) > 3:
            reached = _probe_reached_hour(fields[3][1:], examine_hour, target_hour)
            if reached:
                break
            if reached is not None:
                safe_offset = line_start
        offset = file.tell() + step
    start = max(0, safe_offset - margin)
    file.seek(start)
    if start:
        # Read one line so that the scan begins on a complete line.
        file.readline()


def check_hour_logs(logpath, keys, examine_hour):
    """Count the log lines of one hour whose URL matches each pattern.

    Args:
        logpath: path of the access log.  Fields are separated by single
            spaces; the fourth field is the timestamp and the seventh field
            is the requested URL.
        keys: regular-expression patterns to search for in the URL field.
        examine_hour: regular-expression pattern (normally a
            ``dd/Mon/yyyy:HH`` stamp) searched for in the timestamp field.

    Returns:
        A dict mapping every entry of *keys* to its number of matching lines.
    """
    result = {key: 0 for key in keys}
    hour_pattern = re.compile(examine_hour)
    key_patterns = [(key, re.compile(key)) for key in keys]
    # Distance between probes while looking for the examined hour; tune it to
    # the size of the log.
    step = 10 * 1024 * 1024
    # Binary mode: seeking to computed offsets is not supported on text files.
    with open(logpath, 'rb') as file:
        _seek_to_scan_start(file, examine_hour, step)
        for raw_line in file:
            words = raw_line.decode('utf-8', errors='replace').strip().split(' ')
            # Skip lines that lack the timestamp or URL field.
            if len(words) < 7 or not hour_pattern.search(words[3]):
                continue
            for key, key_pattern in key_patterns:
                if key_pattern.search(words[6]):
                    result[key] += 1
    return result


def compare_data(db, previous_hour_time, examine_keys):
    """Raise an alarm for every key that grew too much since yesterday.

    Does nothing when the database holds no ``'yesterday'`` entry.  A key
    that is missing from either day is logged as a warning and skipped; the
    remaining keys are still checked.
    """
    with contextlib.closing(shelve.open(db)) as dba:
        if YESTERDAY_KEY not in dba:
            return
        try:
            yesterday = dba[YESTERDAY_KEY]
            today = dba[TODAY_KEY]
        except KeyError:
            eo.logging.warning('%s', sys.exc_info()[0:2])
            return
        for key in examine_keys:
            try:
                today_count = int(today[previous_hour_time][key])
                yesterday_count = int(yesterday[previous_hour_time][key])
            except KeyError:
                eo.logging.warning('%s', sys.exc_info()[0:2])
                continue
            if (today_count > ALARM_GROWTH_FACTOR * yesterday_count
                    and today_count > ALARM_MIN_VISITS):
                eo.logging.error('alarm. %s too large.the number of visits is %s', key, today_count)
                message = 'alarm. {0} too large.the number of visits is {1}'.format(key, today_count)
                eo.sms_XX(message)


def store_data(db, previous_hour_data, previous_hour_time, rotate=True):
    """Save the counts of one hour under the ``'today'`` entry of *db*.

    Args:
        db: path of the shelve database.
        previous_hour_data: counts returned by ``check_hour_logs``.
        previous_hour_time: hour of day the counts belong to.
        rotate: when true (the default) and the hour is 23, also copy
            ``'today'`` to ``'yesterday'``.  Pass False to keep yesterday's
            data intact, for example to compare before rotating.
    """
    with contextlib.closing(shelve.open(db, writeback=True)) as dba:
        today = dba.get(TODAY_KEY, {})
        today[previous_hour_time] = previous_hour_data
        # Assign explicitly so that a freshly created dict is persisted too.
        dba[TODAY_KEY] = today
        if rotate and int(previous_hour_time) == 23:
            dba[YESTERDAY_KEY] = today.copy()


def rotate_data(db):
    """Copy the ``'today'`` entry of *db* to ``'yesterday'`` if it exists."""
    with contextlib.closing(shelve.open(db)) as dba:
        if TODAY_KEY in dba:
            dba[YESTERDAY_KEY] = dba[TODAY_KEY].copy()


# The following functions make it easy for other scripts to work with the
# database, e.g. print_all_data prints everything that is stored.  They could
# also live in enviroment.py.
def put_data(filename, key, saved_data):
    """Store *saved_data* under *key* in the shelve database *filename*."""
    with contextlib.closing(shelve.open(filename)) as dba:
        dba[key] = saved_data


def get_data(filename, key):
    """Return the value stored under *key*; raises KeyError if it is absent."""
    with contextlib.closing(shelve.open(filename)) as dba:
        return dba[key]


def print_all_data(filename):
    """Print every entry of the shelve database as ``key:value``."""
    with contextlib.closing(shelve.open(filename)) as dba:
        for key in dba:
            print('{0}:{1}'.format(key, dba[key]))


def delete_data(filename, key=0):
    """Delete *key* from the database, or every entry when *key* is falsy."""
    with contextlib.closing(shelve.open(filename)) as dba:
        if not key:
            dba.clear()
            return
        try:
            del dba[key]
        except KeyError:
            print('find a KeyError no key:{0}'.format(key))


def main(log_name, db, examine_keys, my_log):
    """Count, store and compare the hits of the hour that just ended.

    Args:
        log_name: path of the access log to scan.
        db: path of the shelve database holding the hourly counts.
        examine_keys: URL patterns to watch.
        my_log: path of this script's own log file.
    """
    eo.get_log(my_log)
    previous_hour_time, previous_hour = generate_previous_hour()
    previous_hour_data = check_hour_logs(log_name, examine_keys, previous_hour)
    store_data(db, previous_hour_data, previous_hour_time, rotate=False)
    compare_data(db, previous_hour_time, examine_keys)
    # Rotate only after comparing, so hour 23 is checked against the previous
    # day's data.  The job has to run during hour 0 for this to happen.
    if int(previous_hour_time) == 23:
        rotate_data(db)


if __name__ == '__main__':
    log_name = ''
    basedir = ''
    db = basedir + '/examine_important_url_hours.db'
    my_log = basedir + '/run.log'
    examine_keys = []
    main(log_name, db, examine_keys, my_log)
# This snippet comes from http://byrx.net
相关内容
- 给出目录下包括子目录所有文件的绝对路径,目录绝对
- windows下Python通过PIL写入字体出现“The _imagingft C module
- 对字典的简单实用,字典简单实用,#coding=utf8
- python创建一个最简单的http webserver服务器,pythonwebserv
- LU decomposition of symetric pentadiagonal matrix in Python,,''' d,
- python中使用尾递归代码范例,python尾递归范例,# This p
- python提取url中的域名和端口号,pythonurl,import urlli
- python执行get提交的操作,pythonget提交,import sys,
- django获得用户ip地址,django获得ip,def get_clie
- python 装饰器记录日志,python日志,from time im
评论关闭