• 我们在哪一颗星上见过 ,以至如此相互思念 ;我们在哪一颗星上相互思念过,以至如此相互深爱
  • 我们在哪一颗星上分别 ,以至如此相互辉映 ;我们在哪一颗星上入睡 ,以至如此唤醒黎明
  • 认识世界 克服困难 洞悉所有 贴近生活 寻找珍爱 感受彼此

恶意代码技术理论:修改capa输出自定义消息

恶意代码技术理论 云涯 3年前 (2021-12-17) 1526次浏览

flare-capa模块

适用于capa3.0.3

capa_ana.py

#!/usr/bin/env python3 
# -*- coding: utf-8 -*-
# @Time    : 2021/12/14 13:24
# @Author  : wing

import re
import sys
import json
from capa.main import main as capa_main

def Parsing_MBC_data(rules_json):
    import copy
    mbc_all = []

    for capability_key,capability_value in rules_json.items():

        rules_meta_json = capability_value['meta']
        rules_meta_mbc_list = rules_meta_json['mbc']
        if rules_meta_mbc_list:
            for rules_meta_mbc_dict in rules_meta_mbc_list:
                mbc_child = []
                mbc_objective = rules_meta_mbc_dict['objective']
                mbc_child.append(mbc_objective)

                mbc_behavior = rules_meta_mbc_dict['behavior']
                mbc_child.append(mbc_behavior)

                mbc_method = rules_meta_mbc_dict['method']
                mbc_child.append(mbc_method)

                mbc_all.append(copy.deepcopy(mbc_child))

    return mbc_all

def Parsing_ATT_CK_data(rules_json):
    import copy
    ttp_all = []

    for capability_key,capability_value in rules_json.items():

        rules_meta_json = capability_value['meta']
        rules_meta_ttp_list = rules_meta_json['att&ck']
        if rules_meta_ttp_list:
            for rules_meta_ttp_dict in rules_meta_ttp_list:
                ttp_child = []
                ttp_tactic = rules_meta_ttp_dict['tactic']
                ttp_child.append(ttp_tactic)

                ttp_technique = rules_meta_ttp_dict['technique']
                ttp_child.append(ttp_technique)

                ttp_subtechnique = rules_meta_ttp_dict['subtechnique']
                ttp_child.append(ttp_subtechnique)

                ttp_all.append(copy.deepcopy(ttp_child))

    return ttp_all

def find_api(object):
    match_func_list = []
    if hasattr(object,'children'):
        object_child_list = object.children

        if not object_child_list and hasattr(object,'statement'):
            if hasattr(object.statement,'value'):
                match_func = object.statement.value
            elif hasattr(object.statement,'child'):
                match_func = object.statement.child.value
            match_func_list.append(match_func)
            return match_func_list
        else:
            for object_child_obj in object_child_list:
                if object_child_obj.success:
                    match_func_list_ = find_api(object_child_obj)
                    match_func_list += match_func_list_
            return match_func_list
    else:
        return []

def Parsing_CAPABILITY_data(rules_json,capabilities):
    import copy

    CAPABILITY_list = []
    for capability in rules_json:
        Child_CAPABILITY_list = []
        Child_CAPABILITY_list.append(capability)

        if capability == "contains PDB path":
            pdb_dict = rules_json[capability]
            match_pdb_dict = pdb_dict['matches']['0']['node']['feature']['matches']
            for match_pdb in match_pdb_dict:
                Child_CAPABILITY_list.append(match_pdb)
            CAPABILITY_list.append(copy.deepcopy(Child_CAPABILITY_list))
        else:
            capability_child_list = capabilities[capability]
            for capability_child in capability_child_list:
                capability_child_child_obj_children_list = capability_child[1].children
                for capability_child_child_obj_children_child in capability_child_child_obj_children_list:
                    if capability_child_child_obj_children_child.success:
                        if not capability_child_child_obj_children_child.children:
                            if hasattr(capability_child_child_obj_children_child.statement,'value'):
                                match_func = capability_child_child_obj_children_child.statement.value
                                if match_func not in Child_CAPABILITY_list and len(str(match_func))>=5:
                                    Child_CAPABILITY_list.append(match_func)
                        else:
                            match_func_list = find_api(capability_child_child_obj_children_child)
                            for match_func in match_func_list:
                                if match_func not in Child_CAPABILITY_list and len(str(match_func))>=5:
                                    Child_CAPABILITY_list.append(match_func)
            CAPABILITY_list.append(copy.deepcopy(Child_CAPABILITY_list))

    return CAPABILITY_list

def Parsing_basic_data(meta_josn):
    basic_data = []
    rules_meta_sample = meta_josn['sample']

    sample_path = rules_meta_sample['path']
    basic_data.append(sample_path)

    sample_md5 = rules_meta_sample['md5']
    basic_data.append(sample_md5)

    sample_sha1 = rules_meta_sample['sha1']
    basic_data.append(sample_sha1)

    sample_sha256 = rules_meta_sample['sha256']
    basic_data.append(sample_sha256)

    return basic_data

'''
格式化方式1
path: E:\\a.exe
md5: e48a1c9b14cb16e363899c4bc4f4d33f
sha1: 0dab1f59a6a3587cc801d810d60be5886fdd7445
sha256: a593b585bd61d23c03fbfe5b31aadc157961d4b65afa0860dd57692a283628c3
'''
def Format_Basic_data_type1(basic_data,file_path=None, con_print=True):
    sample_path = basic_data[0]
    sample_md5 = basic_data[1]
    sample_sha1 = basic_data[2]
    sample_sha256 = basic_data[3]

    if con_print:
        print('path: '+sample_path)
        print('md5: '+sample_md5)
        print('sha1: '+sample_sha1)
        print('sha256: '+sample_sha256)
        print('\n')
    else:
        with open(file_path,'a',encoding='utf-8') as file_handle:
            file_handle.write('path: '+sample_path+'\n')
            file_handle.write('md5: '+sample_md5+'\n')
            file_handle.write('sha1: '+sample_sha1+'\n')
            file_handle.write('sha256: '+sample_sha256+'\n')
            file_handle.write('\n')


'''
格式化方式1
write file on Windows
   ZwWriteFile
contains PDB path
    E:\\8168\\vc98\\self\\bin\\x86\\c1.pdb
'''
def Format_CAPABILITY_data_type1(CAPABILITY_list, file_path=None, con_print=True):
    if con_print:
        for i in CAPABILITY_list:
            if len(i)==1:
                print('*'+i[0])
            else:
                print('*'+i[0])
                other_list = i[1:]
                for j in other_list:
                    print('\t'+str(j))
        print('\r\n')
    else:
        with open(file_path,'a',encoding='utf-8') as file_handle:
            for i in CAPABILITY_list:
                if len(i) == 1:
                    file_handle.write('*'+i[0]+'\n')
                else:
                    file_handle.write('*'+i[0] + '\n')
                    other_list = i[1:]
                    for j in other_list:
                        file_handle.write('\t' + str(j)+'\n')
            file_handle.write('\n')

'''
格式化方式1
objective: Operating System
    behavior: Registry
        method: Create Registry Key
objective: Operating System
    behavior: Registry
        method: Open Registry Key
'''
def Format_MBC_data_type1(mbc_all, file_path=None, con_print=True):
    if con_print:
        for mbc_list in mbc_all:
            print('*'+'objective: '+mbc_list[0])
            print('\t'+'behavior: '+mbc_list[1])
            if mbc_list[2]:
                print('\t\t'+'method: '+mbc_list[2])
        print('\r\n')

    else:
        with open(file_path,'a',encoding='utf-8') as file_handle:
            for mbc_list in mbc_all:
                file_handle.write('*'+'objective: ' + mbc_list[0]+'\n')
                file_handle.write('\t' + 'behavior: ' + mbc_list[1]+'\n')
                if mbc_list[2]:
                    file_handle.write('\t\t' + 'method: ' + mbc_list[2]+'\n')
            file_handle.write('\n')


'''
格式化方式1
tactic: Discovery
    technique: System Information Discovery
        subtechnique: 
'''
def Format_TTP_data_type1(ttp_all, file_path=None, con_print=True):
    if con_print:
        for ttp_list in ttp_all:
            print('*'+'tactic: '+ttp_list[0])
            print('\t'+'technique: '+ttp_list[1])
            if ttp_list[2]:
                print('\t\t'+'subtechnique: '+ttp_list[2])
        print('\r\n')
    else:
        with open(file_path,'a',encoding='utf-8') as file_handle:
            for ttp_list in ttp_all:
                file_handle.write('*'+'tactic: ' + ttp_list[0]+'\n')
                file_handle.write('\t' + 'technique: ' + ttp_list[1]+'\n')
                if ttp_list[2]:
                    file_handle.write('\t\t' + 'subtechnique: ' + ttp_list[2]+'\n')
            file_handle.write('\n')


def capa_ana_main(file_path,rules_path="C:\\capa_config\\rules",sigs_path="C:\\capa_config\\sigs",print_sig=True,log_path=None,ttp_sig=True,mbc_sig=True,cap_sig=True):
    argv_json = ['-r', rules_path, '-s', sigs_path,'-j', file_path]
    argv_v = ['-r', rules_path, '-s', sigs_path,'-v', file_path]
    argv_vv = ['-r', rules_path, '-s', sigs_path,'-vv', file_path]
    argv = argv_json

    results, capabilities = capa_main(argv=argv)
    results_json = json.loads(results)
    rules_json = results_json['rules']
    meta_josn = results_json['meta']

    basic_data = Parsing_basic_data(meta_josn)
    Format_Basic_data_type1(basic_data,con_print=print_sig,file_path=log_path)

    if ttp_sig:
        ttp_all = Parsing_ATT_CK_data(rules_json)
        Format_TTP_data_type1(ttp_all,con_print=print_sig,file_path=log_path)

    if mbc_sig:
        mbc_all = Parsing_MBC_data(rules_json)
        Format_MBC_data_type1(mbc_all,con_print=print_sig,file_path=log_path)

    if cap_sig:
        CAPABILITY_list = Parsing_CAPABILITY_data(rules_json,capabilities)
        Format_CAPABILITY_data_type1(CAPABILITY_list,con_print=print_sig,file_path=log_path)


if __name__ == '__main__':

    import os,sys
    base_dir = r"E:\恶意代码分析\1-日常狩猎\APT28\2021-12-17"
    log_path = os.path.join(base_dir,'capa_ana_results.log')
    base_handle = os.walk(base_dir)
    for root,dir_list,file_list in base_handle:
        for file in file_list:
            file_path = os.path.join(root,file)
            capa_ana_main(file_path=file_path,print_sig=False,log_path=log_path)

    sys.exit(0)

capa.main

