1.背景
剛到一家公司需要寫一個實時分析tshark捕捉到的數據,tshark一直往文本裡面寫數據,寫一個程序要實時獲取到添加的數據並進行分析處理最後入庫。此時思緒狂飛,想了一些比較挫的方法。
本人想到的方法:
1.每隔一定時間去查看下文件的mtime,如果有改動則讀取數據,並記錄讀取的行數。下次再去讀這個文件的數據則上次記錄的行數開始繼續讀。當文件行數太大的時候這個程序的效率就很慢了,也可以記錄上次讀取的字節數,然後使用linux下的open系統系統中的seek從指定位置處讀取。但是要是用C語言來寫的話後面的字符串處理和分析就比較麻煩了果斷放棄這個方案。
我們經理的方法:
1.利用linux中的pipe,這個方法很是有用。pipe就是管道,一個輸入端,一個輸出端。只要有數據輸入就立馬送到輸出端進行處理。此辦法效率還不錯。最終被設計成這樣:
1、tshark捕捉數據重定向到pipe中。
2、一個線程從pipe讀取數據處理後放入到隊列中。
3、另外一個線程從隊中取數據進行最後的 處理和入庫。
存在的問題:
此法很好,後來當我處理到一個問題的時候需要對實時對apache的日志分析並入庫,此時我就准備直接把經理寫好的程序demo拿來改改用。但是發現apache無法往pipe中寫,我將apache日志文件刪除後,重新建立成pipe格式的文件,然後寫程序讀這個pipe,發現讀不了。
可能是apache不支持往pipe中寫日志吧。這下歇菜了。
《Python開發技術詳解》.( 周偉,宗傑).[高清PDF掃描版+隨書視頻+代碼] http://www.linuxidc.com/Linux/2013-11/92693.htm
Python腳本獲取Linux系統信息 http://www.linuxidc.com/Linux/2013-08/88531.htm
Python下使用MySQLdb模塊 http://www.linuxidc.com/Linux/2012-06/63620.htm
最終的解決方案:
tail -f + pipe 搞定。我們都知道tail -f 一個文件的時候會實時顯示文件新增的內容。然後python中的subprocess.Popen中可以指定輸出到PIPE中stdout=subprocess.PIPE,這樣的話只要一個while每次去讀pipe中數據就OK了。
2.方法
經理的方法(為了保密代碼做了一定的處理):
#!/usr/bin/env python2.7
import datetime
import time
from CURD import Field, Model, Database
import threading
import Queue
import os
CONFIG = {
}
class dami_log(Model):
date = Field()
netloc = Field()
ip = Field()
path = Field()
cookie = Field()
mac = Field()
cid = Field()
class dami_case(Model):
id = Field()
case_name = Field()
save_time = Field()
is_current = Field()
def thread_read_pipe(q):
if not os.path.exists(CONFIG['pipe']):
os.mkfifo(CONFIG['pipe'], 0777)
f = file(CONFIG['pipe'], 'r')
rest = ''
while True:
讀取數據
dat = f.readline()
數據處理.....
放入隊列
q.put(dat)
def thread_dump(q,cid):
while True:
try:
從隊列中獲取數據
line = q.get()
數據處理
dat = line.split('\t')
if any(map(lambda x:dat[3].endswith(x) or (x+"?" in dat[3]),["js","css","ico","jpg","png","gif"])) or dat[3]=="*" or "192.168.10.1" in dat[1]:
print line
else:
payload = {
'date': dat[0] \
if '.' not in dat[0] \
else datetime.datetime.fromtimestamp(
time.mktime(time.strptime(dat[0].rsplit('.', 1)[0],
'%b %d, %Y %X'))),
'netloc': dat[1],
'ip': dat[2],
'path': dat[3],
'cookie': dat[4],
'mac': dat[5],
'cid':cid
}
dami_log(**payload).save()
except Exception as e:
print e
pass
def _main():
CONFIG['db_user'] = '***'
CONFIG['db_passwd'] = '***'
CONFIG['db'] = '***'
CONFIG['pipe'] = '/var/www/script/tsharklog/log.log'
Database.config(user=CONFIG['db_user'],
passwd=CONFIG['db_passwd'],
db=CONFIG['db'])
cid = dami_case.where(is_current=1).getone()
q = Queue.Queue()
trp = threading.Thread(target=thread_read_pipe, args=(q,))
trp.start()
td = threading.Thread(target=thread_dump, args=(q,cid.id))
td.start()
if __name__ == '__main__':
_main()
更多詳情見請繼續閱讀下一頁的精彩內容: http://www.linuxidc.com/Linux/2014-07/104741p2.htm