__init__.py 7.2 KB
# -*- coding: utf-8 -*-
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
""" This module is a collection of methods commonly used in this project. """
import collections
import functools
import json
import logging
import os
import os.path
import re
import shlex
import subprocess
import sys

# Name of the environment variable which carries the JSON encoded parameters
# of the compiler wrappers (written by `wrapper_environment`, read by
# `compiler_wrapper`).
ENVIRONMENT_KEY = 'INTERCEPT_BUILD'

# Record of a single command execution: the process id, the working
# directory and the command itself.
Execution = collections.namedtuple('Execution', 'pid cwd cmd')

# Record with the `collect`, `analyze`, `dir` and `extdef_map_cmd` fields.
# NOTE(review): presumably the cross translation unit (CTU) analysis
# settings; confirm against the modules which create it.
CtuConfig = collections.namedtuple(
    'CtuConfig', 'collect analyze dir extdef_map_cmd')


def duplicate_check(method):
    """ Create a predicate which tells whether an entry was seen before.

    Entries are represented as dictionaries, which are not hashable. The
    caller therefore passes `method`, which computes a hashable value for
    an entry. The values seen so far are collected in a set.

    :param method: callable which maps an entry to a hashable value
    :return: callable which takes an entry and returns False when the
    value of the entry is seen the first time, True on every later call
    with an entry of the same value. """

    def predicate(entry):
        key = predicate.unique(entry)
        seen_before = key in predicate.state
        if not seen_before:
            predicate.state.add(key)
        return seen_before

    # The hash method and the collected values are stored as attributes of
    # the returned function, so they stay reachable for the caller too.
    predicate.state = set()
    predicate.unique = method
    return predicate


def run_build(command, *args, **kwargs):
    """ Execute the build command and log the execution.

    :param command: array of tokens
    :param args: positional arguments, passed to `subprocess.call` as is
    :param kwargs: keyword arguments, passed to `subprocess.call` as is;
    when `env` is not among them, the environment of the current process
    is the one written into the log
    :return: exit code of the process
    """
    logging.debug('run build %s, in environment: %s',
                  command,
                  kwargs['env'] if 'env' in kwargs else os.environ)
    status = subprocess.call(command, *args, **kwargs)
    logging.debug('build finished with exit code: %d', status)
    return status


def run_command(command, cwd=None):
    """ Run a given command and report the execution.

    The standard error of the command is redirected into its standard
    output, therefore the result contains both of them.

    :param command: array of tokens
    :param cwd: the working directory where the command will be executed
    (the current working directory is used when it is not given)
    :return: output of the command, as list of lines
    :raises subprocess.CalledProcessError: when the command exits with non
    zero status. The `output` attribute of the exception is the list of
    lines written by the command.
    """
    def decode_when_needed(result):
        """ check_output returns bytes or string depend on python version.

        The output of an arbitrary tool is not guaranteed to be valid
        UTF-8. Such bytes are replaced (and not reported as error), so a
        decoding problem can't hide the real outcome of the command. """
        if isinstance(result, bytes):
            return result.decode('utf-8', 'replace')
        return result

    directory = os.path.abspath(cwd) if cwd else os.getcwd()
    logging.debug('exec command %s in %s', command, directory)
    try:
        output = subprocess.check_output(command,
                                         cwd=directory,
                                         stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as ex:
        # Make the output of the failed command just as easy to use as
        # the output of a successful one, then let the error go on.
        ex.output = decode_when_needed(ex.output).splitlines()
        raise
    return decode_when_needed(output).splitlines()


def reconfigure_logging(verbose_level):
    """ Adjust the level and the format of the root logger to the verbosity.

    Nothing is changed when the verbose level is zero. Otherwise the level
    of the root logger is lowered, and its handlers are replaced by a
    single handler which writes to the standard output.

    :param verbose_level: number of `-v` flags received by the command
    :return: no return value
    """
    if verbose_level == 0:
        # Nothing was requested, keep the current configuration.
        return

    # Each `-v` flag lowers the level by 10, starting from WARNING. The
    # reduction is capped, so the level never goes below zero.
    reduction = min(logging.WARNING, 10 * verbose_level)
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.WARNING - reduction)

    # Above three flags the name of the logging function is reported too.
    fmt_string = ('%(name)s: %(levelname)s: %(message)s'
                  if verbose_level <= 3 else
                  '%(name)s: %(levelname)s: %(funcName)s: %(message)s')
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(logging.Formatter(fmt=fmt_string))
    root_logger.handlers = [stdout_handler]