#!/usr/bin/env python3
"""
Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
 is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import hashlib
import logging
import os.path
import argparse
import datetime
import textwrap
import itertools
import collections
from typing import Any, Dict, List, Tuple

import halo
import tqdm
import colorama
from pefile import PEFormatError
from elftools.common.exceptions import ELFError

import capa.rules
import capa.engine
import capa.version
import capa.render.json
import capa.render.default
import capa.render.verbose
import capa.features.common
import capa.features.freeze
import capa.render.vverbose
import capa.features.extractors
import capa.features.extractors.common
import capa.features.extractors.pefile
import capa.features.extractors.elffile
from capa.rules import Rule, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.helpers import get_file_taste
from capa.features.extractors.base_extractor import FunctionHandle, FeatureExtractor

RULES_PATH_DEFAULT_STRING = "(embedded rules)"
SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)"
BACKEND_VIV = "vivisect"
BACKEND_SMDA = "smda"
EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32")
EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64")

E_MISSING_RULES = -10
E_MISSING_FILE = -11
E_INVALID_RULE = -12
E_CORRUPT_FILE = -13
E_FILE_LIMITATION = -14
E_INVALID_SIG = -15
E_INVALID_FILE_TYPE = -16
E_INVALID_FILE_ARCH = -17
E_INVALID_FILE_OS = -18
E_UNSUPPORTED_IDA_VERSION = -19

logger = logging.getLogger("capa")


def set_vivisect_log_level(level):
    logging.getLogger("vivisect").setLevel(level)
    logging.getLogger("vivisect.base").setLevel(level)
    logging.getLogger("vivisect.impemu").setLevel(level)
    logging.getLogger("vtrace").setLevel(level)
    logging.getLogger("envi").setLevel(level)
    logging.getLogger("envi.codeflow").setLevel(level)


def find_function_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle):
    # contains features from:
    #  - insns
    #  - function
    function_features = collections.defaultdict(set)  # type: FeatureSet
    bb_matches = collections.defaultdict(list)  # type: MatchResults

    for feature, va in itertools.chain(extractor.extract_function_features(f), extractor.extract_global_features()):
        function_features[feature].add(va)

    for bb in extractor.get_basic_blocks(f):
        # contains features from:
        #  - insns
        #  - basic blocks
        bb_features = collections.defaultdict(set)

        for feature, va in itertools.chain(
            extractor.extract_basic_block_features(f, bb), extractor.extract_global_features()
        ):
            bb_features[feature].add(va)
            function_features[feature].add(va)

        for insn in extractor.get_instructions(f, bb):
            for feature, va in itertools.chain(
                extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features()
            ):
                bb_features[feature].add(va)
                function_features[feature].add(va)

        _, matches = capa.engine.match(ruleset.basic_block_rules, bb_features, int(bb))

        for rule_name, res in matches.items():
            bb_matches[rule_name].extend(res)
            rule = ruleset[rule_name]
            for va, _ in res:
                capa.engine.index_rule_matches(function_features, rule, [va])

    _, function_matches = capa.engine.match(ruleset.function_rules, function_features, int(f))
    return function_matches, bb_matches, len(function_features)


def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet):
    file_features = collections.defaultdict(set)  # type: FeatureSet

    for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()):
        # not all file features may have virtual addresses.
        # if not, then at least ensure the feature shows up in the index.
        # the set of addresses will still be empty.
        if va:
            file_features[feature].add(va)
        else:
            if feature not in file_features:
                file_features[feature] = set()

    logger.debug("analyzed file and extracted %d features", len(file_features))

    file_features.update(function_features)

    _, matches = capa.engine.match(ruleset.file_rules, file_features, 0x0)
    return matches, len(file_features)


def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None) -> Tuple[MatchResults, Any]:
    all_function_matches = collections.defaultdict(list)  # type: MatchResults
    all_bb_matches = collections.defaultdict(list)  # type: MatchResults

    meta = {
        "feature_counts": {
            "file": 0,
            "functions": {},
        },
        "library_functions": {},
    }  # type: Dict[str, Any]

    pbar = tqdm.tqdm
    if disable_progress:
        # do not use tqdm to avoid unnecessary side effects when caller intends
        # to disable progress completely
        pbar = lambda s, *args, **kwargs: s

    functions = list(extractor.get_functions())
    n_funcs = len(functions)

    # 修改 for f in pb: 修改为 for f in functions:  去掉加载条 注释pb
    # pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions")
    for f in functions:
        function_address = int(f)

        if extractor.is_library_function(function_address):
            function_name = extractor.get_function_name(function_address)
            logger.debug("skipping library function 0x%x (%s)", function_address, function_name)
            meta["library_functions"][function_address] = function_name

            #修改 注释pb(进度条)相关
            # n_libs = len(meta["library_functions"])
            # percentage = 100 * (n_libs / n_funcs)
            # if isinstance(pb, tqdm.tqdm):
            #     pb.set_postfix_str("skipped %d library functions (%d%%)" % (n_libs, percentage))
            continue

        function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f)
        meta["feature_counts"]["functions"][function_address] = feature_count
        logger.debug("analyzed function 0x%x and extracted %d features", function_address, feature_count)

        for rule_name, res in function_matches.items():
            all_function_matches[rule_name].extend(res)
        for rule_name, res in bb_matches.items():
            all_bb_matches[rule_name].extend(res)

    # collection of features that captures the rule matches within function and BB scopes.
    # mapping from feature (matched rule) to set of addresses at which it matched.
    function_and_lower_features: FeatureSet = collections.defaultdict(set)
    for rule_name, results in itertools.chain(all_function_matches.items(), all_bb_matches.items()):
        locations = set(map(lambda p: p[0], results))
        rule = ruleset[rule_name]
        capa.engine.index_rule_matches(function_and_lower_features, rule, locations)

    all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features)
    meta["feature_counts"]["file"] = feature_count

    matches = {
        rule_name: results
        for rule_name, results in itertools.chain(
            # each rule exists in exactly one scope,
            # so there won't be any overlap among these following MatchResults,
            # and we can merge the dictionaries naively.
            all_bb_matches.items(),
            all_function_matches.items(),
            all_file_matches.items(),
        )
    }

    return matches, meta


def has_rule_with_namespace(rules, capabilities, rule_cat):
    for rule_name in capabilities.keys():
        if rules.rules[rule_name].meta.get("namespace", "").startswith(rule_cat):
            return True
    return False


def is_internal_rule(rule: Rule) -> bool:
    return rule.meta.get("namespace", "").startswith("internal/")


def is_file_limitation_rule(rule: Rule) -> bool:
    return rule.meta.get("namespace", "") == "internal/limitation/file"


def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool:
    file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values()))

    for file_limitation_rule in file_limitation_rules:
        if file_limitation_rule.name not in capabilities:
            continue

        #修改logger.warning 修改为logger.debug
        logger.debug("-" * 80)

        for line in file_limitation_rule.meta.get("description", "").split("\n"):

            # 修改logger.warning 修改为logger.debug
            logger.debug(" " + line)

        # 修改logger.warning 修改为logger.debug
        logger.debug(" Identified via rule: %s", file_limitation_rule.name)
        if is_standalone:
            # 修改logger.warning 修改为logger.debug
            logger.debug(" ")
            # 修改logger.warning 修改为logger.debug
            logger.debug(" Use -v or -vv if you really want to see the capabilities identified by capa.")
        # 修改logger.warning 修改为logger.debug
        logger.debug("-" * 80)

        # bail on first file limitation
        return True

    return False


def is_supported_format(sample: str) -> bool:
    """
    Return if this is a supported file based on magic header values
    """
    with open(sample, "rb") as f:
        taste = f.read(0x100)

    return len(list(capa.features.extractors.common.extract_format(taste))) == 1


def get_format(sample: str) -> str:
    with open(sample, "rb") as f:
        buf = f.read()

    for feature, _ in capa.features.extractors.common.extract_format(buf):
        assert isinstance(feature.value, str)
        return feature.value

    return "unknown"


def is_supported_arch(sample: str) -> bool:
    with open(sample, "rb") as f:
        buf = f.read()

    return len(list(capa.features.extractors.common.extract_arch(buf))) == 1


def get_arch(sample: str) -> str:
    with open(sample, "rb") as f:
        buf = f.read()

    for feature, _ in capa.features.extractors.common.extract_arch(buf):
        assert isinstance(feature.value, str)
        return feature.value

    return "unknown"


def is_supported_os(sample: str) -> bool:
    with open(sample, "rb") as f:
        buf = f.read()

    return len(list(capa.features.extractors.common.extract_os(buf))) == 1


def get_os(sample: str) -> str:
    with open(sample, "rb") as f:
        buf = f.read()

    for feature, _ in capa.features.extractors.common.extract_os(buf):
        assert isinstance(feature.value, str)
        return feature.value

    return "unknown"


def get_meta_str(vw):
    """
    Return workspace meta information string
    """
    meta = []
    for k in ["Format", "Platform", "Architecture"]:
        if k in vw.metadata:
            meta.append("%s: %s" % (k.lower(), vw.metadata[k]))
    return "%s, number of functions: %d" % (", ".join(meta), len(vw.getFunctions()))


def is_running_standalone() -> bool:
    """
    are we running from a PyInstaller'd executable?
    if so, then we'll be able to access `sys._MEIPASS` for the packaged resources.
    """
    return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS")


def get_default_root() -> str:
    """
    get the file system path to the default resources directory.
    under PyInstaller, this comes from _MEIPASS.
    under source, this is the root directory of the project.
    """
    if is_running_standalone():
        # pylance/mypy don't like `sys._MEIPASS` because this isn't standard.
        # its injected by pyinstaller.
        # so we'll fetch this attribute dynamically.
        return getattr(sys, "_MEIPASS")
    else:
        return os.path.join(os.path.dirname(__file__), "..")


def get_default_signatures() -> List[str]:
    """
    compute a list of file system paths to the default FLIRT signatures.
    """
    sigs_path = os.path.join(get_default_root(), "sigs")
    logger.debug("signatures path: %s", sigs_path)

    ret = []
    for root, dirs, files in os.walk(sigs_path):
        for file in files:
            if not (file.endswith(".pat") or file.endswith(".pat.gz") or file.endswith(".sig")):
                continue

            ret.append(os.path.join(root, file))

    return ret


class UnsupportedFormatError(ValueError):
    pass


class UnsupportedArchError(ValueError):
    pass


class UnsupportedOSError(ValueError):
    pass


def get_workspace(path, format, sigpaths):
    """
    load the program at the given path into a vivisect workspace using the given format.
    also apply the given FLIRT signatures.

    supported formats:
      - pe
      - elf
      - shellcode 32-bit
      - shellcode 64-bit
      - auto

    this creates and analyzes the workspace; however, it does *not* save the workspace.
    this is the responsibility of the caller.
    """

    # lazy import enables us to not require viv if user wants SMDA, for example.
    import viv_utils

    logger.debug("generating vivisect workspace for: %s", path)
    if format == "auto":
        if not is_supported_format(path):
            raise UnsupportedFormatError()

        # don't analyze, so that we can add our Flirt function analyzer first.
        vw = viv_utils.getWorkspace(path, analyze=False, should_save=False)
    elif format in {"pe", "elf"}:
        vw = viv_utils.getWorkspace(path, analyze=False, should_save=False)
    elif format == "sc32":
        # these are not analyzed nor saved.
        vw = viv_utils.getShellcodeWorkspaceFromFile(path, arch="i386", analyze=False)
    elif format == "sc64":
        vw = viv_utils.getShellcodeWorkspaceFromFile(path, arch="amd64", analyze=False)
    else:
        raise ValueError("unexpected format: " + format)

    viv_utils.flirt.register_flirt_signature_analyzers(vw, sigpaths)

    vw.analyze()

    logger.debug("%s", get_meta_str(vw))
    return vw


class UnsupportedRuntimeError(RuntimeError):
    pass


def get_extractor(
    path: str, format: str, backend: str, sigpaths: List[str], should_save_workspace=False, disable_progress=False
) -> FeatureExtractor:
    """
    raises:
      UnsupportedFormatError
      UnsupportedArchError
      UnsupportedOSError
    """
    if format not in ("sc32", "sc64"):
        if not is_supported_format(path):
            raise UnsupportedFormatError()

        if not is_supported_arch(path):
            raise UnsupportedArchError()

        if not is_supported_os(path):
            raise UnsupportedOSError()

    if backend == "smda":
        from smda.SmdaConfig import SmdaConfig
        from smda.Disassembler import Disassembler

        import capa.features.extractors.smda.extractor

        smda_report = None
        with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
            config = SmdaConfig()
            config.STORE_BUFFER = True
            smda_disasm = Disassembler(config)
            smda_report = smda_disasm.disassembleFile(path)

        return capa.features.extractors.smda.extractor.SmdaFeatureExtractor(smda_report, path)
    else:
        import capa.features.extractors.viv.extractor

        # 修改 删除旋转提示 with下面向左缩进
        # with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
        vw = get_workspace(path, format, sigpaths)

        if should_save_workspace:
            logger.debug("saving workspace")
            try:
                vw.saveWorkspace()
            except IOError:
                # see #168 for discussion around how to handle non-writable directories
                logger.info("source directory is not writable, won't save intermediate workspace")
        else:
            logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")

        return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path)


def is_nursery_rule_path(path: str) -> bool:
    """
    The nursery is a spot for rules that have not yet been fully polished.
    For example, they may not have references to public example of a technique.
    Yet, we still want to capture and report on their matches.
    The nursery is currently a subdirectory of the rules directory with that name.

    When nursery rules are loaded, their metadata section should be updated with:
      `nursery=True`.
    """
    return "nursery" in path


def get_rules(rule_path: str, disable_progress=False) -> List[Rule]:
    if not os.path.exists(rule_path):
        raise IOError("rule path %s does not exist or cannot be accessed" % rule_path)

    rule_paths = []
    if os.path.isfile(rule_path):
        rule_paths.append(rule_path)
    elif os.path.isdir(rule_path):
        logger.debug("reading rules from directory %s", rule_path)
        for root, dirs, files in os.walk(rule_path):
            if ".github" in root:
                # the .github directory contains CI config in capa-rules
                # this includes some .yml files
                # these are not rules
                continue

            for file in files:
                if not file.endswith(".yml"):
                    if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))):
                        # expect to see .git* files, readme.md, format.md, and maybe a .git directory
                        # other things maybe are rules, but are mis-named.
                        logger.warning("skipping non-.yml file: %s", file)
                    continue

                rule_path = os.path.join(root, file)
                rule_paths.append(rule_path)

    rules = []  # type: List[Rule]

    pbar = tqdm.tqdm
    if disable_progress:
        # do not use tqdm to avoid unnecessary side effects when caller intends
        # to disable progress completely
        pbar = lambda s, *args, **kwargs: s

    # 修改 删除进度条
    # for rule_path in pbar(list(rule_paths), desc="loading ", unit=" rules"):
    for rule_path in rule_paths:
        try:
            rule = capa.rules.Rule.from_yaml_file(rule_path)
        except capa.rules.InvalidRule:
            raise
        else:
            rule.meta["capa/path"] = rule_path
            if is_nursery_rule_path(rule_path):
                rule.meta["capa/nursery"] = True

            rules.append(rule)
            logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scope)

    return rules


def get_signatures(sigs_path):
    if not os.path.exists(sigs_path):
        raise IOError("signatures path %s does not exist or cannot be accessed" % sigs_path)

    paths = []
    if os.path.isfile(sigs_path):
        paths.append(sigs_path)
    elif os.path.isdir(sigs_path):
        logger.debug("reading signatures from directory %s", os.path.abspath(os.path.normpath(sigs_path)))
        for root, dirs, files in os.walk(sigs_path):
            for file in files:
                if file.endswith((".pat", ".pat.gz", ".sig")):
                    sig_path = os.path.join(root, file)
                    paths.append(sig_path)

    # nicely normalize and format path so that debugging messages are clearer
    paths = [os.path.abspath(os.path.normpath(path)) for path in paths]

    # load signatures in deterministic order: the alphabetic sorting of filename.
    # this means that `0_sigs.pat` loads before `1_sigs.pat`.
    paths = sorted(paths, key=os.path.basename)

    for path in paths:
        logger.debug("found signature file: %s", path)

    return paths


def collect_metadata(argv, sample_path, rules_path, extractor):
    md5 = hashlib.md5()
    sha1 = hashlib.sha1()
    sha256 = hashlib.sha256()

    with open(sample_path, "rb") as f:
        buf = f.read()

    md5.update(buf)
    sha1.update(buf)
    sha256.update(buf)

    if rules_path != RULES_PATH_DEFAULT_STRING:
        rules_path = os.path.abspath(os.path.normpath(rules_path))

    format = get_format(sample_path)
    arch = get_arch(sample_path)
    os_ = get_os(sample_path)

    return {
        "timestamp": datetime.datetime.now().isoformat(),
        "version": capa.version.__version__,
        "argv": argv,
        "sample": {
            "md5": md5.hexdigest(),
            "sha1": sha1.hexdigest(),
            "sha256": sha256.hexdigest(),
            "path": os.path.normpath(sample_path),
        },
        "analysis": {
            "format": format,
            "arch": arch,
            "os": os_,
            "extractor": extractor.__class__.__name__,
            "rules": rules_path,
            "base_address": extractor.get_base_address(),
            "layout": {
                # this is updated after capabilities have been collected.
                # will look like:
                #
                # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
            },
        },
    }


def compute_layout(rules, extractor, capabilities):
    """
    compute a metadata structure that links basic blocks
    to the functions in which they're found.

    only collect the basic blocks at which some rule matched.
    otherwise, we may pollute the json document with
    a large amount of un-referenced data.
    """
    functions_by_bb = {}
    bbs_by_function = {}
    for f in extractor.get_functions():
        bbs_by_function[int(f)] = []
        for bb in extractor.get_basic_blocks(f):
            functions_by_bb[int(bb)] = int(f)
            bbs_by_function[int(f)].append(int(bb))

    matched_bbs = set()
    for rule_name, matches in capabilities.items():
        rule = rules[rule_name]
        if rule.meta.get("scope") == capa.rules.BASIC_BLOCK_SCOPE:
            for (addr, match) in matches:
                assert addr in functions_by_bb
                matched_bbs.add(addr)

    layout = {
        "functions": {
            f: {
                "matched_basic_blocks": 
                # this object is open to extension in the future,
                # such as with the function name, etc.
            }
            for f, bbs in bbs_by_function.items()
        }
    }

    return layout


def install_common_args(parser, wanted=None):
    """
    register a common set of command line arguments for re-use by main & scripts.
    these are things like logging/coloring/etc.
    also enable callers to opt-in to common arguments, like specifying the input sample.

    this routine lets many script use the same language for cli arguments.
    see `handle_common_args` to do common configuration.

    args:
      parser (argparse.ArgumentParser): a parser to update in place, adding common arguments.
      wanted (Set[str]): collection of arguments to opt-into, including:
        - "sample": required positional argument to input file.
        - "format": flag to override file format.
        - "backend": flag to override analysis backend.
        - "rules": flag to override path to capa rules.
        - "tag": flag to override/specify which rules to match.
    """
    if wanted is None:
        wanted = set()

    #
    # common arguments that all scripts will have
    #

    parser.add_argument("--version", action="version", version="%(prog)s {:s}".format(capa.version.__version__))
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="enable verbose result document (no effect with --json)"
    )
    parser.add_argument(
        "-vv", "--vverbose", action="store_true", help="enable very verbose result document (no effect with --json)"
    )
    parser.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
    parser.add_argument("-q", "--quiet", action="store_true", help="disable all output but errors")
    parser.add_argument(
        "--color",
        type=str,
        choices=("auto", "always", "never"),
        default="auto",
        help="enable ANSI color codes in results, default: only during interactive session",
    )

    #
    # arguments that may be opted into:
    #
    #   - sample
    #   - format
    #   - rules
    #   - tag
    #

    if "sample" in wanted:
        parser.add_argument(
            "sample",
            type=str,
            help="path to sample to analyze",
        )

    if "format" in wanted:
        formats = [
            ("auto", "(default) detect file type automatically"),
            ("pe", "Windows PE file"),
            ("elf", "Executable and Linkable Format"),
            ("sc32", "32-bit shellcode"),
            ("sc64", "64-bit shellcode"),
            ("freeze", "features previously frozen by capa"),
        ]
        format_help = ", ".join(["%s: %s" % (f[0], f[1]) for f in formats])
        parser.add_argument(
            "-f",
            "--format",
            choices=[f[0] for f in formats],
            default="auto",
            help="select sample format, %s" % format_help,
        )

        if "backend" in wanted:
            parser.add_argument(
                "-b",
                "--backend",
                type=str,
                help="select the backend to use",
                choices=(BACKEND_VIV, BACKEND_SMDA),
                default=BACKEND_VIV,
            )

    if "rules" in wanted:
        parser.add_argument(
            "-r",
            "--rules",
            type=str,
            default=RULES_PATH_DEFAULT_STRING,
            help="path to rule file or directory, use embedded rules by default",
        )

    if "signatures" in wanted:
        parser.add_argument(
            "-s",
            "--signatures",
            type=str,
            default=SIGNATURES_PATH_DEFAULT_STRING,
            help="path to .sig/.pat file or directory used to identify library functions, use embedded signatures by default",
        )

    if "tag" in wanted:
        parser.add_argument("-t", "--tag", type=str, help="filter on rule meta field values")


def handle_common_args(args):
    """
    handle the global config specified by `install_common_args`,
    such as configuring logging/coloring/etc.
    the following fields will be overwritten when present:
      - rules: file system path to rule files.
      - signatures: file system path to signature files.

    args:
      args (argparse.Namespace): parsed arguments that included at least `install_common_args` args.
    """
    if args.quiet:
        logging.basicConfig(level=logging.WARNING)
        logging.getLogger().setLevel(logging.WARNING)
    elif args.debug:
        logging.basicConfig(level=logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
        logging.getLogger().setLevel(logging.INFO)

    # disable vivisect-related logging, it's verbose and not relevant for capa users
    set_vivisect_log_level(logging.CRITICAL)

    # Since Python 3.8 cp65001 is an alias to utf_8, but not for Pyhton < 3.8
    # TODO: remove this code when only supporting Python 3.8+
    # https://stackoverflow.com/a/3259271/87207
    import codecs

    codecs.register(lambda name: codecs.lookup("utf-8") if name == "cp65001" else None)

    if args.color == "always":
        colorama.init(strip=False)
    elif args.color == "auto":
        # colorama will detect:
        #  - when on Windows console, and fixup coloring, and
        #  - when not an interactive session, and disable coloring
        # renderers should use coloring and assume it will be stripped out if necessary.
        colorama.init()
    elif args.color == "never":
        colorama.init(strip=True)
    else:
        raise RuntimeError("unexpected --color value: " + args.color)

    if hasattr(args, "rules"):
        if args.rules == RULES_PATH_DEFAULT_STRING:
            logger.debug("-" * 80)
            logger.debug(" Using default embedded rules.")
            logger.debug(" To provide your own rules, use the form `capa.exe -r ./path/to/rules/  /path/to/mal.exe`.")
            logger.debug(" You can see the current default rule set here:")
            logger.debug("     https://github.com/mandiant/capa-rules")
            logger.debug("-" * 80)

            rules_path = os.path.join(get_default_root(), "rules")

            if not os.path.exists(rules_path):
                # when a users installs capa via pip,
                # this pulls down just the source code - not the default rules.
                # i'm not sure the default rules should even be written to the library directory,
                # so in this case, we require the user to use -r to specify the rule directory.
                logger.error("default embedded rules not found! (maybe you installed capa as a library?)")
                logger.error("provide your own rule set via the `-r` option.")
                return E_MISSING_RULES
        else:
            rules_path = args.rules
            logger.debug("using rules path: %s", rules_path)

        args.rules = rules_path

    if hasattr(args, "signatures"):
        if args.signatures == SIGNATURES_PATH_DEFAULT_STRING:
            logger.debug("-" * 80)
            logger.debug(" Using default embedded signatures.")
            logger.debug(
                " To provide your own signatures, use the form `capa.exe --signature ./path/to/signatures/  /path/to/mal.exe`."
            )
            logger.debug("-" * 80)

            sigs_path = os.path.join(get_default_root(), "sigs")
        else:
            sigs_path = args.signatures
            logger.debug("using signatures path: %s", sigs_path)

        args.signatures = sigs_path


def main(argv=None):
    if sys.version_info < (3, 6):
        raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.6+")

    if argv is None:
        argv = sys.argv[1:]

    desc = "The FLARE team's open-source tool to identify capabilities in executable files."
    epilog = textwrap.dedent(
        """
        By default, capa uses a default set of embedded rules.
        You can see the rule set here:
          https://github.com/mandiant/capa-rules

        To provide your own rule set, use the `-r` flag:
          capa  --rules /path/to/rules  suspicious.exe
          capa  -r      /path/to/rules  suspicious.exe

        examples:
          identify capabilities in a binary
            capa suspicious.exe

          identify capabilities in 32-bit shellcode, see `-f` for all supported formats
            capa -f sc32 shellcode.bin

          report match locations
            capa -v suspicious.exe

          report all feature match details
            capa -vv suspicious.exe

          filter rules by meta fields, e.g. rule name or namespace
            capa -t "create TCP socket" suspicious.exe
         """
    )

    parser = argparse.ArgumentParser(
        description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    install_common_args(parser, {"sample", "format", "backend", "signatures", "rules", "tag"})
    parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
    args = parser.parse_args(args=argv)
    ret = handle_common_args(args)
    if ret is not None and ret != 0:
        return ret

    try:
        taste = get_file_taste(args.sample)
    except IOError as e:
        # per our research there's not a programmatic way to render the IOError with non-ASCII filename unless we
        # handle the IOError separately and reach into the args
        logger.error("%s", e.args[0])
        return E_MISSING_FILE

    try:
        # 规则
        rules = get_rules(args.rules, disable_progress=args.quiet)
        rules = capa.rules.RuleSet(rules)
        logger.debug(
            "successfully loaded %s rules",
            # during the load of the RuleSet, we extract subscope statements into their own rules
            # that are subsequently `match`ed upon. this inflates the total rule count.
            # so, filter out the subscope rules when reporting total number of loaded rules.
            len([i for i in filter(lambda r: "capa/subscope-rule" not in r.meta, rules.rules.values())]),
        )
        if args.tag:
            rules = rules.filter_rules_by_meta(args.tag)
            logger.debug("selected %d rules", len(rules))
            for i, r in enumerate(rules.rules, 1):
                # TODO don't display subscope rules?
                logger.debug(" %d. %s", i, r)
    except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
        logger.error("%s", str(e))
        return E_INVALID_RULE

    file_extractor = None
    if args.format == "pe" or (args.format == "auto" and taste.startswith(b"MZ")):
        # these pefile and elffile file feature extractors are pretty light weight: they don't do any code analysis.
        # so we can fairly quickly determine if the given file has "pure" file-scope rules
        # that indicate a limitation (like "file is packed based on section names")
        # and avoid doing a full code analysis on difficult/impossible binaries.
        try:
            file_extractor = capa.features.extractors.pefile.PefileFeatureExtractor(args.sample)
        except PEFormatError as e:
            logger.error("Input file '%s' is not a valid PE file: %s", args.sample, str(e))
            return E_CORRUPT_FILE

    elif args.format == "elf" or (args.format == "auto" and taste.startswith(b"\x7fELF")):
        try:
            file_extractor = capa.features.extractors.elffile.ElfFeatureExtractor(args.sample)
        except (ELFError, OverflowError) as e:
            logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e))
            return E_CORRUPT_FILE

    if file_extractor:
        try:
            pure_file_capabilities, _ = find_file_capabilities(rules, file_extractor, {})
        except PEFormatError as e:
            logger.error("Input file '%s' is not a valid PE file: %s", args.sample, str(e))
            return E_CORRUPT_FILE
        except (ELFError, OverflowError) as e:
            logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e))
            return E_CORRUPT_FILE

        # file limitations that rely on non-file scope won't be detected here.
        # nor on FunctionName features, because pefile doesn't support this.
        if has_file_limitation(rules, pure_file_capabilities):
            # bail if capa encountered file limitation e.g. a packed binary
            # do show the output in verbose mode, though.
            if not (args.verbose or args.vverbose or args.json):
                logger.debug("file limitation short circuit, won't analyze fully.")
                return E_FILE_LIMITATION

    try:
        if args.format == "pe" or (args.format == "auto" and taste.startswith(b"MZ")):
            sig_paths = get_signatures(args.signatures)
        else:
            sig_paths = []
            logger.debug("skipping library code matching: only have PE signatures")
    except (IOError) as e:
        logger.error("%s", str(e))
        return E_INVALID_SIG

    if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)):
        format = "freeze"
        with open(args.sample, "rb") as f:
            extractor = capa.features.freeze.load(f.read())
    else:
        format = args.format
        if format == "auto" and args.sample.endswith(EXTENSIONS_SHELLCODE_32):
            format = "sc32"
        elif format == "auto" and args.sample.endswith(EXTENSIONS_SHELLCODE_64):
            format = "sc64"

        should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)

        try:
            # 解析函数
            extractor = get_extractor(
                args.sample, format, args.backend, sig_paths, should_save_workspace, disable_progress=args.quiet
            )
        except UnsupportedFormatError:
            logger.error("-" * 80)
            logger.error(" Input file does not appear to be a PE or ELF file.")
            logger.error(" ")
            logger.error(
                " capa currently only supports analyzing PE and ELF files (or shellcode, when using --format sc32|sc64)."
            )
            logger.error(" If you don't know the input file type, you can try using the `file` utility to guess it.")
            logger.error("-" * 80)
            return E_INVALID_FILE_TYPE
        except UnsupportedArchError:
            logger.error("-" * 80)
            logger.error(" Input file does not appear to target a supported architecture.")
            logger.error(" ")
            logger.error(" capa currently only supports analyzing x86 (32- and 64-bit).")
            logger.error("-" * 80)
            return E_INVALID_FILE_ARCH
        except UnsupportedOSError:
            logger.error("-" * 80)
            logger.error(" Input file does not appear to target a supported OS.")
            logger.error(" ")
            logger.error(
                " capa currently only supports analyzing executables for some operating systems (including Windows and Linux)."
            )
            logger.error("-" * 80)
            return E_INVALID_FILE_OS

    meta = collect_metadata(argv, args.sample, args.rules, extractor)

    capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
    meta["analysis"].update(counts)
    meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)

    if has_file_limitation(rules, capabilities):
        # bail if capa encountered file limitation e.g. a packed binary
        # do show the output in verbose mode, though.
        if not (args.verbose or args.vverbose or args.json):
            return E_FILE_LIMITATION

    # 修改 将打印换成返回  capabilities是字典
    if args.json:
        results = capa.render.json.render(meta, rules, capabilities)
    elif args.vverbose:
        results = capa.render.vverbose.render(meta, rules, capabilities)
    elif args.verbose:
        results = capa.render.verbose.render(meta, rules, capabilities)
    else:
        results = capa.render.default.render(meta, rules, capabilities)
    colorama.deinit()

    logger.debug("done.")

    # 修改 return 0 为 return results,capabilities
    # return 0
    return results,capabilities


def ida_main():
    import capa.rules
    import capa.ida.helpers
    import capa.render.default
    import capa.features.extractors.ida.extractor

    logging.basicConfig(level=logging.INFO)
    logging.getLogger().setLevel(logging.INFO)

    if not capa.ida.helpers.is_supported_ida_version():
        return E_UNSUPPORTED_IDA_VERSION

    if not capa.ida.helpers.is_supported_file_type():
        return E_INVALID_FILE_TYPE

    logger.debug("-" * 80)
    logger.debug(" Using default embedded rules.")
    logger.debug(" ")
    logger.debug(" You can see the current default rule set here:")
    logger.debug("     https://github.com/mandiant/capa-rules")
    logger.debug("-" * 80)

    rules_path = os.path.join(get_default_root(), "rules")
    logger.debug("rule path: %s", rules_path)
    rules = get_rules(rules_path)
    rules = capa.rules.RuleSet(rules)

    meta = capa.ida.helpers.collect_metadata()

    capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor())
    meta["analysis"].update(counts)

    if has_file_limitation(rules, capabilities, is_standalone=False):
        capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")

    colorama.init(strip=True)
    print(capa.render.default.render(meta, rules, capabilities))


def is_runtime_ida():
    try:
        import idc
    except ImportError:
        return False
    else:
        return True


if __name__ == "__main__":
    if is_runtime_ida():
        ida_main()
    else:
        sys.exit(main())

适用于capa4.0.1

capa.ana.py

#!/usr/bin/env python3 
# -*- coding: utf-8 -*-
# @Time    : 2021/12/14 13:24
# @Author  : wing

import re
import sys
import json
from capa.main import main as capa_main

def Parsing_MBC_data(rules_json):
    import copy
    mbc_all = []

    for capability_key,capability_value in rules_json.items():

        rules_meta_json = capability_value['meta']
        rules_meta_mbc_list = rules_meta_json['mbc']
        if rules_meta_mbc_list:
            for rules_meta_mbc_dict in rules_meta_mbc_list:
                mbc_child = []
                mbc_objective = rules_meta_mbc_dict['objective']
                mbc_child.append(mbc_objective)

                mbc_behavior = rules_meta_mbc_dict['behavior']
                mbc_child.append(mbc_behavior)

                mbc_method = rules_meta_mbc_dict['method']
                mbc_child.append(mbc_method)

                mbc_all.append(copy.deepcopy(mbc_child))

    return mbc_all

def Parsing_ATT_CK_data(rules_json):
    import copy
    ttp_all = []

    for capability_key,capability_value in rules_json.items():

        rules_meta_json = capability_value['meta']
        rules_meta_ttp_list = rules_meta_json['attack']
        if rules_meta_ttp_list:
            for rules_meta_ttp_dict in rules_meta_ttp_list:
                ttp_child = []
                ttp_tactic = rules_meta_ttp_dict['tactic']
                ttp_child.append(ttp_tactic)

                ttp_technique = rules_meta_ttp_dict['technique']
                ttp_child.append(ttp_technique)

                ttp_subtechnique = rules_meta_ttp_dict['subtechnique']
                ttp_child.append(ttp_subtechnique)

                ttp_all.append(copy.deepcopy(ttp_child))

    return ttp_all

def find_api(object):
    match_func_list = []
    if hasattr(object,'children'):
        object_child_list = object.children

        if not object_child_list and hasattr(object,'statement'):
            if hasattr(object.statement,'value'):
                match_func = object.statement.value
            elif hasattr(object.statement,'child'):
                match_func = object.statement.child.value
            match_func_list.append(match_func)
            return match_func_list
        else:
            for object_child_obj in object_child_list:
                if object_child_obj.success:
                    match_func_list_ = find_api(object_child_obj)
                    match_func_list += match_func_list_
            return match_func_list
    else:
        return []

def Parsing_CAPABILITY_data(rules_json,capabilities):
    import copy

    CAPABILITY_list = []
    for capability in rules_json:
        Child_CAPABILITY_list = []
        Child_CAPABILITY_list.append(capability)

        if capability == "contains PDB path":
            pdb_dict = rules_json[capability]
            match_pdb_dict = pdb_dict['matches']['0']['node']['feature']['matches']
            for match_pdb in match_pdb_dict:
                Child_CAPABILITY_list.append(match_pdb)
            CAPABILITY_list.append(copy.deepcopy(Child_CAPABILITY_list))
        else:
            capability_child_list = capabilities[capability]
            for capability_child in capability_child_list:
                capability_child_child_obj_children_list = capability_child[1].children
                for capability_child_child_obj_children_child in capability_child_child_obj_children_list:
                    if capability_child_child_obj_children_child.success:
                        if not capability_child_child_obj_children_child.children:
                            if hasattr(capability_child_child_obj_children_child.statement,'value'):
                                match_func = capability_child_child_obj_children_child.statement.value
                                if match_func not in Child_CAPABILITY_list and len(str(match_func))>=5:
                                    Child_CAPABILITY_list.append(match_func)
                        else:
                            match_func_list = find_api(capability_child_child_obj_children_child)
                            for match_func in match_func_list:
                                if match_func not in Child_CAPABILITY_list and len(str(match_func))>=5:
                                    Child_CAPABILITY_list.append(match_func)
            CAPABILITY_list.append(copy.deepcopy(Child_CAPABILITY_list))

    return CAPABILITY_list

def Parsing_basic_data(meta_josn):
    basic_data = []
    rules_meta_sample = meta_josn['sample']

    sample_path = rules_meta_sample['path']
    basic_data.append(sample_path)

    sample_md5 = rules_meta_sample['md5']
    basic_data.append(sample_md5)

    sample_sha1 = rules_meta_sample['sha1']
    basic_data.append(sample_sha1)

    sample_sha256 = rules_meta_sample['sha256']
    basic_data.append(sample_sha256)

    return basic_data

'''
格式化方式1
path: E:\\a.exe
md5: e48a1c9b14cb16e363899c4bc4f4d33f
sha1: 0dab1f59a6a3587cc801d810d60be5886fdd7445
sha256: a593b585bd61d23c03fbfe5b31aadc157961d4b65afa0860dd57692a283628c3
'''
def Format_Basic_data_type1(basic_data,file_path=None, con_print=True):
    sample_path = basic_data[0]
    sample_md5 = basic_data[1]
    sample_sha1 = basic_data[2]
    sample_sha256 = basic_data[3]

    if con_print:
        print('path: '+sample_path)
        print('md5: '+sample_md5)
        print('sha1: '+sample_sha1)
        print('sha256: '+sample_sha256)
        print('\n')
    else:
        with open(file_path,'a',encoding='utf-8') as file_handle:
            file_handle.write('path: '+sample_path+'\n')
            file_handle.write('md5: '+sample_md5+'\n')
            file_handle.write('sha1: '+sample_sha1+'\n')
            file_handle.write('sha256: '+sample_sha256+'\n')
            file_handle.write('\n')


'''
格式化方式1
write file on Windows
   ZwWriteFile
contains PDB path
    E:\\8168\\vc98\\self\\bin\\x86\\c1.pdb
'''
def Format_CAPABILITY_data_type1(CAPABILITY_list, file_path=None, con_print=True):
    if con_print:
        for i in CAPABILITY_list:
            if len(i)==1:
                print('*'+i[0])
            else:
                print('*'+i[0])
                other_list = i[1:]
                for j in other_list:
                    print('\t'+str(j))
        print('\r\n')
    else:
        with open(file_path,'a',encoding='utf-8') as file_handle:
            for i in CAPABILITY_list:
                if len(i) == 1:
                    file_handle.write('*'+i[0]+'\n')
                else:
                    file_handle.write('*'+i[0] + '\n')
                    other_list = i[1:]
                    for j in other_list:
                        file_handle.write('\t' + str(j)+'\n')
            file_handle.write('\n')

'''
格式化方式1
objective: Operating System
    behavior: Registry
        method: Create Registry Key
objective: Operating System
    behavior: Registry
        method: Open Registry Key
'''
def Format_MBC_data_type1(mbc_all, file_path=None, con_print=True):
    if con_print:
        for mbc_list in mbc_all:
            print('*'+'objective: '+mbc_list[0])
            print('\t'+'behavior: '+mbc_list[1])
            if mbc_list[2]:
                print('\t\t'+'method: '+mbc_list[2])
        print('\r\n')

    else:
        with open(file_path,'a',encoding='utf-8') as file_handle:
            for mbc_list in mbc_all:
                file_handle.write('*'+'objective: ' + mbc_list[0]+'\n')
                file_handle.write('\t' + 'behavior: ' + mbc_list[1]+'\n')
                if mbc_list[2]:
                    file_handle.write('\t\t' + 'method: ' + mbc_list[2]+'\n')
            file_handle.write('\n')


'''
格式化方式1
tactic: Discovery
    technique: System Information Discovery
        subtechnique: 
'''
def Format_TTP_data_type1(ttp_all, file_path=None, con_print=True):
    if con_print:
        for ttp_list in ttp_all:
            print('*'+'tactic: '+ttp_list[0])
            print('\t'+'technique: '+ttp_list[1])
            if ttp_list[2]:
                print('\t\t'+'subtechnique: '+ttp_list[2])
        print('\r\n')
    else:
        with open(file_path,'a',encoding='utf-8') as file_handle:
            for ttp_list in ttp_all:
                file_handle.write('*'+'tactic: ' + ttp_list[0]+'\n')
                file_handle.write('\t' + 'technique: ' + ttp_list[1]+'\n')
                if ttp_list[2]:
                    file_handle.write('\t\t' + 'subtechnique: ' + ttp_list[2]+'\n')
            file_handle.write('\n')


def capa_ana_main(file_path,rules_path="C:\\capa_config\\rules_20220922",sigs_path="C:\\capa_config\\sigs",print_sig=True,log_path=None,ttp_sig=True,mbc_sig=True,cap_sig=True):
    argv_json = ['-r', rules_path, '-s', sigs_path,'-j', file_path]
    argv_v = ['-r', rules_path, '-s', sigs_path,'-v', file_path]
    argv_vv = ['-r', rules_path, '-s', sigs_path,'-vv', file_path]
    argv = argv_json

    results, capabilities = capa_main(argv=argv)
    results_json = json.loads(results)
    rules_json = results_json['rules']
    meta_josn = results_json['meta']

    basic_data = Parsing_basic_data(meta_josn)
    Format_Basic_data_type1(basic_data,con_print=print_sig,file_path=log_path)

    if ttp_sig:
        ttp_all = Parsing_ATT_CK_data(rules_json)
        Format_TTP_data_type1(ttp_all,con_print=print_sig,file_path=log_path)

    if mbc_sig:
        mbc_all = Parsing_MBC_data(rules_json)
        Format_MBC_data_type1(mbc_all,con_print=print_sig,file_path=log_path)

    if cap_sig:
        CAPABILITY_list = Parsing_CAPABILITY_data(rules_json,capabilities)
        Format_CAPABILITY_data_type1(CAPABILITY_list,con_print=print_sig,file_path=log_path)


if __name__ == '__main__':

    import os,sys
    base_dir = r"E:\恶意代码分析\1-日常狩猎\APT28\20220919-from慧眼"
    log_path = os.path.join(base_dir,'capa_ana_results.log')
    base_handle = os.walk(base_dir)
    for root,dir_list,file_list in base_handle:
        for file in file_list:
            if len(file)==32 or len(file)==40 or len(file)==64:
                file_path = os.path.join(root,file)
                capa_ana_main(file_path=file_path,print_sig=False,log_path=log_path,mbc_sig=False,cap_sig=False)
                #默认设置file_path,rules_path="C:\\capa_config\\rules_20220922",sigs_path="C:\\capa_config\\sigs",print_sig=True,log_path=None,ttp_sig=True,mbc_sig=True,cap_sig=True

    sys.exit(0)

 

 

capa.main.py

#!/usr/bin/env python3
"""
Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
 is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import time
