from subprocess import call
import os
from datetime import datetime, timedelta, date
import calendar
import sys
import socket
def get_filenames_in_dir(dir_path):
    """Return the names of the files directly inside *dir_path*.

    Only the top level of the directory is inspected (no recursion), and
    subdirectory names are excluded.  Returns an empty list when the
    directory does not exist, matching os.walk's behavior of yielding
    nothing for a missing path.
    """
    # os.walk yields (dirpath, dirnames, filenames) per level; a single
    # next() step gives the top level only.  The default handles a missing
    # directory without raising StopIteration.
    _, _, filenames = next(os.walk(dir_path), (dir_path, [], []))
    return list(filenames)
def grant_full_access_for_dir(dir_path):
    """Recursively grant read/write/execute to everyone on *dir_path*.

    Runs ``sudo chmod -R 777`` so the hadoop user can read the audit log
    files; adjust to your own security policy if 777 is too permissive.
    """
    cmd = ['sudo', 'chmod', '-R', '777', dir_path]
    # Echo the command before running it, like the other hadoop helpers.
    # (Converted from the Python 2 print statement to the print() function.)
    print(' '.join(cmd))
    call(cmd)
def copy_file_to_hdfs(file_path, dst_hdfs_path):
    """Copy a local file into HDFS, overwriting any existing copy.

    The command is echoed before it runs.  If the ``hadoop`` binary cannot
    be executed, an error message is printed instead of raising, so one
    failed copy does not abort the whole run.
    """
    # Build the command outside the try block: list construction cannot
    # raise OSError, and this keeps the guarded region minimal.
    cmd = ['hadoop', 'fs', '-copyFromLocal', '-f', file_path, dst_hdfs_path]
    try:
        print(' '.join(cmd))
        call(cmd)
    except OSError:
        # FileNotFoundError (hadoop not installed) is a subclass of OSError.
        print('Cannot run hadoop command in the host. Cmd: ' + ' '.join(cmd))
def copy_logs_to_hdfs(src_log_dir_path, dst_hdfs_dir_path, start_time, end_time):
    """Copy Impala audit logs created in [start_time, end_time) into HDFS.

    File names look like ``impala_audit_event_log_1.0-1464403015360``:
    a fixed prefix, an audit-log version, and a 13-digit creation timestamp
    in epoch milliseconds.  Besides every file whose timestamp falls inside
    the window, the newest file created *before* start_time is also copied,
    because it was still being appended to when the window opened.

    Args:
        src_log_dir_path: local directory containing the audit log files.
        dst_hdfs_dir_path: HDFS directory to copy the files into.
        start_time: inclusive window start, epoch milliseconds.
        end_time: exclusive window end, epoch milliseconds.
    """
    file_name_list = get_filenames_in_dir(src_log_dir_path)
    # Expected layout: <prefix>_<version>-<timestamp>
    prefix = 'impala_audit_event_log'
    version = '1.0'
    prefix_len = len(prefix)
    version_len = len(version)
    timestamp_len = 13
    # +2 for the '_' and '-' separators.
    file_name_len = prefix_len + version_len + timestamp_len + 2
    version_offset = prefix_len + 1
    timestamp_offset = version_offset + version_len + 1
    # Newest file created strictly before start_time (copied at the end).
    pre_file_time = 0
    pre_file_name = None
    for file_name in file_name_list:
        # Skip files whose name length does not match the expected layout.
        if len(file_name) != file_name_len:
            continue
        # Skip files whose prefix or separators do not match.
        if file_name[version_offset - 1:version_offset] != '_' or \
                file_name[timestamp_offset - 1:timestamp_offset] != '-' or \
                file_name[0:prefix_len] != prefix:
            continue
        # Only process logs written with the supported audit-log version.
        current_version = file_name[version_offset:version_offset + version_len]
        if current_version != version:
            # Bug fix: the original printed the version string after
            # "file name:"; print the actual file name instead.
            print('Audit log version does not match, file name: ' + file_name)
            continue
        try:
            # int() replaces Python 2's long(); it handles 13-digit values.
            timestamp = int(file_name[timestamp_offset:timestamp_offset + timestamp_len])
        except ValueError:
            continue
        if timestamp < start_time:
            # Keep only the latest of the pre-window files.
            if timestamp > pre_file_time:
                pre_file_name = file_name
                pre_file_time = timestamp
            continue
        if timestamp >= end_time:
            continue
        # Legal log file whose timestamp is inside [start_time, end_time).
        copy_file_to_hdfs(src_log_dir_path + '/' + file_name,
                          dst_hdfs_dir_path + '/' + file_name)
    # Copy the file that was active just before start_time, if any.
    if pre_file_name is not None:
        copy_file_to_hdfs(src_log_dir_path + '/' + pre_file_name,
                          dst_hdfs_dir_path + '/' + pre_file_name)
if __name__ == "__main__":
    # Identify this host by IP so logs from different hosts land in
    # separate HDFS subdirectories.
    host_id = socket.gethostbyname(socket.gethostname())
    if len(sys.argv) < 3:
        print('Please specify the source log directory and HDFS log directory. For example,\n'
              'python fetch-log source-log-dir hdfs-log-dir\n'
              'or\n'
              'python fetch-log source-log-dir hdfs-log-dir start-date end-date\n'
              'date inputs are in format YYYY-MM-DD')
        # Bug fix: usage errors now exit non-zero (the original exited 0).
        sys.exit(1)
    src_log_dir_path = sys.argv[1]
    dst_hdfs_dir_path = sys.argv[2] + '/' + str(host_id)
    # By default, the script only fetches log files for yesterday:
    # [yesterday 00:00, today 00:00).
    today_date = date.today()
    yesterday_date = today_date - timedelta(days=1)
    start_date = yesterday_date
    end_date = today_date
    try:
        # An explicit date range may be given as two extra arguments.
        # (A single extra argument is silently ignored, as before.)
        if len(sys.argv) == 5:
            start_date = datetime.strptime(sys.argv[3], "%Y-%m-%d")
            end_date = datetime.strptime(sys.argv[4], "%Y-%m-%d")
    except ValueError:
        print('Please input the dates in YYYY-MM-DD format.')
        sys.exit(1)
    # Window bounds: 00:00:00 UTC of each date, in epoch milliseconds
    # (calendar.timegm interprets the struct_time as UTC).
    start_time = calendar.timegm(start_date.timetuple()) * 1000
    end_time = calendar.timegm(end_date.timetuple()) * 1000
    print('starting from ' + str(start_time) + ' ms')
    print('ending at ' + str(end_time) + ' ms')
    # Create dst_hdfs_dir_path in HDFS if it does not exist yet.
    try:
        cmd = ['hadoop', 'fs', '-test', '-e', dst_hdfs_dir_path]
        if call(cmd) != 0:
            cmd = ['hadoop', 'fs', '-mkdir', dst_hdfs_dir_path]
            print(' '.join(cmd))
            call(cmd)
    except OSError:
        print('Cannot run hadoop command in the host. Cmd: ' + ' '.join(cmd))
        sys.exit(1)
    # The hadoop user may not have access to the audit log, so we grant full
    # access to it.  Customers may want to modify this according to their
    # security policy.
    grant_full_access_for_dir(src_log_dir_path)
    # Put the in-window log files into HDFS.
    copy_logs_to_hdfs(src_log_dir_path, dst_hdfs_dir_path, start_time, end_time)