def command_entry_point(function):
    """ Decorator for the entry point methods of the commands.

    The decorator sets up the logging before the call, shuts it down after
    the call, and turns the exceptions of the decorated method into exit
    codes (so programming errors are reported, not propagated).

    The decorated method can have arbitrary parameters. Its return value
    will be the exit code of the process, unless it was interrupted from
    the keyboard (exit code 130) or it failed with an exception (exit
    code 64). """

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        """ Do housekeeping tasks and execute the wrapped method. """

        try:
            logging.basicConfig(stream=sys.stdout,
                                level=logging.WARNING,
                                format='%(name)s: %(message)s')
            # The root logger is renamed, this is how %(name) becomes the
            # name of the executable in the messages.
            root_logger = logging.getLogger()
            root_logger.name = os.path.basename(sys.argv[0])
            exit_code = function(*args, **kwargs)
        except KeyboardInterrupt:
            logging.warning('Keyboard interrupt')
            # Signal received exit code for bash.
            exit_code = 130
        except Exception:
            logging.exception('Internal error.')
            # The advice depends on whether the output is already verbose.
            verbose = logging.getLogger().isEnabledFor(logging.DEBUG)
            if not verbose:
                logging.error("Please run this command again and turn on "
                              "verbose mode (add '-vvvv' as argument).")
            else:
                logging.error("Please report this bug and attach the output "
                              "to the bug report")
            # Some non used exit code for internal errors.
            exit_code = 64
        finally:
            logging.shutdown()
        return exit_code

    return wrapper


def compiler_wrapper(function):
    """ Implements compiler wrapper base functionality.

    A compiler wrapper executes the real compiler, then implements some
    extra functionality, then returns with the real compiler exit code.

    The parameters of the wrapper (`verbose`, `cc` and `cxx`) are read as
    JSON from the environment variable named by ENVIRONMENT_KEY. Problems
    before the extra functionality is called (missing or broken environment
    variable, the compiler can't be started) are not handled here, those
    exceptions reach the caller.

    :param function: the extra functionality what the wrapper wants to
    do on top of the compiler call. If it throws an exception, it will be
    caught and logged. (KeyboardInterrupt and SystemExit are not errors
    of the extra functionality, those are not caught.)
    :return: the exit code of the real compiler.

    The :param function: will receive the following arguments:

    :param result:       the exit code of the compilation.
    :param execution:    the command executed by the wrapper. """

    def is_cxx_compiler():
        """ Find out was it a C++ compiler call. Compiler wrapper names
        contain the compiler type. C++ compiler wrappers ends with `c++`,
        but might have `.exe` extension on windows. """

        wrapper_command = os.path.basename(sys.argv[0])
        return re.match(r'(.+)c\+\+(.*)', wrapper_command) is not None

    def run_compiler(executable):
        """ Execute compilation with the real compiler.

        :param executable: the compiler command as list of tokens, the
        arguments of the wrapper are appended to it.
        :return: the exit code of the compiler. """

        command = executable + sys.argv[1:]
        logging.debug('compilation: %s', command)
        result = subprocess.call(command)
        logging.debug('compilation exit code: %d', result)
        return result

    # Get relevant parameters from environment.
    parameters = json.loads(os.environ[ENVIRONMENT_KEY])
    reconfigure_logging(parameters['verbose'])
    # Execute the requested compilation. Do crash if anything goes wrong.
    cxx = is_cxx_compiler()
    compiler = parameters['cxx'] if cxx else parameters['cc']
    result = run_compiler(compiler)
    # Call the wrapped method and ignore its return value.
    try:
        call = Execution(
            pid=os.getpid(),
            cwd=os.getcwd(),
            # The reported command names the compiler kind, not the wrapper.
            cmd=['c++' if cxx else 'cc'] + sys.argv[1:])
        function(result, call)
    except Exception:
        # A failure of the extra functionality must not fail the build.
        logging.exception('Compiler wrapper failed complete.')
    # Always return the real compiler exit code.
    return result


def wrapper_environment(args):
    """ Set up environment for interpose compiler wrapper.

    :param args: object with the `verbose`, `cc` and `cxx` attributes. The
    `cc` and `cxx` values are command strings, which are split into
    tokens with shell-like syntax.
    :return: dictionary with a single entry, where the key is
    ENVIRONMENT_KEY and the value is the JSON encoded parameters. """

    parameters = {
        'verbose': args.verbose,
        'cc': shlex.split(args.cc),
        'cxx': shlex.split(args.cxx),
    }
    return {ENVIRONMENT_KEY: json.dumps(parameters)}