import hashlib
import logging
import os.path
import argparse
import datetime
import textwrap
import warnings
import itertools
import contextlib
import collections
from typing import Any, Dict, List, Tuple

import halo
import tqdm
import colorama
from pefile import PEFormatError
from elftools.common.exceptions import ELFError

import capa.perf
import capa.rules
import capa.engine
import capa.version
import capa.render.json
import capa.render.default
import capa.render.verbose
import capa.features.common
import capa.features.freeze
import capa.render.vverbose
import capa.features.extractors
import capa.features.extractors.common
import capa.features.extractors.pefile
import capa.features.extractors.dnfile_
import capa.features.extractors.elffile
import capa.features.extractors.dotnetfile
import capa.features.extractors.base_extractor
from capa.rules import Rule, Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.helpers import (
    get_format,
    get_file_taste,
    get_auto_format,
    log_unsupported_os_error,
    log_unsupported_arch_error,
    log_unsupported_format_error,
)
from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError, UnsupportedRuntimeError
from capa.features.common import (
    FORMAT_PE,
    FORMAT_ELF,
    FORMAT_AUTO,
    FORMAT_SC32,
    FORMAT_SC64,
    FORMAT_DOTNET,
    FORMAT_FREEZE,
)
from capa.features.address import NO_ADDRESS
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor

