Appendix

Impala QLI Script
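The script below copies Impala audit event logs created inside a given date range from a local log directory on an Impala host into a per-host directory in HDFS. It is written for Python 2 and shells out to the hadoop command-line client, so both must be available on the host where it runs.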

from subprocess import call
import os
from datetime import datetime, timedelta, date
import calendar
import sys
import socket


def get_filenames_in_dir(dir_path):
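    # list only the files in the top level of dir_path; the walk is stopped
    # after the first iteration, so subdirectories are not scanned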
    file_name_list = []
    for (dirpath, dirnames, filenames) in os.walk(dir_path):
        file_name_list.extend(filenames)
        break
    return file_name_list


def grant_full_access_for_dir(dir_path):
    # recursively open up permissions on the directory so the hadoop user can read the logs
    cmd = ['sudo', 'chmod', '-R', '777', dir_path]
    print ' '.join(cmd)
    call(cmd)


def copy_file_to_hdfs(file_path, dst_hdfs_path):
    try:
        # overwrite the file in HDFS if it exists already
        cmd = ['hadoop', 'fs', '-copyFromLocal', '-f', file_path, dst_hdfs_path]
        print ' '.join(cmd)
        call(cmd)
    except OSError:
        print 'Cannot run hadoop command in the host. Cmd: ' + ' '.join(cmd)


def copy_logs_to_hdfs(src_log_dir_path, dst_hdfs_dir_path, start_time, end_time):
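    # start_time and end_time are epoch milliseconds; the window is [start_time, end_time)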
    # scan the audit log directory and find the logs created inside the requested window
    file_name_list = get_filenames_in_dir(src_log_dir_path)

    # verify the file name, and only process log files
    # example name: impala_audit_event_log_1.0-1464403015360
    prefix = 'impala_audit_event_log'
    version = '1.0'
    prefix_len = len(prefix)
    version_len = len(version)
    timestamp_len = 13  # epoch time in milliseconds
    file_name_len = prefix_len + version_len + timestamp_len + 2  # plus the '_' and '-' separators
    version_offset = prefix_len + 1
    timestamp_offset = version_offset + version_len + 1

    # track the newest file created before start_time
    pre_file_time = 0
    pre_file_name = None

    # find all files created inside [start_time, end_time), plus the file created
    # just before start_time (events inside the window may have been appended to
    # it after the file was created)
    for file_name in file_name_list:
        # skip the file if the file name length does not match
        if len(file_name) != file_name_len:
            continue

        # skip the file if the file name format does not match
        if file_name[version_offset - 1:version_offset] != '_' or \
                file_name[timestamp_offset - 1:timestamp_offset] != '-' or \
                file_name[0:prefix_len] != prefix:
            continue

        # extract the version of audit log
        current_version = file_name[version_offset:version_offset + version_len]
        if current_version != version:
            print 'Audit log version does not match, file name: ' + file_name
            continue

        # extract time stamp
        timestamp = -1
        try:
            timestamp_str = file_name[timestamp_offset:timestamp_offset + timestamp_len]
            timestamp = long(timestamp_str)
        except ValueError:
            continue

        if timestamp < start_time and pre_file_time < timestamp:
            pre_file_name = file_name
            pre_file_time = timestamp
            continue

        # skip files whose timestamp is outside [start_time, end_time)
        if timestamp < start_time or timestamp >= end_time:
            continue

        # copy valid log files whose timestamp falls inside [start_time, end_time)
        file_path = src_log_dir_path + '/' + file_name
        dst_hdfs_path = dst_hdfs_dir_path + '/' + file_name
        copy_file_to_hdfs(file_path, dst_hdfs_path)

    # copy the file created just before the start_time to hdfs
    if pre_file_name is not None:
        file_path = src_log_dir_path + '/' + pre_file_name
        dst_hdfs_path = dst_hdfs_dir_path + '/' + pre_file_name
        copy_file_to_hdfs(file_path, dst_hdfs_path)

if __name__ == "__main__":
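    # identify this host by its IP address so that each host writes to its own HDFS directory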
    host_id = socket.gethostbyname(socket.gethostname())

    if len(sys.argv) < 3:
        print 'Please specify the source log directory and HDFS log directory. For example,\n' + \
            'python fetch-log source-log-dir hdfs-log-dir\n' + \
            'or\n' + \
            'python fetch-log source-log-dir hdfs-log-dir start-date end-date\n' + \
            'date inputs are in format YYYY-MM-DD'
        sys.exit(1)

    src_log_dir_path = sys.argv[1]
    dst_hdfs_dir_path = sys.argv[2] + '/' + str(host_id)

    today_date = date.today()
    one_day = timedelta(days=1)
    yesterday_date = today_date - one_day

    # by default, the script only fetches log files for yesterday
    start_date = yesterday_date
    end_date = today_date

    try:
        if len(sys.argv) == 5:
            start_date = datetime.strptime(sys.argv[3], "%Y-%m-%d")
            end_date = datetime.strptime(sys.argv[4], "%Y-%m-%d")
    except ValueError:
        print 'Please input the dates in YYYY-MM-DD format.'
        sys.exit(1)

    # start time is 00:00:00 UTC of the start date, in epoch milliseconds
    start_time = calendar.timegm(start_date.timetuple()) * 1000

    # end time is 00:00:00 UTC of the end date, in epoch milliseconds
    end_time = calendar.timegm(end_date.timetuple()) * 1000

    print 'starting from ' + str(start_time) + ' ms'
    print 'ending at ' + str(end_time) + ' ms'

    # create dst_hdfs_dir_path in HDFS if it does not exist yet
    try:
        cmd = ['hadoop', 'fs', '-test', '-e', dst_hdfs_dir_path]
        if call(cmd) != 0:
            cmd = ['hadoop', 'fs', '-mkdir', '-p', dst_hdfs_dir_path]  # -p creates missing parents
            print ' '.join(cmd)
            call(cmd)
    except OSError:
        print 'Cannot run hadoop command in the host. Cmd: ' + ' '.join(cmd)
        sys.exit(1)

    # hadoop user may not have access to audit log, so we grant full access to it.
    # customers may want to modify this according to their security policy.
    grant_full_access_for_dir(src_log_dir_path)

    # copy the selected log files from the source directory into HDFS
    copy_logs_to_hdfs(src_log_dir_path, dst_hdfs_dir_path, start_time, end_time)
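
# Example invocation (the directory paths and dates here are illustrative):
#   python fetch-log /var/log/impalad/audit /user/history/impala_audit 2016-05-01 2016-05-02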