RULES_PATH_DEFAULT_STRING = "(embedded rules)"
SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)"
BACKEND_VIV = "vivisect"
BACKEND_SMDA = "smda"
BACKEND_DOTNET = "dotnet"

E_MISSING_RULES = -10
E_MISSING_FILE = -11
E_INVALID_RULE = -12
E_CORRUPT_FILE = -13
E_FILE_LIMITATION = -14
E_INVALID_SIG = -15
E_INVALID_FILE_TYPE = -16
E_INVALID_FILE_ARCH = -17
E_INVALID_FILE_OS = -18
E_UNSUPPORTED_IDA_VERSION = -19

logger = logging.getLogger("capa")


@contextlib.contextmanager
def timing(msg: str):
    t0 = time.time()
    yield
    t1 = time.time()
    logger.debug("perf: %s: %0.2fs", msg, t1 - t0)


def set_vivisect_log_level(level):
    logging.getLogger("vivisect").setLevel(level)
    logging.getLogger("vivisect.base").setLevel(level)
    logging.getLogger("vivisect.impemu").setLevel(level)
    logging.getLogger("vtrace").setLevel(level)
    logging.getLogger("envi").setLevel(level)
    logging.getLogger("envi.codeflow").setLevel(level)
    logging.getLogger("Elf").setLevel(level)


def find_instruction_capabilities(
    ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle
) -> Tuple[FeatureSet, MatchResults]:
    """
    find matches for the given rules for the given instruction.

    returns: tuple containing (features for instruction, match results for instruction)
    """
    # all features found for the instruction.
    features = collections.defaultdict(set)  # type: FeatureSet

    for feature, addr in itertools.chain(
        extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features()
    ):
        features[feature].add(addr)

    # matches found at this instruction.
    _, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address)

    for rule_name, res in matches.items():
        rule = ruleset[rule_name]
        for addr, _ in res:
            capa.engine.index_rule_matches(features, rule, [addr])

    return features, matches


def find_basic_block_capabilities(
    ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle, bb: BBHandle
) -> Tuple[FeatureSet, MatchResults, MatchResults]:
    """
    find matches for the given rules within the given basic block.

    returns: tuple containing (features for basic block, match results for basic block, match results for instructions)
    """
    # all features found within this basic block,
    # includes features found within instructions.
    features = collections.defaultdict(set)  # type: FeatureSet

    # matches found at the instruction scope.
    # might be found at different instructions, thats ok.
    insn_matches = collections.defaultdict(list)  # type: MatchResults

    for insn in extractor.get_instructions(f, bb):
        ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn)
        for feature, vas in ifeatures.items():
            features[feature].update(vas)

        for rule_name, res in imatches.items():
            insn_matches[rule_name].extend(res)

    for feature, va in itertools.chain(
        extractor.extract_basic_block_features(f, bb), extractor.extract_global_features()
    ):
        features[feature].add(va)

    # matches found within this basic block.
    _, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address)

    for rule_name, res in matches.items():
        rule = ruleset[rule_name]
        for va, _ in res:
            capa.engine.index_rule_matches(features, rule, [va])

    return features, matches, insn_matches


def find_code_capabilities(
    ruleset: RuleSet, extractor: FeatureExtractor, fh: FunctionHandle
) -> Tuple[MatchResults, MatchResults, MatchResults, int]:
    """
    find matches for the given rules within the given function.

    returns: tuple containing (match results for function, match results for basic blocks, match results for instructions, number of features)
    """
    # all features found within this function,
    # includes features found within basic blocks (and instructions).
    function_features = collections.defaultdict(set)  # type: FeatureSet

    # matches found at the basic block scope.
    # might be found at different basic blocks, thats ok.
    bb_matches = collections.defaultdict(list)  # type: MatchResults

    # matches found at the instruction scope.
    # might be found at different instructions, thats ok.
    insn_matches = collections.defaultdict(list)  # type: MatchResults

    for bb in extractor.get_basic_blocks(fh):
        features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, fh, bb)
        for feature, vas in features.items():
            function_features[feature].update(vas)

        for rule_name, res in bmatches.items():
            bb_matches[rule_name].extend(res)

        for rule_name, res in imatches.items():
            insn_matches[rule_name].extend(res)

    for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()):
        function_features[feature].add(va)

    _, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address)
    return function_matches, bb_matches, insn_matches, len(function_features)


def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet):
    file_features = collections.defaultdict(set)  # type: FeatureSet

    for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()):
        # not all file features may have virtual addresses.
        # if not, then at least ensure the feature shows up in the index.
        # the set of addresses will still be empty.
        if va:
            file_features[feature].add(va)
        else:
            if feature not in file_features:
                file_features[feature] = set()

    logger.debug("analyzed file and extracted %d features", len(file_features))

    file_features.update(function_features)

    _, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS)
    return matches, len(file_features)


def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None) -> Tuple[MatchResults, Any]:
    all_function_matches = collections.defaultdict(list)  # type: MatchResults
    all_bb_matches = collections.defaultdict(list)  # type: MatchResults
    all_insn_matches = collections.defaultdict(list)  # type: MatchResults

    meta = {
        "feature_counts": {
            "file": 0,
            "functions": {},
        },
        "library_functions": {},
    }  # type: Dict[str, Any]

    pbar = tqdm.tqdm
    if disable_progress:
        # do not use tqdm to avoid unnecessary side effects when caller intends
        # to disable progress completely
        pbar = lambda s, *args, **kwargs: s

    functions = list(extractor.get_functions())
    n_funcs = len(functions)

    # 修改 for f in pb: 修改为 for f in functions:  去掉加载条 注释pb
    # pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions")
    for f in functions:
        if extractor.is_library_function(f.address):
            function_name = extractor.get_function_name(f.address)
            logger.debug("skipping library function 0x%x (%s)", f.address, function_name)
            meta["library_functions"][f.address] = function_name
            # 修改 注释pb(进度条)相关
            # n_libs = len(meta["library_functions"])
            # percentage = 100 * (n_libs / n_funcs)
            # if isinstance(pb, tqdm.tqdm):
            #     pb.set_postfix_str("skipped %d library functions (%d%%)" % (n_libs, percentage))
            continue

        function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities(ruleset, extractor, f)
        meta["feature_counts"]["functions"][f.address] = feature_count
        logger.debug("analyzed function 0x%x and extracted %d features", f.address, feature_count)

        for rule_name, res in function_matches.items():
            all_function_matches[rule_name].extend(res)
        for rule_name, res in bb_matches.items():
            all_bb_matches[rule_name].extend(res)
        for rule_name, res in insn_matches.items():
            all_insn_matches[rule_name].extend(res)

    # collection of features that captures the rule matches within function, BB, and instruction scopes.
    # mapping from feature (matched rule) to set of addresses at which it matched.
    function_and_lower_features: FeatureSet = collections.defaultdict(set)
    for rule_name, results in itertools.chain(
        all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items()
    ):
        locations = set(map(lambda p: p[0], results))
        rule = ruleset[rule_name]
        capa.engine.index_rule_matches(function_and_lower_features, rule, locations)

    all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features)
    meta["feature_counts"]["file"] = feature_count

    matches = {
        rule_name: results
        for rule_name, results in itertools.chain(
            # each rule exists in exactly one scope,
            # so there won't be any overlap among these following MatchResults,
            # and we can merge the dictionaries naively.
            all_insn_matches.items(),
            all_bb_matches.items(),
            all_function_matches.items(),
            all_file_matches.items(),
        )
    }

    return matches, meta


# TODO move all to helpers?
def has_rule_with_namespace(rules, capabilities, rule_cat):
    for rule_name in capabilities.keys():
        if rules.rules[rule_name].meta.get("namespace", "").startswith(rule_cat):
            return True
    return False


def is_internal_rule(rule: Rule) -> bool:
    return rule.meta.get("namespace", "").startswith("internal/")


def is_file_limitation_rule(rule: Rule) -> bool:
    return rule.meta.get("namespace", "") == "internal/limitation/file"


def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool:
    file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values()))

    for file_limitation_rule in file_limitation_rules:
        if file_limitation_rule.name not in capabilities:
            continue

        # 修改logger.warning 修改为logger.debug
        logger.debug("-" * 80)

        for line in file_limitation_rule.meta.get("description", "").split("\n"):
            # 修改logger.warning 修改为logger.debug
            logger.debug(" " + line)
        # 修改logger.warning 修改为logger.debug
        logger.debug(" Identified via rule: %s", file_limitation_rule.name)
        if is_standalone:
            # 修改logger.warning 修改为logger.debug
            logger.debug(" ")
            # 修改logger.warning 修改为logger.debug
            logger.debug(" Use -v or -vv if you really want to see the capabilities identified by capa.")
        # 修改logger.warning 修改为logger.debug
        logger.debug("-" * 80)

        # bail on first file limitation
        return True

    return False


def is_supported_format(sample: str) -> bool:
    """
    Return if this is a supported file based on magic header values
    """
    with open(sample, "rb") as f:
        taste = f.read(0x100)

    return len(list(capa.features.extractors.common.extract_format(taste))) == 1


def is_supported_arch(sample: str) -> bool:
    with open(sample, "rb") as f:
        buf = f.read()

    return len(list(capa.features.extractors.common.extract_arch(buf))) == 1


def get_arch(sample: str) -> str:
    with open(sample, "rb") as f:
        buf = f.read()

    for feature, _ in capa.features.extractors.common.extract_arch(buf):
        assert isinstance(feature.value, str)
        return feature.value

    return "unknown"


def is_supported_os(sample: str) -> bool:
    with open(sample, "rb") as f:
        buf = f.read()

    return len(list(capa.features.extractors.common.extract_os(buf))) == 1


def get_os(sample: str) -> str:
    with open(sample, "rb") as f:
        buf = f.read()

    for feature, _ in capa.features.extractors.common.extract_os(buf):
        assert isinstance(feature.value, str)
        return feature.value

    return "unknown"


def get_meta_str(vw):
    """
    Return workspace meta information string
    """
    meta = []
    for k in ["Format", "Platform", "Architecture"]:
        if k in vw.metadata:
            meta.append("%s: %s" % (k.lower(), vw.metadata[k]))
    return "%s, number of functions: %d" % (", ".join(meta), len(vw.getFunctions()))


def is_running_standalone() -> bool:
    """
    are we running from a PyInstaller'd executable?
    if so, then we'll be able to access `sys._MEIPASS` for the packaged resources.
    """
    return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS")


def get_default_root() -> str:
    """
    get the file system path to the default resources directory.
    under PyInstaller, this comes from _MEIPASS.
    under source, this is the root directory of the project.
    """
    if is_running_standalone():
        # pylance/mypy don't like `sys._MEIPASS` because this isn't standard.
        # its injected by pyinstaller.
        # so we'll fetch this attribute dynamically.
        return getattr(sys, "_MEIPASS")
    else:
        return os.path.join(os.path.dirname(__file__), "..")


def get_default_signatures() -> List[str]:
    """
    compute a list of file system paths to the default FLIRT signatures.
    """
    sigs_path = os.path.join(get_default_root(), "sigs")
    logger.debug("signatures path: %s", sigs_path)

    ret = []
    for root, dirs, files in os.walk(sigs_path):
        for file in files:
            if not (file.endswith(".pat") or file.endswith(".pat.gz") or file.endswith(".sig")):
                continue

            ret.append(os.path.join(root, file))

    return ret


def get_workspace(path, format_, sigpaths):
    """
    load the program at the given path into a vivisect workspace using the given format.
    also apply the given FLIRT signatures.

    supported formats:
      - pe
      - elf
      - shellcode 32-bit
      - shellcode 64-bit
      - auto

    this creates and analyzes the workspace; however, it does *not* save the workspace.
    this is the responsibility of the caller.
    """

    # lazy import enables us to not require viv if user wants SMDA, for example.
    import viv_utils

    logger.debug("generating vivisect workspace for: %s", path)
    # TODO should not be auto at this point, anymore
    if format_ == FORMAT_AUTO:
        if not is_supported_format(path):
            raise UnsupportedFormatError()

        # don't analyze, so that we can add our Flirt function analyzer first.
        vw = viv_utils.getWorkspace(path, analyze=False, should_save=False)
    elif format_ in {FORMAT_PE, FORMAT_ELF}:
        vw = viv_utils.getWorkspace(path, analyze=False, should_save=False)
    elif format_ == FORMAT_SC32:
        # these are not analyzed nor saved.
        vw = viv_utils.getShellcodeWorkspaceFromFile(path, arch="i386", analyze=False)
    elif format_ == FORMAT_SC64:
        vw = viv_utils.getShellcodeWorkspaceFromFile(path, arch="amd64", analyze=False)
    else:
        raise ValueError("unexpected format: " + format_)

    viv_utils.flirt.register_flirt_signature_analyzers(vw, sigpaths)

    vw.analyze()

    logger.debug("%s", get_meta_str(vw))
    return vw


# TODO get_extractors -> List[FeatureExtractor]?
def get_extractor(
    path: str, format_: str, backend: str, sigpaths: List[str], should_save_workspace=False, disable_progress=False
) -> FeatureExtractor:
    """
    raises:
      UnsupportedFormatError
      UnsupportedArchError
      UnsupportedOSError
    """
    if format_ not in (FORMAT_SC32, FORMAT_SC64):
        if not is_supported_format(path):
            raise UnsupportedFormatError()

        if not is_supported_arch(path):
            raise UnsupportedArchError()

        if not is_supported_os(path):
            raise UnsupportedOSError()

    if format_ == FORMAT_DOTNET:
        import capa.features.extractors.dnfile.extractor

        return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path)

    if backend == "smda":
        from smda.SmdaConfig import SmdaConfig
        from smda.Disassembler import Disassembler

        import capa.features.extractors.smda.extractor

        logger.warning("Deprecation warning: v4.0 will be the last capa version to support the SMDA backend.")
        warnings.warn("v4.0 will be the last capa version to support the SMDA backend.", DeprecationWarning)
        smda_report = None
        with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
            config = SmdaConfig()
            config.STORE_BUFFER = True
            smda_disasm = Disassembler(config)
            smda_report = smda_disasm.disassembleFile(path)

        return capa.features.extractors.smda.extractor.SmdaFeatureExtractor(smda_report, path)
    else:
        import capa.features.extractors.viv.extractor

        # 修改 删除旋转提示 with下面向左缩进
        # with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
        vw = get_workspace(path, format_, sigpaths)

        if should_save_workspace:
            logger.debug("saving workspace")
            try:
                vw.saveWorkspace()
            except IOError:
                # see #168 for discussion around how to handle non-writable directories
                logger.info("source directory is not writable, won't save intermediate workspace")
        else:
            logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")

        return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path)


def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]:
    file_extractors: List[FeatureExtractor] = list()

    if format_ == capa.features.extractors.common.FORMAT_PE:
        file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(sample))

        dnfile_extractor = capa.features.extractors.dnfile_.DnfileFeatureExtractor(sample)
        if dnfile_extractor.is_dotnet_file():
            file_extractors.append(dnfile_extractor)

    elif format_ == capa.features.extractors.common.FORMAT_ELF:
        file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(sample))

    return file_extractors


def is_nursery_rule_path(path: str) -> bool:
    """
    The nursery is a spot for rules that have not yet been fully polished.
    For example, they may not have references to public example of a technique.
    Yet, we still want to capture and report on their matches.
    The nursery is currently a subdirectory of the rules directory with that name.

    When nursery rules are loaded, their metadata section should be updated with:
      `nursery=True`.
    """
    return "nursery" in path


def get_rules(rule_paths: List[str], disable_progress=False) -> List[Rule]:
    rule_file_paths = []
    for rule_path in rule_paths:
        if not os.path.exists(rule_path):
            raise IOError("rule path %s does not exist or cannot be accessed" % rule_path)

        if os.path.isfile(rule_path):
            rule_file_paths.append(rule_path)
        elif os.path.isdir(rule_path):
            logger.debug("reading rules from directory %s", rule_path)
            for root, dirs, files in os.walk(rule_path):
                if ".git" in root:
                    # the .github directory contains CI config in capa-rules
                    # this includes some .yml files
                    # these are not rules
                    # additionally, .git has files that are not .yml and generate the warning
                    # skip those too
                    continue
                for file in files:
                    if not file.endswith(".yml"):
                        if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))):
                            # expect to see .git* files, readme.md, format.md, and maybe a .git directory
                            # other things maybe are rules, but are mis-named.
                            logger.warning("skipping non-.yml file: %s", file)
                        continue
                    rule_path = os.path.join(root, file)
                    rule_file_paths.append(rule_path)

    rules = []  # type: List[Rule]

    pbar = tqdm.tqdm
    if disable_progress:
        # do not use tqdm to avoid unnecessary side effects when caller intends
        # to disable progress completely
        pbar = lambda s, *args, **kwargs: s

    # 修改 删除进度条
    # for rule_file_path in pbar(list(rule_file_paths), desc="loading ", unit=" rules"):
    for rule_file_path in rule_file_paths:
        try:
            rule = capa.rules.Rule.from_yaml_file(rule_file_path)
        except capa.rules.InvalidRule:
            raise
        else:
            rule.meta["capa/path"] = rule_file_path
            if is_nursery_rule_path(rule_file_path):
                rule.meta["capa/nursery"] = True

            rules.append(rule)
            logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scope)

    return rules


def get_signatures(sigs_path):
    if not os.path.exists(sigs_path):
        raise IOError("signatures path %s does not exist or cannot be accessed" % sigs_path)

    paths = []
    if os.path.isfile(sigs_path):
        paths.append(sigs_path)
    elif os.path.isdir(sigs_path):
        logger.debug("reading signatures from directory %s", os.path.abspath(os.path.normpath(sigs_path)))
        for root, dirs, files in os.walk(sigs_path):
            for file in files:
                if file.endswith((".pat", ".pat.gz", ".sig")):
                    sig_path = os.path.join(root, file)
                    paths.append(sig_path)

    # nicely normalize and format path so that debugging messages are clearer
    paths = [os.path.abspath(os.path.normpath(path)) for path in paths]

    # load signatures in deterministic order: the alphabetic sorting of filename.
    # this means that `0_sigs.pat` loads before `1_sigs.pat`.
    paths = sorted(paths, key=os.path.basename)

    for path in paths:
        logger.debug("found signature file: %s", path)

    return paths


def collect_metadata(
    argv: List[str],
    sample_path: str,
    rules_path: List[str],
    extractor: capa.features.extractors.base_extractor.FeatureExtractor,
):
    md5 = hashlib.md5()
    sha1 = hashlib.sha1()
    sha256 = hashlib.sha256()

    with open(sample_path, "rb") as f:
        buf = f.read()

    md5.update(buf)
    sha1.update(buf)
    sha256.update(buf)

    if rules_path != [RULES_PATH_DEFAULT_STRING]:
        rules_path = [os.path.abspath(os.path.normpath(r)) for r in rules_path]

    format_ = get_format(sample_path)
    arch = get_arch(sample_path)
    os_ = get_os(sample_path)

    return {
        "timestamp": datetime.datetime.now().isoformat(),
        "version": capa.version.__version__,
        "argv": argv,
        "sample": {
            "md5": md5.hexdigest(),
            "sha1": sha1.hexdigest(),
            "sha256": sha256.hexdigest(),
            "path": os.path.normpath(sample_path),
        },
        "analysis": {
            "format": format_,
            "arch": arch,
            "os": os_,
            "extractor": extractor.__class__.__name__,
            "rules": rules_path,
            "base_address": extractor.get_base_address(),
            "layout": {
                # this is updated after capabilities have been collected.
                # will look like:
                #
                # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
            },
        },
    }


def compute_layout(rules, extractor, capabilities):
    """
    compute a metadata structure that links basic blocks
    to the functions in which they're found.

    only collect the basic blocks at which some rule matched.
    otherwise, we may pollute the json document with
    a large amount of un-referenced data.
    """
    functions_by_bb = {}
    bbs_by_function = {}
    for f in extractor.get_functions():
        bbs_by_function[f.address] = []
        for bb in extractor.get_basic_blocks(f):
            functions_by_bb = f.address
            bbs_by_function[f.address].append(bb.address)

    matched_bbs = set()
    for rule_name, matches in capabilities.items():
        rule = rules[rule_name]
        if rule.meta.get("scope") == capa.rules.BASIC_BLOCK_SCOPE:
            for (addr, match) in matches:
                assert addr in functions_by_bb
                matched_bbs.add(addr)

    layout = {
        "functions": {
            f: {
                "matched_basic_blocks": 
                # this object is open to extension in the future,
                # such as with the function name, etc.
            }
            for f, bbs in bbs_by_function.items()
        }
    }

    return layout


def install_common_args(parser, wanted=None):
    """
    register a common set of command line arguments for re-use by main & scripts.
    these are things like logging/coloring/etc.
    also enable callers to opt-in to common arguments, like specifying the input sample.

    this routine lets many script use the same language for cli arguments.
    see `handle_common_args` to do common configuration.

    args:
      parser (argparse.ArgumentParser): a parser to update in place, adding common arguments.
      wanted (Set[str]): collection of arguments to opt-into, including:
        - "sample": required positional argument to input file.
        - "format": flag to override file format.
        - "backend": flag to override analysis backend.
        - "rules": flag to override path to capa rules.
        - "tag": flag to override/specify which rules to match.
    """
    if wanted is None:
        wanted = set()

    #
    # common arguments that all scripts will have
    #

    parser.add_argument("--version", action="version", version="%(prog)s {:s}".format(capa.version.__version__))
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="enable verbose result document (no effect with --json)"
    )
    parser.add_argument(
        "-vv", "--vverbose", action="store_true", help="enable very verbose result document (no effect with --json)"
    )
    parser.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
    parser.add_argument("-q", "--quiet", action="store_true", help="disable all output but errors")
    parser.add_argument(
        "--color",
        type=str,
        choices=("auto", "always", "never"),
        default="auto",
        help="enable ANSI color codes in results, default: only during interactive session",
    )

    #
    # arguments that may be opted into:
    #
    #   - sample
    #   - format
    #   - rules
    #   - tag
    #

    if "sample" in wanted:
        parser.add_argument(
            "sample",
            type=str,
            help="path to sample to analyze",
        )

    if "format" in wanted:
        formats = [
            (FORMAT_AUTO, "(default) detect file type automatically"),
            (FORMAT_PE, "Windows PE file"),
            (FORMAT_DOTNET, ".NET PE file"),
            (FORMAT_ELF, "Executable and Linkable Format"),
            (FORMAT_SC32, "32-bit shellcode"),
            (FORMAT_SC64, "64-bit shellcode"),
            (FORMAT_FREEZE, "features previously frozen by capa"),
        ]
        format_help = ", ".join(["%s: %s" % (f[0], f[1]) for f in formats])
        parser.add_argument(
            "-f",
            "--format",
            choices=[f[0] for f in formats],
            default=FORMAT_AUTO,
            help="select sample format, %s" % format_help,
        )

        if "backend" in wanted:
            parser.add_argument(
                "-b",
                "--backend",
                type=str,
                help="select the backend to use",
                choices=(BACKEND_VIV, BACKEND_SMDA),
                default=BACKEND_VIV,
            )

    if "rules" in wanted:
        parser.add_argument(
            "-r",
            "--rules",
            type=str,
            default=[RULES_PATH_DEFAULT_STRING],
            action="append",
            help="path to rule file or directory, use embedded rules by default",
        )

    if "signatures" in wanted:
        parser.add_argument(
            "-s",
            "--signatures",
            type=str,
            default=SIGNATURES_PATH_DEFAULT_STRING,
            help="path to .sig/.pat file or directory used to identify library functions, use embedded signatures by default",
        )

    if "tag" in wanted:
        parser.add_argument("-t", "--tag", type=str, help="filter on rule meta field values")


def handle_common_args(args):
    """
    handle the global config specified by `install_common_args`,
    such as configuring logging/coloring/etc.
    the following fields will be overwritten when present:
      - rules: file system path to rule files.
      - signatures: file system path to signature files.

    args:
      args (argparse.Namespace): parsed arguments that included at least `install_common_args` args.
    """
    if args.quiet:
        logging.basicConfig(level=logging.WARNING)
        logging.getLogger().setLevel(logging.WARNING)
    elif args.debug:
        logging.basicConfig(level=logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
        logging.getLogger().setLevel(logging.INFO)

    # disable vivisect-related logging, it's verbose and not relevant for capa users
    set_vivisect_log_level(logging.CRITICAL)

    # Since Python 3.8 cp65001 is an alias to utf_8, but not for Python < 3.8
    # TODO: remove this code when only supporting Python 3.8+
    # https://stackoverflow.com/a/3259271/87207
    import codecs

    codecs.register(lambda name: codecs.lookup("utf-8") if name == "cp65001" else None)

    if args.color == "always":
        colorama.init(strip=False)
    elif args.color == "auto":
        # colorama will detect:
        #  - when on Windows console, and fixup coloring, and
        #  - when not an interactive session, and disable coloring
        # renderers should use coloring and assume it will be stripped out if necessary.
        colorama.init()
    elif args.color == "never":
        colorama.init(strip=True)
    else:
        raise RuntimeError("unexpected --color value: " + args.color)

    if hasattr(args, "rules"):
        rules_paths: List[str] = []

        if args.rules == [RULES_PATH_DEFAULT_STRING]:
            logger.debug("-" * 80)
            logger.debug(" Using default embedded rules.")
            logger.debug(" To provide your own rules, use the form `capa.exe -r ./path/to/rules/  /path/to/mal.exe`.")
            logger.debug(" You can see the current default rule set here:")
            logger.debug("     https://github.com/mandiant/capa-rules")
            logger.debug("-" * 80)

            default_rule_path = os.path.join(get_default_root(), "rules")

            if not os.path.exists(default_rule_path):
                # when a users installs capa via pip,
                # this pulls down just the source code - not the default rules.
                # i'm not sure the default rules should even be written to the library directory,
                # so in this case, we require the user to use -r to specify the rule directory.
                logger.error("default embedded rules not found! (maybe you installed capa as a library?)")
                logger.error("provide your own rule set via the `-r` option.")
                return E_MISSING_RULES

            rules_paths.append(default_rule_path)
        else:
            rules_paths = args.rules

            if RULES_PATH_DEFAULT_STRING in rules_paths:
                rules_paths.remove(RULES_PATH_DEFAULT_STRING)

            for rule_path in rules_paths:
                logger.debug("using rules path: %s", rule_path)

        args.rules = rules_paths

    if hasattr(args, "signatures"):
        if args.signatures == SIGNATURES_PATH_DEFAULT_STRING:
            logger.debug("-" * 80)
            logger.debug(" Using default embedded signatures.")
            logger.debug(
                " To provide your own signatures, use the form `capa.exe --signature ./path/to/signatures/  /path/to/mal.exe`."
            )
            logger.debug("-" * 80)

            sigs_path = os.path.join(get_default_root(), "sigs")
        else:
            sigs_path = args.signatures
            logger.debug("using signatures path: %s", sigs_path)

        args.signatures = sigs_path


def main(argv=None):
    if sys.version_info < (3, 7):
        raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.7+")

    if argv is None:
        argv = sys.argv[1:]

    desc = "The FLARE team's open-source tool to identify capabilities in executable files."
    epilog = textwrap.dedent(
        """
        By default, capa uses a default set of embedded rules.
        You can see the rule set here:
          https://github.com/mandiant/capa-rules

        To provide your own rule set, use the `-r` flag:
          capa  --rules /path/to/rules  suspicious.exe
          capa  -r      /path/to/rules  suspicious.exe

        examples:
          identify capabilities in a binary
            capa suspicious.exe

          identify capabilities in 32-bit shellcode, see `-f` for all supported formats
            capa -f sc32 shellcode.bin

          report match locations
            capa -v suspicious.exe

          report all feature match details
            capa -vv suspicious.exe

          filter rules by meta fields, e.g. rule name or namespace
            capa -t "create TCP socket" suspicious.exe
         """
    )

    parser = argparse.ArgumentParser(
        description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    install_common_args(parser, {"sample", "format", "backend", "signatures", "rules", "tag"})
    parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
    args = parser.parse_args(args=argv)
    ret = handle_common_args(args)
    if ret is not None and ret != 0:
        return ret

    try:
        _ = get_file_taste(args.sample)
    except IOError as e:
        # per our research there's not a programmatic way to render the IOError with non-ASCII filename unless we
        # handle the IOError separately and reach into the args
        logger.error("%s", e.args[0])
        return E_MISSING_FILE

    format_ = args.format
    if format_ == FORMAT_AUTO:
        try:
            format_ = get_auto_format(args.sample)
        except UnsupportedFormatError:
            log_unsupported_format_error()
            return E_INVALID_FILE_TYPE

    try:
        rules = get_rules(args.rules, disable_progress=args.quiet)
        rules = capa.rules.RuleSet(rules)

        logger.debug(
            "successfully loaded %s rules",
            # during the load of the RuleSet, we extract subscope statements into their own rules
            # that are subsequently `match`ed upon. this inflates the total rule count.
            # so, filter out the subscope rules when reporting total number of loaded rules.
            len([i for i in filter(lambda r: not r.is_subscope_rule(), rules.rules.values())]),
        )
        if args.tag:
            rules = rules.filter_rules_by_meta(args.tag)
            logger.debug("selected %d rules", len(rules))
            for i, r in enumerate(rules.rules, 1):
                # TODO don't display subscope rules?
                logger.debug(" %d. %s", i, r)
    except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
        logger.error("%s", str(e))
        logger.error(
            "Please ensure you're using the rules that correspond to your major version of capa (%s)",
            capa.version.get_major_version(),
        )
        logger.error(
            "You can check out these rules with the following command:\n    %s",
            capa.version.get_rules_checkout_command(),
        )
        logger.error(
            "Or, for more details, see the rule set documentation here: %s",
            "https://github.com/mandiant/capa/blob/master/doc/rules.md",
        )
        return E_INVALID_RULE

    # file feature extractors are pretty lightweight: they don't do any code analysis.
    # so we can fairly quickly determine if the given file has "pure" file-scope rules
    # that indicate a limitation (like "file is packed based on section names")
    # and avoid doing a full code analysis on difficult/impossible binaries.
    #
    # this pass can inspect multiple file extractors, e.g., dotnet and pe to identify
    # various limitations
    try:
        file_extractors = get_file_extractors(args.sample, format_)
    except PEFormatError as e:
        logger.error("Input file '%s' is not a valid PE file: %s", args.sample, str(e))
        return E_CORRUPT_FILE
    except (ELFError, OverflowError) as e:
        logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e))
        return E_CORRUPT_FILE

    for file_extractor in file_extractors:
        try:
            pure_file_capabilities, _ = find_file_capabilities(rules, file_extractor, {})
        except PEFormatError as e:
            logger.error("Input file '%s' is not a valid PE file: %s", args.sample, str(e))
            return E_CORRUPT_FILE
        except (ELFError, OverflowError) as e:
            logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e))
            return E_CORRUPT_FILE

        if isinstance(file_extractor, capa.features.extractors.dnfile_.DnfileFeatureExtractor):
            format_ = FORMAT_DOTNET

        # file limitations that rely on non-file scope won't be detected here.
        # nor on FunctionName features, because pefile doesn't support this.
        if has_file_limitation(rules, pure_file_capabilities):
            # bail if capa encountered file limitation e.g. a packed binary
            # do show the output in verbose mode, though.
            if not (args.verbose or args.vverbose or args.json):
                logger.debug("file limitation short circuit, won't analyze fully.")
                return E_FILE_LIMITATION

    if format_ == FORMAT_FREEZE:
        with open(args.sample, "rb") as f:
            extractor = capa.features.freeze.load(f.read())
    else:
        try:
            if format_ == FORMAT_PE:
                sig_paths = get_signatures(args.signatures)
            else:
                sig_paths = []
                logger.debug("skipping library code matching: only have native PE signatures")
        except IOError as e:
            logger.error("%s", str(e))
            return E_INVALID_SIG

        should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)

        try:
            extractor = get_extractor(
                args.sample, format_, args.backend, sig_paths, should_save_workspace, disable_progress=args.quiet
            )
        except UnsupportedFormatError:
            log_unsupported_format_error()
            return E_INVALID_FILE_TYPE
        except UnsupportedArchError:
            log_unsupported_arch_error()
            return E_INVALID_FILE_ARCH
        except UnsupportedOSError:
            log_unsupported_os_error()
            return E_INVALID_FILE_OS

    meta = collect_metadata(argv, args.sample, args.rules, extractor)

    capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
    meta["analysis"].update(counts)
    meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)

    if has_file_limitation(rules, capabilities):
        # bail if capa encountered file limitation e.g. a packed binary
        # do show the output in verbose mode, though.
        if not (args.verbose or args.vverbose or args.json):
            return E_FILE_LIMITATION

    # 修改 将打印换成返回  capabilities是字典
    if args.json:
        results = capa.render.json.render(meta, rules, capabilities)
    elif args.vverbose:
        results = capa.render.vverbose.render(meta, rules, capabilities)
    elif args.verbose:
        results = capa.render.verbose.render(meta, rules, capabilities)
    else:
        results = capa.render.default.render(meta, rules, capabilities)
    colorama.deinit()

    logger.debug("done.")

    # 修改 return 0 为 return results,capabilities
    # return 0
    return results, capabilities


def ida_main():
    import capa.rules
    import capa.ida.helpers
    import capa.render.default
    import capa.features.extractors.ida.extractor

    logging.basicConfig(level=logging.INFO)
    logging.getLogger().setLevel(logging.INFO)

    if not capa.ida.helpers.is_supported_ida_version():
        return E_UNSUPPORTED_IDA_VERSION

    if not capa.ida.helpers.is_supported_file_type():
        return E_INVALID_FILE_TYPE

    logger.debug("-" * 80)
    logger.debug(" Using default embedded rules.")
    logger.debug(" ")
    logger.debug(" You can see the current default rule set here:")
    logger.debug("     https://github.com/mandiant/capa-rules")
    logger.debug("-" * 80)

    rules_path = os.path.join(get_default_root(), "rules")
    logger.debug("rule path: %s", rules_path)
    rules = get_rules(rules_path)
    rules = capa.rules.RuleSet(rules)

    meta = capa.ida.helpers.collect_metadata([rules_path])

    capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor())
    meta["analysis"].update(counts)

    if has_file_limitation(rules, capabilities, is_standalone=False):
        capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")

    colorama.init(strip=True)
    print(capa.render.default.render(meta, rules, capabilities))


def is_runtime_ida():
    try:
        import idc
    except ImportError:
        return False
    else:
        return True


if __name__ == "__main__":
    if is_runtime_ida():
        ida_main()
    else:
        sys.exit(main())

云涯历险记 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:恶意代码技术理论:修改capa输出自定义消息
喜欢 (0)