flare-capa module
For capa 3.0.3
capa_ana.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2021/12/14 13:24
# @Author : wing
import sys
import json
import copy
from capa.main import main as capa_main
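# pull (objective, behavior, method) triples from each matched rule's MBC metadata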
def Parsing_MBC_data(rules_json):
mbc_all = []
for capability_key,capability_value in rules_json.items():
rules_meta_json = capability_value['meta']
rules_meta_mbc_list = rules_meta_json['mbc']
if rules_meta_mbc_list:
for rules_meta_mbc_dict in rules_meta_mbc_list:
mbc_child = []
mbc_objective = rules_meta_mbc_dict['objective']
mbc_child.append(mbc_objective)
mbc_behavior = rules_meta_mbc_dict['behavior']
mbc_child.append(mbc_behavior)
mbc_method = rules_meta_mbc_dict['method']
mbc_child.append(mbc_method)
mbc_all.append(copy.deepcopy(mbc_child))
return mbc_all
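# pull (tactic, technique, subtechnique) triples from each matched rule's ATT&CK metadata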
def Parsing_ATT_CK_data(rules_json):
ttp_all = []
for capability_key,capability_value in rules_json.items():
rules_meta_json = capability_value['meta']
rules_meta_ttp_list = rules_meta_json['att&ck']
if rules_meta_ttp_list:
for rules_meta_ttp_dict in rules_meta_ttp_list:
ttp_child = []
ttp_tactic = rules_meta_ttp_dict['tactic']
ttp_child.append(ttp_tactic)
ttp_technique = rules_meta_ttp_dict['technique']
ttp_child.append(ttp_technique)
ttp_subtechnique = rules_meta_ttp_dict['subtechnique']
ttp_child.append(ttp_subtechnique)
ttp_all.append(copy.deepcopy(ttp_child))
return ttp_all
def find_api(node):
    # recursively walk a capa match-result tree and collect the values of
    # matched leaf statements (typically API names).
    match_func_list = []
    if not hasattr(node, 'children'):
        return match_func_list
    if not node.children and hasattr(node, 'statement'):
        # a leaf node: grab the statement's value if it carries one
        if hasattr(node.statement, 'value'):
            match_func_list.append(node.statement.value)
        elif hasattr(node.statement, 'child'):
            match_func_list.append(node.statement.child.value)
        return match_func_list
    for child in node.children:
        if child.success:
            match_func_list += find_api(child)
    return match_func_list
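# build [capability, matched string/API, ...] lists: matched PDB paths are read
# from the JSON result document; other rules are resolved by walking the
# in-memory Result objects (each entry in capabilities[rule] is an
# (address, Result) tuple).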
def Parsing_CAPABILITY_data(rules_json,capabilities):
CAPABILITY_list = []
for capability in rules_json:
Child_CAPABILITY_list = []
Child_CAPABILITY_list.append(capability)
if capability == "contains PDB path":
pdb_dict = rules_json[capability]
match_pdb_dict = pdb_dict['matches']['0']['node']['feature']['matches']
for match_pdb in match_pdb_dict:
Child_CAPABILITY_list.append(match_pdb)
CAPABILITY_list.append(copy.deepcopy(Child_CAPABILITY_list))
else:
capability_child_list = capabilities[capability]
for capability_child in capability_child_list:
capability_child_child_obj_children_list = capability_child[1].children
for capability_child_child_obj_children_child in capability_child_child_obj_children_list:
if capability_child_child_obj_children_child.success:
if not capability_child_child_obj_children_child.children:
if hasattr(capability_child_child_obj_children_child.statement,'value'):
match_func = capability_child_child_obj_children_child.statement.value
if match_func not in Child_CAPABILITY_list and len(str(match_func))>=5:
Child_CAPABILITY_list.append(match_func)
else:
match_func_list = find_api(capability_child_child_obj_children_child)
for match_func in match_func_list:
if match_func not in Child_CAPABILITY_list and len(str(match_func))>=5:
Child_CAPABILITY_list.append(match_func)
CAPABILITY_list.append(copy.deepcopy(Child_CAPABILITY_list))
return CAPABILITY_list
def Parsing_basic_data(meta_json):
    basic_data = []
    rules_meta_sample = meta_json['sample']
sample_path = rules_meta_sample['path']
basic_data.append(sample_path)
sample_md5 = rules_meta_sample['md5']
basic_data.append(sample_md5)
sample_sha1 = rules_meta_sample['sha1']
basic_data.append(sample_sha1)
sample_sha256 = rules_meta_sample['sha256']
basic_data.append(sample_sha256)
return basic_data
'''
Format style 1:
path: E:\\a.exe
md5: e48a1c9b14cb16e363899c4bc4f4d33f
sha1: 0dab1f59a6a3587cc801d810d60be5886fdd7445
sha256: a593b585bd61d23c03fbfe5b31aadc157961d4b65afa0860dd57692a283628c3
'''
def Format_Basic_data_type1(basic_data,file_path=None, con_print=True):
sample_path = basic_data[0]
sample_md5 = basic_data[1]
sample_sha1 = basic_data[2]
sample_sha256 = basic_data[3]
if con_print:
print('path: '+sample_path)
print('md5: '+sample_md5)
print('sha1: '+sample_sha1)
print('sha256: '+sample_sha256)
print('\n')
else:
with open(file_path,'a',encoding='utf-8') as file_handle:
file_handle.write('path: '+sample_path+'\n')
file_handle.write('md5: '+sample_md5+'\n')
file_handle.write('sha1: '+sample_sha1+'\n')
file_handle.write('sha256: '+sample_sha256+'\n')
file_handle.write('\n')
'''
Format style 1:
write file on Windows
ZwWriteFile
contains PDB path
E:\\8168\\vc98\\self\\bin\\x86\\c1.pdb
'''
def Format_CAPABILITY_data_type1(CAPABILITY_list, file_path=None, con_print=True):
if con_print:
for i in CAPABILITY_list:
if len(i)==1:
print('*'+i[0])
else:
print('*'+i[0])
other_list = i[1:]
for j in other_list:
print('\t'+str(j))
print('\r\n')
else:
with open(file_path,'a',encoding='utf-8') as file_handle:
for i in CAPABILITY_list:
if len(i) == 1:
file_handle.write('*'+i[0]+'\n')
else:
file_handle.write('*'+i[0] + '\n')
other_list = i[1:]
for j in other_list:
file_handle.write('\t' + str(j)+'\n')
file_handle.write('\n')
'''
Format style 1:
objective: Operating System
behavior: Registry
method: Create Registry Key
objective: Operating System
behavior: Registry
method: Open Registry Key
'''
def Format_MBC_data_type1(mbc_all, file_path=None, con_print=True):
if con_print:
for mbc_list in mbc_all:
print('*'+'objective: '+mbc_list[0])
print('\t'+'behavior: '+mbc_list[1])
if mbc_list[2]:
print('\t\t'+'method: '+mbc_list[2])
print('\r\n')
else:
with open(file_path,'a',encoding='utf-8') as file_handle:
for mbc_list in mbc_all:
file_handle.write('*'+'objective: ' + mbc_list[0]+'\n')
file_handle.write('\t' + 'behavior: ' + mbc_list[1]+'\n')
if mbc_list[2]:
file_handle.write('\t\t' + 'method: ' + mbc_list[2]+'\n')
file_handle.write('\n')
'''
Format style 1:
tactic: Discovery
technique: System Information Discovery
subtechnique:
'''
def Format_TTP_data_type1(ttp_all, file_path=None, con_print=True):
if con_print:
for ttp_list in ttp_all:
print('*'+'tactic: '+ttp_list[0])
print('\t'+'technique: '+ttp_list[1])
if ttp_list[2]:
print('\t\t'+'subtechnique: '+ttp_list[2])
print('\r\n')
else:
with open(file_path,'a',encoding='utf-8') as file_handle:
for ttp_list in ttp_all:
file_handle.write('*'+'tactic: ' + ttp_list[0]+'\n')
file_handle.write('\t' + 'technique: ' + ttp_list[1]+'\n')
if ttp_list[2]:
file_handle.write('\t\t' + 'subtechnique: ' + ttp_list[2]+'\n')
file_handle.write('\n')
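# drive the patched capa.main with -r/-s/-j, then format each piece of the result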
def capa_ana_main(file_path,rules_path="C:\\capa_config\\rules",sigs_path="C:\\capa_config\\sigs",print_sig=True,log_path=None,ttp_sig=True,mbc_sig=True,cap_sig=True):
    argv_json = ['-r', rules_path, '-s', sigs_path, '-j', file_path]
    # alternative verbosity levels; swap one of these into `argv` if needed:
    argv_v = ['-r', rules_path, '-s', sigs_path, '-v', file_path]
    argv_vv = ['-r', rules_path, '-s', sigs_path, '-vv', file_path]
    argv = argv_json
results, capabilities = capa_main(argv=argv)
results_json = json.loads(results)
rules_json = results_json['rules']
    meta_json = results_json['meta']
    basic_data = Parsing_basic_data(meta_json)
Format_Basic_data_type1(basic_data,con_print=print_sig,file_path=log_path)
if ttp_sig:
ttp_all = Parsing_ATT_CK_data(rules_json)
Format_TTP_data_type1(ttp_all,con_print=print_sig,file_path=log_path)
if mbc_sig:
mbc_all = Parsing_MBC_data(rules_json)
Format_MBC_data_type1(mbc_all,con_print=print_sig,file_path=log_path)
if cap_sig:
CAPABILITY_list = Parsing_CAPABILITY_data(rules_json,capabilities)
Format_CAPABILITY_data_type1(CAPABILITY_list,con_print=print_sig,file_path=log_path)
if __name__ == '__main__':
import os,sys
base_dir = r"E:\恶意代码分析\1-日常狩猎\APT28\2021-12-17"
log_path = os.path.join(base_dir,'capa_ana_results.log')
base_handle = os.walk(base_dir)
for root,dir_list,file_list in base_handle:
for file in file_list:
file_path = os.path.join(root,file)
capa_ana_main(file_path=file_path,print_sig=False,log_path=log_path)
sys.exit(0)
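A minimal usage sketch for a single sample (the sample path below is hypothetical; rules_path and sigs_path default to the C:\capa_config locations used above):

# sketch: analyze one file and print the formatted summaries to the console
from capa_ana import capa_ana_main

capa_ana_main(
    file_path=r"C:\samples\suspicious.exe",  # hypothetical path
    rules_path=r"C:\capa_config\rules",
    sigs_path=r"C:\capa_config\sigs",
    print_sig=True,  # print to stdout; pass log_path and print_sig=False to append to a log instead
)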
capa/main.py
#!/usr/bin/env python3
"""
Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import hashlib
import logging
import os.path
import argparse
import datetime
import textwrap
import itertools
import collections
from typing import Any, Dict, List, Tuple
import halo
import tqdm
import colorama
from pefile import PEFormatError
from elftools.common.exceptions import ELFError
import capa.rules
import capa.engine
import capa.version
import capa.render.json
import capa.render.default
import capa.render.verbose
import capa.features.common
import capa.features.freeze
import capa.render.vverbose
import capa.features.extractors
import capa.features.extractors.common
import capa.features.extractors.pefile
import capa.features.extractors.elffile
from capa.rules import Rule, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.helpers import get_file_taste
from capa.features.extractors.base_extractor import FunctionHandle, FeatureExtractor
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)"
BACKEND_VIV = "vivisect"
BACKEND_SMDA = "smda"
EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32")
EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64")
E_MISSING_RULES = -10
E_MISSING_FILE = -11
E_INVALID_RULE = -12
E_CORRUPT_FILE = -13
E_FILE_LIMITATION = -14
E_INVALID_SIG = -15
E_INVALID_FILE_TYPE = -16
E_INVALID_FILE_ARCH = -17
E_INVALID_FILE_OS = -18
E_UNSUPPORTED_IDA_VERSION = -19
logger = logging.getLogger("capa")
def set_vivisect_log_level(level):
logging.getLogger("vivisect").setLevel(level)
logging.getLogger("vivisect.base").setLevel(level)
logging.getLogger("vivisect.impemu").setLevel(level)
logging.getLogger("vtrace").setLevel(level)
logging.getLogger("envi").setLevel(level)
logging.getLogger("envi.codeflow").setLevel(level)
def find_function_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle):
# contains features from:
# - insns
# - function
function_features = collections.defaultdict(set) # type: FeatureSet
bb_matches = collections.defaultdict(list) # type: MatchResults
for feature, va in itertools.chain(extractor.extract_function_features(f), extractor.extract_global_features()):
function_features[feature].add(va)
for bb in extractor.get_basic_blocks(f):
# contains features from:
# - insns
# - basic blocks
bb_features = collections.defaultdict(set)
for feature, va in itertools.chain(
extractor.extract_basic_block_features(f, bb), extractor.extract_global_features()
):
bb_features[feature].add(va)
function_features[feature].add(va)
for insn in extractor.get_instructions(f, bb):
for feature, va in itertools.chain(
extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features()
):
bb_features[feature].add(va)
function_features[feature].add(va)
_, matches = capa.engine.match(ruleset.basic_block_rules, bb_features, int(bb))
for rule_name, res in matches.items():
bb_matches[rule_name].extend(res)
rule = ruleset[rule_name]
for va, _ in res:
capa.engine.index_rule_matches(function_features, rule, [va])
_, function_matches = capa.engine.match(ruleset.function_rules, function_features, int(f))
return function_matches, bb_matches, len(function_features)
def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet):
file_features = collections.defaultdict(set) # type: FeatureSet
for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()):
# not all file features may have virtual addresses.
# if not, then at least ensure the feature shows up in the index.
# the set of addresses will still be empty.
if va:
file_features[feature].add(va)
else:
if feature not in file_features:
file_features[feature] = set()
logger.debug("analyzed file and extracted %d features", len(file_features))
file_features.update(function_features)
_, matches = capa.engine.match(ruleset.file_rules, file_features, 0x0)
return matches, len(file_features)
def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None) -> Tuple[MatchResults, Any]:
all_function_matches = collections.defaultdict(list) # type: MatchResults
all_bb_matches = collections.defaultdict(list) # type: MatchResults
meta = {
"feature_counts": {
"file": 0,
"functions": {},
},
"library_functions": {},
} # type: Dict[str, Any]
pbar = tqdm.tqdm
if disable_progress:
# do not use tqdm to avoid unnecessary side effects when caller intends
# to disable progress completely
pbar = lambda s, *args, **kwargs: s
functions = list(extractor.get_functions())
n_funcs = len(functions)
    # patched: `for f in pb:` changed to `for f in functions:` to drop the progress bar
    # pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions")
for f in functions:
function_address = int(f)
if extractor.is_library_function(function_address):
function_name = extractor.get_function_name(function_address)
logger.debug("skipping library function 0x%x (%s)", function_address, function_name)
meta["library_functions"][function_address] = function_name
            # patched: progress-bar (pb) bookkeeping commented out
            # n_libs = len(meta["library_functions"])
            # percentage = 100 * (n_libs / n_funcs)
            # if isinstance(pb, tqdm.tqdm):
            #     pb.set_postfix_str("skipped %d library functions (%d%%)" % (n_libs, percentage))
continue
function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f)
meta["feature_counts"]["functions"][function_address] = feature_count
logger.debug("analyzed function 0x%x and extracted %d features", function_address, feature_count)
for rule_name, res in function_matches.items():
all_function_matches[rule_name].extend(res)
for rule_name, res in bb_matches.items():
all_bb_matches[rule_name].extend(res)
# collection of features that captures the rule matches within function and BB scopes.
# mapping from feature (matched rule) to set of addresses at which it matched.
function_and_lower_features: FeatureSet = collections.defaultdict(set)
for rule_name, results in itertools.chain(all_function_matches.items(), all_bb_matches.items()):
locations = set(map(lambda p: p[0], results))
rule = ruleset[rule_name]
capa.engine.index_rule_matches(function_and_lower_features, rule, locations)
all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features)
meta["feature_counts"]["file"] = feature_count
matches = {
rule_name: results
for rule_name, results in itertools.chain(
# each rule exists in exactly one scope,
# so there won't be any overlap among these following MatchResults,
# and we can merge the dictionaries naively.
all_bb_matches.items(),
all_function_matches.items(),
all_file_matches.items(),
)
}
return matches, meta
def has_rule_with_namespace(rules, capabilities, rule_cat):
for rule_name in capabilities.keys():
if rules.rules[rule_name].meta.get("namespace", "").startswith(rule_cat):
return True
return False
def is_internal_rule(rule: Rule) -> bool:
return rule.meta.get("namespace", "").startswith("internal/")
def is_file_limitation_rule(rule: Rule) -> bool:
return rule.meta.get("namespace", "") == "internal/limitation/file"
def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool:
file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values()))
for file_limitation_rule in file_limitation_rules:
if file_limitation_rule.name not in capabilities:
continue
        # patched: the logger.warning calls in this block were downgraded to logger.debug
        logger.debug("-" * 80)
        for line in file_limitation_rule.meta.get("description", "").split("\n"):
            logger.debug(" " + line)
        logger.debug(" Identified via rule: %s", file_limitation_rule.name)
        if is_standalone:
            logger.debug(" ")
            logger.debug(" Use -v or -vv if you really want to see the capabilities identified by capa.")
        logger.debug("-" * 80)
# bail on first file limitation
return True
return False
def is_supported_format(sample: str) -> bool:
"""
Return if this is a supported file based on magic header values
"""
with open(sample, "rb") as f:
taste = f.read(0x100)
return len(list(capa.features.extractors.common.extract_format(taste))) == 1
def get_format(sample: str) -> str:
with open(sample, "rb") as f:
buf = f.read()
for feature, _ in capa.features.extractors.common.extract_format(buf):
assert isinstance(feature.value, str)
return feature.value
return "unknown"
def is_supported_arch(sample: str) -> bool:
with open(sample, "rb") as f:
buf = f.read()
return len(list(capa.features.extractors.common.extract_arch(buf))) == 1
def get_arch(sample: str) -> str:
with open(sample, "rb") as f:
buf = f.read()
for feature, _ in capa.features.extractors.common.extract_arch(buf):
assert isinstance(feature.value, str)
return feature.value
return "unknown"
def is_supported_os(sample: str) -> bool:
with open(sample, "rb") as f:
buf = f.read()
return len(list(capa.features.extractors.common.extract_os(buf))) == 1
def get_os(sample: str) -> str:
with open(sample, "rb") as f:
buf = f.read()
for feature, _ in capa.features.extractors.common.extract_os(buf):
assert isinstance(feature.value, str)
return feature.value
return "unknown"
def get_meta_str(vw):
"""
Return workspace meta information string
"""
meta = []
for k in ["Format", "Platform", "Architecture"]:
if k in vw.metadata:
meta.append("%s: %s" % (k.lower(), vw.metadata[k]))
return "%s, number of functions: %d" % (", ".join(meta), len(vw.getFunctions()))
def is_running_standalone() -> bool:
"""
are we running from a PyInstaller'd executable?
if so, then we'll be able to access `sys._MEIPASS` for the packaged resources.
"""
return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS")
def get_default_root() -> str:
"""
get the file system path to the default resources directory.
under PyInstaller, this comes from _MEIPASS.
under source, this is the root directory of the project.
"""
if is_running_standalone():
# pylance/mypy don't like `sys._MEIPASS` because this isn't standard.
# its injected by pyinstaller.
# so we'll fetch this attribute dynamically.
return getattr(sys, "_MEIPASS")
else:
return os.path.join(os.path.dirname(__file__), "..")
def get_default_signatures() -> List[str]:
"""
compute a list of file system paths to the default FLIRT signatures.
"""
sigs_path = os.path.join(get_default_root(), "sigs")
logger.debug("signatures path: %s", sigs_path)
ret = []
for root, dirs, files in os.walk(sigs_path):
for file in files:
if not (file.endswith(".pat") or file.endswith(".pat.gz") or file.endswith(".sig")):
continue
ret.append(os.path.join(root, file))
return ret
class UnsupportedFormatError(ValueError):
pass
class UnsupportedArchError(ValueError):
pass
class UnsupportedOSError(ValueError):
pass
def get_workspace(path, format, sigpaths):
"""
load the program at the given path into a vivisect workspace using the given format.
also apply the given FLIRT signatures.
supported formats:
- pe
- elf
- shellcode 32-bit
- shellcode 64-bit
- auto
this creates and analyzes the workspace; however, it does *not* save the workspace.
this is the responsibility of the caller.
"""
# lazy import enables us to not require viv if user wants SMDA, for example.
import viv_utils
logger.debug("generating vivisect workspace for: %s", path)
if format == "auto":
if not is_supported_format(path):
raise UnsupportedFormatError()
# don't analyze, so that we can add our Flirt function analyzer first.
vw = viv_utils.getWorkspace(path, analyze=False, should_save=False)
elif format in {"pe", "elf"}:
vw = viv_utils.getWorkspace(path, analyze=False, should_save=False)
elif format == "sc32":
# these are not analyzed nor saved.
vw = viv_utils.getShellcodeWorkspaceFromFile(path, arch="i386", analyze=False)
elif format == "sc64":
vw = viv_utils.getShellcodeWorkspaceFromFile(path, arch="amd64", analyze=False)
else:
raise ValueError("unexpected format: " + format)
viv_utils.flirt.register_flirt_signature_analyzers(vw, sigpaths)
vw.analyze()
logger.debug("%s", get_meta_str(vw))
return vw
class UnsupportedRuntimeError(RuntimeError):
pass
def get_extractor(
path: str, format: str, backend: str, sigpaths: List[str], should_save_workspace=False, disable_progress=False
) -> FeatureExtractor:
"""
raises:
UnsupportedFormatError
UnsupportedArchError
UnsupportedOSError
"""
if format not in ("sc32", "sc64"):
if not is_supported_format(path):
raise UnsupportedFormatError()
if not is_supported_arch(path):
raise UnsupportedArchError()
if not is_supported_os(path):
raise UnsupportedOSError()
if backend == "smda":
from smda.SmdaConfig import SmdaConfig
from smda.Disassembler import Disassembler
import capa.features.extractors.smda.extractor
smda_report = None
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
config = SmdaConfig()
config.STORE_BUFFER = True
smda_disasm = Disassembler(config)
smda_report = smda_disasm.disassembleFile(path)
return capa.features.extractors.smda.extractor.SmdaFeatureExtractor(smda_report, path)
else:
import capa.features.extractors.viv.extractor
        # patched: halo spinner removed; the former `with` body is dedented one level
        # with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
vw = get_workspace(path, format, sigpaths)
if should_save_workspace:
logger.debug("saving workspace")
try:
vw.saveWorkspace()
except IOError:
# see #168 for discussion around how to handle non-writable directories
logger.info("source directory is not writable, won't save intermediate workspace")
else:
logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path)
def is_nursery_rule_path(path: str) -> bool:
"""
The nursery is a spot for rules that have not yet been fully polished.
For example, they may not have references to public example of a technique.
Yet, we still want to capture and report on their matches.
The nursery is currently a subdirectory of the rules directory with that name.
When nursery rules are loaded, their metadata section should be updated with:
`nursery=True`.
"""
return "nursery" in path
def get_rules(rule_path: str, disable_progress=False) -> List[Rule]:
if not os.path.exists(rule_path):
raise IOError("rule path %s does not exist or cannot be accessed" % rule_path)
rule_paths = []
if os.path.isfile(rule_path):
rule_paths.append(rule_path)
elif os.path.isdir(rule_path):
logger.debug("reading rules from directory %s", rule_path)
for root, dirs, files in os.walk(rule_path):
if ".github" in root:
# the .github directory contains CI config in capa-rules
# this includes some .yml files
# these are not rules
continue
for file in files:
if not file.endswith(".yml"):
if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))):
# expect to see .git* files, readme.md, format.md, and maybe a .git directory
# other things maybe are rules, but are mis-named.
logger.warning("skipping non-.yml file: %s", file)
continue
rule_path = os.path.join(root, file)
rule_paths.append(rule_path)
rules = [] # type: List[Rule]
pbar = tqdm.tqdm
if disable_progress:
# do not use tqdm to avoid unnecessary side effects when caller intends
# to disable progress completely
pbar = lambda s, *args, **kwargs: s
    # patched: rule-loading progress bar removed; iterate rule_paths directly
    # for rule_path in pbar(list(rule_paths), desc="loading ", unit=" rules"):
for rule_path in rule_paths:
try:
rule = capa.rules.Rule.from_yaml_file(rule_path)
except capa.rules.InvalidRule:
raise
else:
rule.meta["capa/path"] = rule_path
if is_nursery_rule_path(rule_path):
rule.meta["capa/nursery"] = True
rules.append(rule)
logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scope)
return rules
def get_signatures(sigs_path):
if not os.path.exists(sigs_path):
raise IOError("signatures path %s does not exist or cannot be accessed" % sigs_path)
paths = []
if os.path.isfile(sigs_path):
paths.append(sigs_path)
elif os.path.isdir(sigs_path):
logger.debug("reading signatures from directory %s", os.path.abspath(os.path.normpath(sigs_path)))
for root, dirs, files in os.walk(sigs_path):
for file in files:
if file.endswith((".pat", ".pat.gz", ".sig")):
sig_path = os.path.join(root, file)
paths.append(sig_path)
# nicely normalize and format path so that debugging messages are clearer
paths = [os.path.abspath(os.path.normpath(path)) for path in paths]
# load signatures in deterministic order: the alphabetic sorting of filename.
# this means that `0_sigs.pat` loads before `1_sigs.pat`.
paths = sorted(paths, key=os.path.basename)
for path in paths:
logger.debug("found signature file: %s", path)
return paths
def collect_metadata(argv, sample_path, rules_path, extractor):
md5 = hashlib.md5()
sha1 = hashlib.sha1()
sha256 = hashlib.sha256()
with open(sample_path, "rb") as f:
buf = f.read()
md5.update(buf)
sha1.update(buf)
sha256.update(buf)
if rules_path != RULES_PATH_DEFAULT_STRING:
rules_path = os.path.abspath(os.path.normpath(rules_path))
format = get_format(sample_path)
arch = get_arch(sample_path)
os_ = get_os(sample_path)
return {
"timestamp": datetime.datetime.now().isoformat(),
"version": capa.version.__version__,
"argv": argv,
"sample": {
"md5": md5.hexdigest(),
"sha1": sha1.hexdigest(),
"sha256": sha256.hexdigest(),
"path": os.path.normpath(sample_path),
},
"analysis": {
"format": format,
"arch": arch,
"os": os_,
"extractor": extractor.__class__.__name__,
"rules": rules_path,
"base_address": extractor.get_base_address(),
"layout": {
# this is updated after capabilities have been collected.
# will look like:
#
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
},
},
}
def compute_layout(rules, extractor, capabilities):
"""
compute a metadata structure that links basic blocks
to the functions in which they're found.
only collect the basic blocks at which some rule matched.
otherwise, we may pollute the json document with
a large amount of un-referenced data.
"""
functions_by_bb = {}
bbs_by_function = {}
for f in extractor.get_functions():
bbs_by_function[int(f)] = []
for bb in extractor.get_basic_blocks(f):
functions_by_bb[int(bb)] = int(f)
bbs_by_function[int(f)].append(int(bb))
matched_bbs = set()
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
if rule.meta.get("scope") == capa.rules.BASIC_BLOCK_SCOPE:
for (addr, match) in matches:
assert addr in functions_by_bb
matched_bbs.add(addr)
    layout = {
        "functions": {
            f: {
                "matched_basic_blocks": [bb for bb in bbs if bb in matched_bbs]
                # this object is open to extension in the future,
                # such as with the function name, etc.
            }
            for f, bbs in bbs_by_function.items()
        }
    }
return layout
def install_common_args(parser, wanted=None):
"""
register a common set of command line arguments for re-use by main & scripts.
these are things like logging/coloring/etc.
also enable callers to opt-in to common arguments, like specifying the input sample.
this routine lets many script use the same language for cli arguments.
see `handle_common_args` to do common configuration.
args:
parser (argparse.ArgumentParser): a parser to update in place, adding common arguments.
wanted (Set[str]): collection of arguments to opt-into, including:
- "sample": required positional argument to input file.
- "format": flag to override file format.
- "backend": flag to override analysis backend.
- "rules": flag to override path to capa rules.
- "tag": flag to override/specify which rules to match.
"""
if wanted is None:
wanted = set()
#
# common arguments that all scripts will have
#
parser.add_argument("--version", action="version", version="%(prog)s {:s}".format(capa.version.__version__))
parser.add_argument(
"-v", "--verbose", action="store_true", help="enable verbose result document (no effect with --json)"
)
parser.add_argument(
"-vv", "--vverbose", action="store_true", help="enable very verbose result document (no effect with --json)"
)
parser.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
parser.add_argument("-q", "--quiet", action="store_true", help="disable all output but errors")
parser.add_argument(
"--color",
type=str,
choices=("auto", "always", "never"),
default="auto",
help="enable ANSI color codes in results, default: only during interactive session",
)
#
# arguments that may be opted into:
#
# - sample
# - format
# - rules
# - tag
#
if "sample" in wanted:
parser.add_argument(
"sample",
type=str,
help="path to sample to analyze",
)
if "format" in wanted:
formats = [
("auto", "(default) detect file type automatically"),
("pe", "Windows PE file"),
("elf", "Executable and Linkable Format"),
("sc32", "32-bit shellcode"),
("sc64", "64-bit shellcode"),
("freeze", "features previously frozen by capa"),
]
format_help = ", ".join(["%s: %s" % (f[0], f[1]) for f in formats])
parser.add_argument(
"-f",
"--format",
choices=[f[0] for f in formats],
default="auto",
help="select sample format, %s" % format_help,
)
if "backend" in wanted:
parser.add_argument(
"-b",
"--backend",
type=str,
help="select the backend to use",
choices=(BACKEND_VIV, BACKEND_SMDA),
default=BACKEND_VIV,
)
if "rules" in wanted:
parser.add_argument(
"-r",
"--rules",
type=str,
default=RULES_PATH_DEFAULT_STRING,
help="path to rule file or directory, use embedded rules by default",
)
if "signatures" in wanted:
parser.add_argument(
"-s",
"--signatures",
type=str,
default=SIGNATURES_PATH_DEFAULT_STRING,
help="path to .sig/.pat file or directory used to identify library functions, use embedded signatures by default",
)
if "tag" in wanted:
parser.add_argument("-t", "--tag", type=str, help="filter on rule meta field values")
def handle_common_args(args):
"""
handle the global config specified by `install_common_args`,
such as configuring logging/coloring/etc.
the following fields will be overwritten when present:
- rules: file system path to rule files.
- signatures: file system path to signature files.
args:
args (argparse.Namespace): parsed arguments that included at least `install_common_args` args.
"""
if args.quiet:
logging.basicConfig(level=logging.WARNING)
logging.getLogger().setLevel(logging.WARNING)
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
# disable vivisect-related logging, it's verbose and not relevant for capa users
set_vivisect_log_level(logging.CRITICAL)
    # Since Python 3.8 cp65001 is an alias to utf_8, but not for Python < 3.8
# TODO: remove this code when only supporting Python 3.8+
# https://stackoverflow.com/a/3259271/87207
import codecs
codecs.register(lambda name: codecs.lookup("utf-8") if name == "cp65001" else None)
if args.color == "always":
colorama.init(strip=False)
elif args.color == "auto":
# colorama will detect:
# - when on Windows console, and fixup coloring, and
# - when not an interactive session, and disable coloring
# renderers should use coloring and assume it will be stripped out if necessary.
colorama.init()
elif args.color == "never":
colorama.init(strip=True)
else:
raise RuntimeError("unexpected --color value: " + args.color)
if hasattr(args, "rules"):
if args.rules == RULES_PATH_DEFAULT_STRING:
logger.debug("-" * 80)
logger.debug(" Using default embedded rules.")
logger.debug(" To provide your own rules, use the form `capa.exe -r ./path/to/rules/ /path/to/mal.exe`.")
logger.debug(" You can see the current default rule set here:")
logger.debug(" https://github.com/mandiant/capa-rules")
logger.debug("-" * 80)
rules_path = os.path.join(get_default_root(), "rules")
if not os.path.exists(rules_path):
# when a users installs capa via pip,
# this pulls down just the source code - not the default rules.
# i'm not sure the default rules should even be written to the library directory,
# so in this case, we require the user to use -r to specify the rule directory.
logger.error("default embedded rules not found! (maybe you installed capa as a library?)")
logger.error("provide your own rule set via the `-r` option.")
return E_MISSING_RULES
else:
rules_path = args.rules
logger.debug("using rules path: %s", rules_path)
args.rules = rules_path
if hasattr(args, "signatures"):
if args.signatures == SIGNATURES_PATH_DEFAULT_STRING:
logger.debug("-" * 80)
logger.debug(" Using default embedded signatures.")
logger.debug(
" To provide your own signatures, use the form `capa.exe --signature ./path/to/signatures/ /path/to/mal.exe`."
)
logger.debug("-" * 80)
sigs_path = os.path.join(get_default_root(), "sigs")
else:
sigs_path = args.signatures
logger.debug("using signatures path: %s", sigs_path)
args.signatures = sigs_path
def main(argv=None):
if sys.version_info < (3, 6):
raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.6+")
if argv is None:
argv = sys.argv[1:]
desc = "The FLARE team's open-source tool to identify capabilities in executable files."
epilog = textwrap.dedent(
"""
By default, capa uses a default set of embedded rules.
You can see the rule set here:
https://github.com/mandiant/capa-rules
To provide your own rule set, use the `-r` flag:
capa --rules /path/to/rules suspicious.exe
capa -r /path/to/rules suspicious.exe
examples:
identify capabilities in a binary
capa suspicious.exe
identify capabilities in 32-bit shellcode, see `-f` for all supported formats
capa -f sc32 shellcode.bin
report match locations
capa -v suspicious.exe
report all feature match details
capa -vv suspicious.exe
filter rules by meta fields, e.g. rule name or namespace
capa -t "create TCP socket" suspicious.exe
"""
)
parser = argparse.ArgumentParser(
description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter
)
install_common_args(parser, {"sample", "format", "backend", "signatures", "rules", "tag"})
parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
args = parser.parse_args(args=argv)
ret = handle_common_args(args)
if ret is not None and ret != 0:
return ret
try:
taste = get_file_taste(args.sample)
except IOError as e:
# per our research there's not a programmatic way to render the IOError with non-ASCII filename unless we
# handle the IOError separately and reach into the args
logger.error("%s", e.args[0])
return E_MISSING_FILE
try:
        # load rules
rules = get_rules(args.rules, disable_progress=args.quiet)
rules = capa.rules.RuleSet(rules)
logger.debug(
"successfully loaded %s rules",
# during the load of the RuleSet, we extract subscope statements into their own rules
# that are subsequently `match`ed upon. this inflates the total rule count.
# so, filter out the subscope rules when reporting total number of loaded rules.
len([i for i in filter(lambda r: "capa/subscope-rule" not in r.meta, rules.rules.values())]),
)
if args.tag:
rules = rules.filter_rules_by_meta(args.tag)
logger.debug("selected %d rules", len(rules))
for i, r in enumerate(rules.rules, 1):
# TODO don't display subscope rules?
logger.debug(" %d. %s", i, r)
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))
return E_INVALID_RULE
file_extractor = None
if args.format == "pe" or (args.format == "auto" and taste.startswith(b"MZ")):
# these pefile and elffile file feature extractors are pretty light weight: they don't do any code analysis.
# so we can fairly quickly determine if the given file has "pure" file-scope rules
# that indicate a limitation (like "file is packed based on section names")
# and avoid doing a full code analysis on difficult/impossible binaries.
try:
file_extractor = capa.features.extractors.pefile.PefileFeatureExtractor(args.sample)
except PEFormatError as e:
logger.error("Input file '%s' is not a valid PE file: %s", args.sample, str(e))
return E_CORRUPT_FILE
elif args.format == "elf" or (args.format == "auto" and taste.startswith(b"\x7fELF")):
try:
file_extractor = capa.features.extractors.elffile.ElfFeatureExtractor(args.sample)
except (ELFError, OverflowError) as e:
logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e))
return E_CORRUPT_FILE
if file_extractor:
try:
pure_file_capabilities, _ = find_file_capabilities(rules, file_extractor, {})
except PEFormatError as e:
logger.error("Input file '%s' is not a valid PE file: %s", args.sample, str(e))
return E_CORRUPT_FILE
except (ELFError, OverflowError) as e:
logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e))
return E_CORRUPT_FILE
# file limitations that rely on non-file scope won't be detected here.
# nor on FunctionName features, because pefile doesn't support this.
if has_file_limitation(rules, pure_file_capabilities):
# bail if capa encountered file limitation e.g. a packed binary
# do show the output in verbose mode, though.
if not (args.verbose or args.vverbose or args.json):
logger.debug("file limitation short circuit, won't analyze fully.")
return E_FILE_LIMITATION
try:
if args.format == "pe" or (args.format == "auto" and taste.startswith(b"MZ")):
sig_paths = get_signatures(args.signatures)
else:
sig_paths = []
logger.debug("skipping library code matching: only have PE signatures")
except (IOError) as e:
logger.error("%s", str(e))
return E_INVALID_SIG
if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)):
format = "freeze"
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())
else:
format = args.format
if format == "auto" and args.sample.endswith(EXTENSIONS_SHELLCODE_32):
format = "sc32"
elif format == "auto" and args.sample.endswith(EXTENSIONS_SHELLCODE_64):
format = "sc64"
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
try:
        # analyze the sample's functions (build the feature extractor)
extractor = get_extractor(
args.sample, format, args.backend, sig_paths, should_save_workspace, disable_progress=args.quiet
)
except UnsupportedFormatError:
logger.error("-" * 80)
logger.error(" Input file does not appear to be a PE or ELF file.")
logger.error(" ")
logger.error(
" capa currently only supports analyzing PE and ELF files (or shellcode, when using --format sc32|sc64)."
)
logger.error(" If you don't know the input file type, you can try using the `file` utility to guess it.")
logger.error("-" * 80)
return E_INVALID_FILE_TYPE
except UnsupportedArchError:
logger.error("-" * 80)
logger.error(" Input file does not appear to target a supported architecture.")
logger.error(" ")
logger.error(" capa currently only supports analyzing x86 (32- and 64-bit).")
logger.error("-" * 80)
return E_INVALID_FILE_ARCH
except UnsupportedOSError:
logger.error("-" * 80)
logger.error(" Input file does not appear to target a supported OS.")
logger.error(" ")
logger.error(
" capa currently only supports analyzing executables for some operating systems (including Windows and Linux)."
)
logger.error("-" * 80)
return E_INVALID_FILE_OS
meta = collect_metadata(argv, args.sample, args.rules, extractor)
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
meta["analysis"].update(counts)
meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)
if has_file_limitation(rules, capabilities):
# bail if capa encountered file limitation e.g. a packed binary
# do show the output in verbose mode, though.
if not (args.verbose or args.vverbose or args.json):
return E_FILE_LIMITATION
    # patched: render the results to a string and return it instead of printing; `capabilities` is a dict
if args.json:
results = capa.render.json.render(meta, rules, capabilities)
elif args.vverbose:
results = capa.render.vverbose.render(meta, rules, capabilities)
elif args.verbose:
results = capa.render.verbose.render(meta, rules, capabilities)
else:
results = capa.render.default.render(meta, rules, capabilities)
colorama.deinit()
logger.debug("done.")
    # patched: `return 0` replaced with `return results, capabilities`
    # return 0
    return results, capabilities
def ida_main():
import capa.rules
import capa.ida.helpers
import capa.render.default
import capa.features.extractors.ida.extractor
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
if not capa.ida.helpers.is_supported_ida_version():
return E_UNSUPPORTED_IDA_VERSION
if not capa.ida.helpers.is_supported_file_type():
return E_INVALID_FILE_TYPE
logger.debug("-" * 80)
logger.debug(" Using default embedded rules.")
logger.debug(" ")
logger.debug(" You can see the current default rule set here:")
logger.debug(" https://github.com/mandiant/capa-rules")
logger.debug("-" * 80)
rules_path = os.path.join(get_default_root(), "rules")
logger.debug("rule path: %s", rules_path)
rules = get_rules(rules_path)
rules = capa.rules.RuleSet(rules)
meta = capa.ida.helpers.collect_metadata()
capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor())
meta["analysis"].update(counts)
if has_file_limitation(rules, capabilities, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")
colorama.init(strip=True)
print(capa.render.default.render(meta, rules, capabilities))
def is_runtime_ida():
try:
import idc
except ImportError:
return False
else:
return True
if __name__ == "__main__":
if is_runtime_ida():
ida_main()
else:
sys.exit(main())
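Because the patched `main` returns `(results, capabilities)` instead of exit code 0, it can also be driven directly. A minimal sketch (the sample and config paths are hypothetical):

# sketch: call the patched capa.main and parse the -j JSON output
import json
from capa.main import main as capa_main

results, capabilities = capa_main(
    argv=['-r', r'C:\capa_config\rules', '-s', r'C:\capa_config\sigs', '-j', r'C:\samples\a.exe']
)
doc = json.loads(results)           # with -j, `results` is a JSON string
print(sorted(doc['rules'].keys()))  # names of the matched capabilities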
For capa 4.0.1
capa_ana.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2021/12/14 13:24
# @Author : wing
import sys
import json
import copy
from capa.main import main as capa_main
def Parsing_MBC_data(rules_json):
mbc_all = []
for capability_key,capability_value in rules_json.items():
rules_meta_json = capability_value['meta']
rules_meta_mbc_list = rules_meta_json['mbc']
if rules_meta_mbc_list:
for rules_meta_mbc_dict in rules_meta_mbc_list:
mbc_child = []
mbc_objective = rules_meta_mbc_dict['objective']
mbc_child.append(mbc_objective)
mbc_behavior = rules_meta_mbc_dict['behavior']
mbc_child.append(mbc_behavior)
mbc_method = rules_meta_mbc_dict['method']
mbc_child.append(mbc_method)
mbc_all.append(copy.deepcopy(mbc_child))
return mbc_all
def Parsing_ATT_CK_data(rules_json):
ttp_all = []
for capability_key,capability_value in rules_json.items():
rules_meta_json = capability_value['meta']
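        # capa 4.x result documents use the meta key 'attack' (capa 3.x used 'att&ck')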
rules_meta_ttp_list = rules_meta_json['attack']
if rules_meta_ttp_list:
for rules_meta_ttp_dict in rules_meta_ttp_list:
ttp_child = []
ttp_tactic = rules_meta_ttp_dict['tactic']
ttp_child.append(ttp_tactic)
ttp_technique = rules_meta_ttp_dict['technique']
ttp_child.append(ttp_technique)
ttp_subtechnique = rules_meta_ttp_dict['subtechnique']
ttp_child.append(ttp_subtechnique)
ttp_all.append(copy.deepcopy(ttp_child))
return ttp_all
def find_api(node):
    # recursively walk a capa match-result tree and collect the values of
    # matched leaf statements (typically API names).
    match_func_list = []
    if not hasattr(node, 'children'):
        return match_func_list
    if not node.children and hasattr(node, 'statement'):
        # a leaf node: grab the statement's value if it carries one
        if hasattr(node.statement, 'value'):
            match_func_list.append(node.statement.value)
        elif hasattr(node.statement, 'child'):
            match_func_list.append(node.statement.child.value)
        return match_func_list
    for child in node.children:
        if child.success:
            match_func_list += find_api(child)
    return match_func_list
def Parsing_CAPABILITY_data(rules_json,capabilities):
CAPABILITY_list = []
for capability in rules_json:
Child_CAPABILITY_list = []
Child_CAPABILITY_list.append(capability)
if capability == "contains PDB path":
pdb_dict = rules_json[capability]
match_pdb_dict = pdb_dict['matches']['0']['node']['feature']['matches']
for match_pdb in match_pdb_dict:
Child_CAPABILITY_list.append(match_pdb)
CAPABILITY_list.append(copy.deepcopy(Child_CAPABILITY_list))
else:
capability_child_list = capabilities[capability]
for capability_child in capability_child_list:
capability_child_child_obj_children_list = capability_child[1].children
for capability_child_child_obj_children_child in capability_child_child_obj_children_list:
if capability_child_child_obj_children_child.success:
if not capability_child_child_obj_children_child.children:
if hasattr(capability_child_child_obj_children_child.statement,'value'):
match_func = capability_child_child_obj_children_child.statement.value
if match_func not in Child_CAPABILITY_list and len(str(match_func))>=5:
Child_CAPABILITY_list.append(match_func)
else:
match_func_list = find_api(capability_child_child_obj_children_child)
for match_func in match_func_list:
if match_func not in Child_CAPABILITY_list and len(str(match_func))>=5:
Child_CAPABILITY_list.append(match_func)
CAPABILITY_list.append(copy.deepcopy(Child_CAPABILITY_list))
return CAPABILITY_list
def Parsing_basic_data(meta_json):
    basic_data = []
    rules_meta_sample = meta_json['sample']
sample_path = rules_meta_sample['path']
basic_data.append(sample_path)
sample_md5 = rules_meta_sample['md5']
basic_data.append(sample_md5)
sample_sha1 = rules_meta_sample['sha1']
basic_data.append(sample_sha1)
sample_sha256 = rules_meta_sample['sha256']
basic_data.append(sample_sha256)
return basic_data
'''
Format style 1:
path: E:\\a.exe
md5: e48a1c9b14cb16e363899c4bc4f4d33f
sha1: 0dab1f59a6a3587cc801d810d60be5886fdd7445
sha256: a593b585bd61d23c03fbfe5b31aadc157961d4b65afa0860dd57692a283628c3
'''
def Format_Basic_data_type1(basic_data,file_path=None, con_print=True):
sample_path = basic_data[0]
sample_md5 = basic_data[1]
sample_sha1 = basic_data[2]
sample_sha256 = basic_data[3]
if con_print:
print('path: '+sample_path)
print('md5: '+sample_md5)
print('sha1: '+sample_sha1)
print('sha256: '+sample_sha256)
print('\n')
else:
with open(file_path,'a',encoding='utf-8') as file_handle:
file_handle.write('path: '+sample_path+'\n')
file_handle.write('md5: '+sample_md5+'\n')
file_handle.write('sha1: '+sample_sha1+'\n')
file_handle.write('sha256: '+sample_sha256+'\n')
file_handle.write('\n')
'''
Format style 1:
write file on Windows
ZwWriteFile
contains PDB path
E:\\8168\\vc98\\self\\bin\\x86\\c1.pdb
'''
def Format_CAPABILITY_data_type1(CAPABILITY_list, file_path=None, con_print=True):
if con_print:
for i in CAPABILITY_list:
if len(i)==1:
print('*'+i[0])
else:
print('*'+i[0])
other_list = i[1:]
for j in other_list:
print('\t'+str(j))
print('\r\n')
else:
with open(file_path,'a',encoding='utf-8') as file_handle:
for i in CAPABILITY_list:
if len(i) == 1:
file_handle.write('*'+i[0]+'\n')
else:
file_handle.write('*'+i[0] + '\n')
other_list = i[1:]
for j in other_list:
file_handle.write('\t' + str(j)+'\n')
file_handle.write('\n')
'''
Format style 1:
objective: Operating System
behavior: Registry
method: Create Registry Key
objective: Operating System
behavior: Registry
method: Open Registry Key
'''
def Format_MBC_data_type1(mbc_all, file_path=None, con_print=True):
if con_print:
for mbc_list in mbc_all:
print('*'+'objective: '+mbc_list[0])
print('\t'+'behavior: '+mbc_list[1])
if mbc_list[2]:
print('\t\t'+'method: '+mbc_list[2])
print('\r\n')
else:
with open(file_path,'a',encoding='utf-8') as file_handle:
for mbc_list in mbc_all:
file_handle.write('*'+'objective: ' + mbc_list[0]+'\n')
file_handle.write('\t' + 'behavior: ' + mbc_list[1]+'\n')
if mbc_list[2]:
file_handle.write('\t\t' + 'method: ' + mbc_list[2]+'\n')
file_handle.write('\n')
'''
Format style 1:
tactic: Discovery
technique: System Information Discovery
subtechnique:
'''
def Format_TTP_data_type1(ttp_all, file_path=None, con_print=True):
if con_print:
for ttp_list in ttp_all:
print('*'+'tactic: '+ttp_list[0])
print('\t'+'technique: '+ttp_list[1])
if ttp_list[2]:
print('\t\t'+'subtechnique: '+ttp_list[2])
print('\r\n')
else:
with open(file_path,'a',encoding='utf-8') as file_handle:
for ttp_list in ttp_all:
file_handle.write('*'+'tactic: ' + ttp_list[0]+'\n')
file_handle.write('\t' + 'technique: ' + ttp_list[1]+'\n')
if ttp_list[2]:
file_handle.write('\t\t' + 'subtechnique: ' + ttp_list[2]+'\n')
file_handle.write('\n')
def capa_ana_main(file_path,rules_path="C:\\capa_config\\rules_20220922",sigs_path="C:\\capa_config\\sigs",print_sig=True,log_path=None,ttp_sig=True,mbc_sig=True,cap_sig=True):
    argv_json = ['-r', rules_path, '-s', sigs_path, '-j', file_path]
    # alternative verbosity levels; swap one of these into `argv` if needed:
    argv_v = ['-r', rules_path, '-s', sigs_path, '-v', file_path]
    argv_vv = ['-r', rules_path, '-s', sigs_path, '-vv', file_path]
    argv = argv_json
results, capabilities = capa_main(argv=argv)
results_json = json.loads(results)
rules_json = results_json['rules']
    meta_json = results_json['meta']
    basic_data = Parsing_basic_data(meta_json)
Format_Basic_data_type1(basic_data,con_print=print_sig,file_path=log_path)
if ttp_sig:
ttp_all = Parsing_ATT_CK_data(rules_json)
Format_TTP_data_type1(ttp_all,con_print=print_sig,file_path=log_path)
if mbc_sig:
mbc_all = Parsing_MBC_data(rules_json)
Format_MBC_data_type1(mbc_all,con_print=print_sig,file_path=log_path)
if cap_sig:
CAPABILITY_list = Parsing_CAPABILITY_data(rules_json,capabilities)
Format_CAPABILITY_data_type1(CAPABILITY_list,con_print=print_sig,file_path=log_path)
if __name__ == '__main__':
import os,sys
base_dir = r"E:\恶意代码分析\1-日常狩猎\APT28\20220919-from慧眼"
log_path = os.path.join(base_dir,'capa_ana_results.log')
base_handle = os.walk(base_dir)
for root,dir_list,file_list in base_handle:
for file in file_list:
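            # only analyze files whose names look like MD5/SHA1/SHA256 hex digests (32/40/64 chars)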
if len(file)==32 or len(file)==40 or len(file)==64:
file_path = os.path.join(root,file)
capa_ana_main(file_path=file_path,print_sig=False,log_path=log_path,mbc_sig=False,cap_sig=False)
                # defaults: rules_path="C:\\capa_config\\rules_20220922", sigs_path="C:\\capa_config\\sigs", print_sig=True, log_path=None, ttp_sig=True, mbc_sig=True, cap_sig=True
sys.exit(0)
capa/main.py
#!/usr/bin/env python3
"""
Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import time
import hashlib
import logging
import os.path
import argparse
import datetime
import textwrap
import warnings
import itertools
import contextlib
import collections
from typing import Any, Dict, List, Tuple
import halo
import tqdm
import colorama
from pefile import PEFormatError
from elftools.common.exceptions import ELFError
import capa.perf
import capa.rules
import capa.engine
import capa.version
import capa.render.json
import capa.render.default
import capa.render.verbose
import capa.features.common
import capa.features.freeze
import capa.render.vverbose
import capa.features.extractors
import capa.features.extractors.common
import capa.features.extractors.pefile
import capa.features.extractors.dnfile_
import capa.features.extractors.elffile
import capa.features.extractors.dotnetfile
import capa.features.extractors.base_extractor
from capa.rules import Rule, Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.helpers import (
get_format,
get_file_taste,
get_auto_format,
log_unsupported_os_error,
log_unsupported_arch_error,
log_unsupported_format_error,
)
from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError, UnsupportedRuntimeError
from capa.features.common import (
FORMAT_PE,
FORMAT_ELF,
FORMAT_AUTO,
FORMAT_SC32,
FORMAT_SC64,
FORMAT_DOTNET,
FORMAT_FREEZE,
)
from capa.features.address import NO_ADDRESS
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)"
BACKEND_VIV = "vivisect"
BACKEND_SMDA = "smda"
BACKEND_DOTNET = "dotnet"
E_MISSING_RULES = -10
E_MISSING_FILE = -11
E_INVALID_RULE = -12
E_CORRUPT_FILE = -13
E_FILE_LIMITATION = -14
E_INVALID_SIG = -15
E_INVALID_FILE_TYPE = -16
E_INVALID_FILE_ARCH = -17
E_INVALID_FILE_OS = -18
E_UNSUPPORTED_IDA_VERSION = -19
logger = logging.getLogger("capa")
@contextlib.contextmanager
def timing(msg: str):
t0 = time.time()
yield
t1 = time.time()
logger.debug("perf: %s: %0.2fs", msg, t1 - t0)
def set_vivisect_log_level(level):
logging.getLogger("vivisect").setLevel(level)
logging.getLogger("vivisect.base").setLevel(level)
logging.getLogger("vivisect.impemu").setLevel(level)
logging.getLogger("vtrace").setLevel(level)
logging.getLogger("envi").setLevel(level)
logging.getLogger("envi.codeflow").setLevel(level)
logging.getLogger("Elf").setLevel(level)
def find_instruction_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle
) -> Tuple[FeatureSet, MatchResults]:
"""
find matches for the given rules for the given instruction.
returns: tuple containing (features for instruction, match results for instruction)
"""
# all features found for the instruction.
features = collections.defaultdict(set) # type: FeatureSet
for feature, addr in itertools.chain(
extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features()
):
features[feature].add(addr)
# matches found at this instruction.
_, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address)
for rule_name, res in matches.items():
rule = ruleset[rule_name]
for addr, _ in res:
capa.engine.index_rule_matches(features, rule, [addr])
return features, matches
def find_basic_block_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle, bb: BBHandle
) -> Tuple[FeatureSet, MatchResults, MatchResults]:
"""
find matches for the given rules within the given basic block.
returns: tuple containing (features for basic block, match results for basic block, match results for instructions)
"""
# all features found within this basic block,
# includes features found within instructions.
features = collections.defaultdict(set) # type: FeatureSet
# matches found at the instruction scope.
    # might be found at different instructions, that's ok.
insn_matches = collections.defaultdict(list) # type: MatchResults
for insn in extractor.get_instructions(f, bb):
ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn)
for feature, vas in ifeatures.items():
features[feature].update(vas)
for rule_name, res in imatches.items():
insn_matches[rule_name].extend(res)
for feature, va in itertools.chain(
extractor.extract_basic_block_features(f, bb), extractor.extract_global_features()
):
features[feature].add(va)
# matches found within this basic block.
_, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address)
for rule_name, res in matches.items():
rule = ruleset[rule_name]
for va, _ in res:
capa.engine.index_rule_matches(features, rule, [va])
return features, matches, insn_matches
def find_code_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, fh: FunctionHandle
) -> Tuple[MatchResults, MatchResults, MatchResults, int]:
"""
find matches for the given rules within the given function.
returns: tuple containing (match results for function, match results for basic blocks, match results for instructions, number of features)
"""
# all features found within this function,
# includes features found within basic blocks (and instructions).
function_features = collections.defaultdict(set) # type: FeatureSet
# matches found at the basic block scope.
    # might be found at different basic blocks, that's ok.
bb_matches = collections.defaultdict(list) # type: MatchResults
# matches found at the instruction scope.
    # might be found at different instructions, that's ok.
insn_matches = collections.defaultdict(list) # type: MatchResults
for bb in extractor.get_basic_blocks(fh):
features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, fh, bb)
for feature, vas in features.items():
function_features[feature].update(vas)
for rule_name, res in bmatches.items():
bb_matches[rule_name].extend(res)
for rule_name, res in imatches.items():
insn_matches[rule_name].extend(res)
for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()):
function_features[feature].add(va)
_, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address)
return function_matches, bb_matches, insn_matches, len(function_features)
def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet):
file_features = collections.defaultdict(set) # type: FeatureSet
for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()):
# not all file features may have virtual addresses.
# if not, then at least ensure the feature shows up in the index.
# the set of addresses will still be empty.
if va:
file_features[feature].add(va)
else:
if feature not in file_features:
file_features[feature] = set()
logger.debug("analyzed file and extracted %d features", len(file_features))
file_features.update(function_features)
_, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS)
return matches, len(file_features)
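# usage sketch (illustration only): file-scope matching can also run standalone by passing
# an empty function feature set, exactly as main() does in its lightweight pre-analysis pass:
#
#     file_matches, n_features = find_file_capabilities(ruleset, file_extractor, {})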
def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None) -> Tuple[MatchResults, Any]:
all_function_matches = collections.defaultdict(list) # type: MatchResults
all_bb_matches = collections.defaultdict(list) # type: MatchResults
all_insn_matches = collections.defaultdict(list) # type: MatchResults
meta = {
"feature_counts": {
"file": 0,
"functions": {},
},
"library_functions": {},
} # type: Dict[str, Any]
pbar = tqdm.tqdm
if disable_progress:
# do not use tqdm to avoid unnecessary side effects when caller intends
# to disable progress completely
pbar = lambda s, *args, **kwargs: s
functions = list(extractor.get_functions())
n_funcs = len(functions)
    # modified: changed `for f in pb:` to `for f in functions:` to drop the progress bar; the pb line below is commented out
    # pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions")
for f in functions:
if extractor.is_library_function(f.address):
function_name = extractor.get_function_name(f.address)
logger.debug("skipping library function 0x%x (%s)", f.address, function_name)
meta["library_functions"][f.address] = function_name
            # modified: commented out the pb (progress bar) related lines
# n_libs = len(meta["library_functions"])
# percentage = 100 * (n_libs / n_funcs)
# if isinstance(pb, tqdm.tqdm):
# pb.set_postfix_str("skipped %d library functions (%d%%)" % (n_libs, percentage))
continue
function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities(ruleset, extractor, f)
meta["feature_counts"]["functions"][f.address] = feature_count
logger.debug("analyzed function 0x%x and extracted %d features", f.address, feature_count)
for rule_name, res in function_matches.items():
all_function_matches[rule_name].extend(res)
for rule_name, res in bb_matches.items():
all_bb_matches[rule_name].extend(res)
for rule_name, res in insn_matches.items():
all_insn_matches[rule_name].extend(res)
# collection of features that captures the rule matches within function, BB, and instruction scopes.
# mapping from feature (matched rule) to set of addresses at which it matched.
function_and_lower_features: FeatureSet = collections.defaultdict(set)
for rule_name, results in itertools.chain(
all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items()
):
locations = set(map(lambda p: p[0], results))
rule = ruleset[rule_name]
capa.engine.index_rule_matches(function_and_lower_features, rule, locations)
all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features)
meta["feature_counts"]["file"] = feature_count
matches = {
rule_name: results
for rule_name, results in itertools.chain(
# each rule exists in exactly one scope,
# so there won't be any overlap among these following MatchResults,
# and we can merge the dictionaries naively.
all_insn_matches.items(),
all_bb_matches.items(),
all_function_matches.items(),
all_file_matches.items(),
)
}
return matches, meta
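# usage sketch (illustration only; `rules` is a RuleSet and `extractor` a FeatureExtractor,
# e.g. built via get_rules()/get_extractor() below):
#
#     capabilities, counts = find_capabilities(rules, extractor, disable_progress=True)
#     for rule_name, results in capabilities.items():
#         print(rule_name, [addr for addr, _ in results])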
# TODO move all to helpers?
def has_rule_with_namespace(rules, capabilities, rule_cat):
for rule_name in capabilities.keys():
if rules.rules[rule_name].meta.get("namespace", "").startswith(rule_cat):
return True
return False
def is_internal_rule(rule: Rule) -> bool:
return rule.meta.get("namespace", "").startswith("internal/")
def is_file_limitation_rule(rule: Rule) -> bool:
return rule.meta.get("namespace", "") == "internal/limitation/file"
def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool:
file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values()))
for file_limitation_rule in file_limitation_rules:
if file_limitation_rule.name not in capabilities:
continue
        # modified: changed logger.warning to logger.debug throughout this block
        logger.debug("-" * 80)
        for line in file_limitation_rule.meta.get("description", "").split("\n"):
            logger.debug(" " + line)
        logger.debug(" Identified via rule: %s", file_limitation_rule.name)
        if is_standalone:
            logger.debug(" ")
            logger.debug(" Use -v or -vv if you really want to see the capabilities identified by capa.")
        logger.debug("-" * 80)
# bail on first file limitation
return True
return False
def is_supported_format(sample: str) -> bool:
"""
    Return whether this is a supported file, based on magic header values.
"""
with open(sample, "rb") as f:
taste = f.read(0x100)
return len(list(capa.features.extractors.common.extract_format(taste))) == 1
def is_supported_arch(sample: str) -> bool:
with open(sample, "rb") as f:
buf = f.read()
return len(list(capa.features.extractors.common.extract_arch(buf))) == 1
def get_arch(sample: str) -> str:
with open(sample, "rb") as f:
buf = f.read()
for feature, _ in capa.features.extractors.common.extract_arch(buf):
assert isinstance(feature.value, str)
return feature.value
return "unknown"
def is_supported_os(sample: str) -> bool:
with open(sample, "rb") as f:
buf = f.read()
return len(list(capa.features.extractors.common.extract_os(buf))) == 1
def get_os(sample: str) -> str:
with open(sample, "rb") as f:
buf = f.read()
for feature, _ in capa.features.extractors.common.extract_os(buf):
assert isinstance(feature.value, str)
return feature.value
return "unknown"
def get_meta_str(vw):
"""
Return workspace meta information string
"""
meta = []
for k in ["Format", "Platform", "Architecture"]:
if k in vw.metadata:
meta.append("%s: %s" % (k.lower(), vw.metadata[k]))
return "%s, number of functions: %d" % (", ".join(meta), len(vw.getFunctions()))
def is_running_standalone() -> bool:
"""
are we running from a PyInstaller'd executable?
if so, then we'll be able to access `sys._MEIPASS` for the packaged resources.
"""
return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS")
def get_default_root() -> str:
"""
get the file system path to the default resources directory.
under PyInstaller, this comes from _MEIPASS.
under source, this is the root directory of the project.
"""
if is_running_standalone():
# pylance/mypy don't like `sys._MEIPASS` because this isn't standard.
        # it's injected by PyInstaller.
# so we'll fetch this attribute dynamically.
return getattr(sys, "_MEIPASS")
else:
return os.path.join(os.path.dirname(__file__), "..")
def get_default_signatures() -> List[str]:
"""
compute a list of file system paths to the default FLIRT signatures.
"""
sigs_path = os.path.join(get_default_root(), "sigs")
logger.debug("signatures path: %s", sigs_path)
ret = []
for root, dirs, files in os.walk(sigs_path):
for file in files:
if not (file.endswith(".pat") or file.endswith(".pat.gz") or file.endswith(".sig")):
continue
ret.append(os.path.join(root, file))
return ret
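# usage sketch (illustration only): resources resolve relative to the install root, whether
# running from source or from a PyInstaller bundle:
#
#     rules_dir = os.path.join(get_default_root(), "rules")
#     sigpaths = get_default_signatures()  # all .pat/.pat.gz/.sig files under <root>/sigs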
def get_workspace(path, format_, sigpaths):
"""
load the program at the given path into a vivisect workspace using the given format.
also apply the given FLIRT signatures.
supported formats:
- pe
- elf
- shellcode 32-bit
- shellcode 64-bit
- auto
this creates and analyzes the workspace; however, it does *not* save the workspace.
this is the responsibility of the caller.
"""
# lazy import enables us to not require viv if user wants SMDA, for example.
import viv_utils
logger.debug("generating vivisect workspace for: %s", path)
    # TODO: format_ should no longer be FORMAT_AUTO by the time we get here
if format_ == FORMAT_AUTO:
if not is_supported_format(path):
raise UnsupportedFormatError()
# don't analyze, so that we can add our Flirt function analyzer first.
vw = viv_utils.getWorkspace(path, analyze=False, should_save=False)
elif format_ in {FORMAT_PE, FORMAT_ELF}:
vw = viv_utils.getWorkspace(path, analyze=False, should_save=False)
elif format_ == FORMAT_SC32:
# these are not analyzed nor saved.
vw = viv_utils.getShellcodeWorkspaceFromFile(path, arch="i386", analyze=False)
elif format_ == FORMAT_SC64:
vw = viv_utils.getShellcodeWorkspaceFromFile(path, arch="amd64", analyze=False)
else:
raise ValueError("unexpected format: " + format_)
viv_utils.flirt.register_flirt_signature_analyzers(vw, sigpaths)
vw.analyze()
logger.debug("%s", get_meta_str(vw))
return vw
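# usage sketch (illustration only): build an analyzed vivisect workspace with FLIRT
# signatures applied; persisting it is left to the caller:
#
#     vw = get_workspace(path, FORMAT_PE, get_default_signatures())
#     vw.saveWorkspace()  # optional; get_workspace() itself never saves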
# TODO get_extractors -> List[FeatureExtractor]?
def get_extractor(
path: str, format_: str, backend: str, sigpaths: List[str], should_save_workspace=False, disable_progress=False
) -> FeatureExtractor:
"""
raises:
UnsupportedFormatError
UnsupportedArchError
UnsupportedOSError
"""
if format_ not in (FORMAT_SC32, FORMAT_SC64):
if not is_supported_format(path):
raise UnsupportedFormatError()
if not is_supported_arch(path):
raise UnsupportedArchError()
if not is_supported_os(path):
raise UnsupportedOSError()
if format_ == FORMAT_DOTNET:
import capa.features.extractors.dnfile.extractor
return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path)
if backend == "smda":
from smda.SmdaConfig import SmdaConfig
from smda.Disassembler import Disassembler
import capa.features.extractors.smda.extractor
logger.warning("Deprecation warning: v4.0 will be the last capa version to support the SMDA backend.")
warnings.warn("v4.0 will be the last capa version to support the SMDA backend.", DeprecationWarning)
smda_report = None
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
config = SmdaConfig()
config.STORE_BUFFER = True
smda_disasm = Disassembler(config)
smda_report = smda_disasm.disassembleFile(path)
return capa.features.extractors.smda.extractor.SmdaFeatureExtractor(smda_report, path)
else:
import capa.features.extractors.viv.extractor
        # modified: removed the spinner; the body of the `with` block below was dedented one level
        # with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
vw = get_workspace(path, format_, sigpaths)
if should_save_workspace:
logger.debug("saving workspace")
try:
vw.saveWorkspace()
except IOError:
# see #168 for discussion around how to handle non-writable directories
logger.info("source directory is not writable, won't save intermediate workspace")
else:
logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path)
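# usage sketch (illustration only): the typical standalone flow pairs get_extractor()
# with find_capabilities(), as main() does below:
#
#     extractor = get_extractor(path, FORMAT_AUTO, BACKEND_VIV, get_default_signatures())
#     capabilities, counts = find_capabilities(rules, extractor)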
def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]:
file_extractors: List[FeatureExtractor] = list()
if format_ == capa.features.extractors.common.FORMAT_PE:
file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(sample))
dnfile_extractor = capa.features.extractors.dnfile_.DnfileFeatureExtractor(sample)
if dnfile_extractor.is_dotnet_file():
file_extractors.append(dnfile_extractor)
elif format_ == capa.features.extractors.common.FORMAT_ELF:
file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(sample))
return file_extractors
def is_nursery_rule_path(path: str) -> bool:
"""
The nursery is a spot for rules that have not yet been fully polished.
    For example, they may not have references to a public example of the technique.
Yet, we still want to capture and report on their matches.
The nursery is currently a subdirectory of the rules directory with that name.
When nursery rules are loaded, their metadata section should be updated with:
`nursery=True`.
"""
return "nursery" in path
def get_rules(rule_paths: List[str], disable_progress=False) -> List[Rule]:
rule_file_paths = []
for rule_path in rule_paths:
if not os.path.exists(rule_path):
raise IOError("rule path %s does not exist or cannot be accessed" % rule_path)
if os.path.isfile(rule_path):
rule_file_paths.append(rule_path)
elif os.path.isdir(rule_path):
logger.debug("reading rules from directory %s", rule_path)
for root, dirs, files in os.walk(rule_path):
if ".git" in root:
# the .github directory contains CI config in capa-rules
# this includes some .yml files
# these are not rules
# additionally, .git has files that are not .yml and generate the warning
# skip those too
continue
for file in files:
if not file.endswith(".yml"):
if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))):
# expect to see .git* files, readme.md, format.md, and maybe a .git directory
# other things maybe are rules, but are mis-named.
logger.warning("skipping non-.yml file: %s", file)
continue
rule_path = os.path.join(root, file)
rule_file_paths.append(rule_path)
rules = [] # type: List[Rule]
pbar = tqdm.tqdm
if disable_progress:
# do not use tqdm to avoid unnecessary side effects when caller intends
# to disable progress completely
pbar = lambda s, *args, **kwargs: s
    # modified: removed the progress bar
    # for rule_file_path in pbar(list(rule_file_paths), desc="loading ", unit=" rules"):
for rule_file_path in rule_file_paths:
try:
rule = capa.rules.Rule.from_yaml_file(rule_file_path)
except capa.rules.InvalidRule:
raise
else:
rule.meta["capa/path"] = rule_file_path
if is_nursery_rule_path(rule_file_path):
rule.meta["capa/nursery"] = True
rules.append(rule)
logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scope)
return rules
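# usage sketch (illustration only): load rules from one or more paths and wrap them in a
# RuleSet for matching:
#
#     rules = capa.rules.RuleSet(get_rules([os.path.join(get_default_root(), "rules")]))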
def get_signatures(sigs_path):
if not os.path.exists(sigs_path):
raise IOError("signatures path %s does not exist or cannot be accessed" % sigs_path)
paths = []
if os.path.isfile(sigs_path):
paths.append(sigs_path)
elif os.path.isdir(sigs_path):
logger.debug("reading signatures from directory %s", os.path.abspath(os.path.normpath(sigs_path)))
for root, dirs, files in os.walk(sigs_path):
for file in files:
if file.endswith((".pat", ".pat.gz", ".sig")):
sig_path = os.path.join(root, file)
paths.append(sig_path)
# nicely normalize and format path so that debugging messages are clearer
paths = [os.path.abspath(os.path.normpath(path)) for path in paths]
# load signatures in deterministic order: the alphabetic sorting of filename.
# this means that `0_sigs.pat` loads before `1_sigs.pat`.
paths = sorted(paths, key=os.path.basename)
for path in paths:
logger.debug("found signature file: %s", path)
return paths
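# usage sketch (illustration only): signature files load in alphabetical filename order,
# so a numeric prefix controls precedence:
#
#     paths = get_signatures(sigs_path)  # "0_sigs.pat" sorts (and thus loads) before "1_sigs.pat"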
def collect_metadata(
argv: List[str],
sample_path: str,
rules_path: List[str],
extractor: capa.features.extractors.base_extractor.FeatureExtractor,
):
md5 = hashlib.md5()
sha1 = hashlib.sha1()
sha256 = hashlib.sha256()
with open(sample_path, "rb") as f:
buf = f.read()
md5.update(buf)
sha1.update(buf)
sha256.update(buf)
if rules_path != [RULES_PATH_DEFAULT_STRING]:
rules_path = [os.path.abspath(os.path.normpath(r)) for r in rules_path]
format_ = get_format(sample_path)
arch = get_arch(sample_path)
os_ = get_os(sample_path)
return {
"timestamp": datetime.datetime.now().isoformat(),
"version": capa.version.__version__,
"argv": argv,
"sample": {
"md5": md5.hexdigest(),
"sha1": sha1.hexdigest(),
"sha256": sha256.hexdigest(),
"path": os.path.normpath(sample_path),
},
"analysis": {
"format": format_,
"arch": arch,
"os": os_,
"extractor": extractor.__class__.__name__,
"rules": rules_path,
"base_address": extractor.get_base_address(),
"layout": {
# this is updated after capabilities have been collected.
# will look like:
#
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
},
},
}
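# usage sketch (illustration only): the returned dict is the `meta` document consumed by the
# renderers; main() augments it after matching:
#
#     meta = collect_metadata(argv, sample_path, rules_paths, extractor)
#     meta["analysis"].update(counts)
#     meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)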
def compute_layout(rules, extractor, capabilities):
"""
compute a metadata structure that links basic blocks
to the functions in which they're found.
only collect the basic blocks at which some rule matched.
otherwise, we may pollute the json document with
a large amount of un-referenced data.
"""
functions_by_bb = {}
bbs_by_function = {}
for f in extractor.get_functions():
bbs_by_function[f.address] = []
for bb in extractor.get_basic_blocks(f):
            functions_by_bb[bb.address] = f.address
bbs_by_function[f.address].append(bb.address)
matched_bbs = set()
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
if rule.meta.get("scope") == capa.rules.BASIC_BLOCK_SCOPE:
for (addr, match) in matches:
assert addr in functions_by_bb
matched_bbs.add(addr)
    layout = {
        "functions": {
            f: {
                # this object is open to extension in the future,
                # such as with the function name, etc.
                "matched_basic_blocks": [bb for bb in bbs if bb in matched_bbs],
            }
            for f, bbs in bbs_by_function.items()
        }
    }
return layout
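# shape sketch (illustration only): compute_layout() yields a structure like:
#
#     {"functions": {0x401000: {"matched_basic_blocks": [0x401000, 0x401005]}, ...}}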
def install_common_args(parser, wanted=None):
"""
register a common set of command line arguments for re-use by main & scripts.
these are things like logging/coloring/etc.
also enable callers to opt-in to common arguments, like specifying the input sample.
    this routine lets many scripts use the same language for CLI arguments.
see `handle_common_args` to do common configuration.
args:
parser (argparse.ArgumentParser): a parser to update in place, adding common arguments.
wanted (Set[str]): collection of arguments to opt-into, including:
- "sample": required positional argument to input file.
- "format": flag to override file format.
- "backend": flag to override analysis backend.
- "rules": flag to override path to capa rules.
- "tag": flag to override/specify which rules to match.
"""
if wanted is None:
wanted = set()
#
# common arguments that all scripts will have
#
parser.add_argument("--version", action="version", version="%(prog)s {:s}".format(capa.version.__version__))
parser.add_argument(
"-v", "--verbose", action="store_true", help="enable verbose result document (no effect with --json)"
)
parser.add_argument(
"-vv", "--vverbose", action="store_true", help="enable very verbose result document (no effect with --json)"
)
parser.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
parser.add_argument("-q", "--quiet", action="store_true", help="disable all output but errors")
parser.add_argument(
"--color",
type=str,
choices=("auto", "always", "never"),
default="auto",
help="enable ANSI color codes in results, default: only during interactive session",
)
#
# arguments that may be opted into:
#
# - sample
    # - format
    # - backend
    # - rules
    # - signatures
# - tag
#
if "sample" in wanted:
parser.add_argument(
"sample",
type=str,
help="path to sample to analyze",
)
if "format" in wanted:
formats = [
(FORMAT_AUTO, "(default) detect file type automatically"),
(FORMAT_PE, "Windows PE file"),
(FORMAT_DOTNET, ".NET PE file"),
(FORMAT_ELF, "Executable and Linkable Format"),
(FORMAT_SC32, "32-bit shellcode"),
(FORMAT_SC64, "64-bit shellcode"),
(FORMAT_FREEZE, "features previously frozen by capa"),
]
format_help = ", ".join(["%s: %s" % (f[0], f[1]) for f in formats])
parser.add_argument(
"-f",
"--format",
choices=[f[0] for f in formats],
default=FORMAT_AUTO,
help="select sample format, %s" % format_help,
)
if "backend" in wanted:
parser.add_argument(
"-b",
"--backend",
type=str,
help="select the backend to use",
choices=(BACKEND_VIV, BACKEND_SMDA),
default=BACKEND_VIV,
)
if "rules" in wanted:
parser.add_argument(
"-r",
"--rules",
type=str,
default=[RULES_PATH_DEFAULT_STRING],
action="append",
help="path to rule file or directory, use embedded rules by default",
)
if "signatures" in wanted:
parser.add_argument(
"-s",
"--signatures",
type=str,
default=SIGNATURES_PATH_DEFAULT_STRING,
help="path to .sig/.pat file or directory used to identify library functions, use embedded signatures by default",
)
if "tag" in wanted:
parser.add_argument("-t", "--tag", type=str, help="filter on rule meta field values")
def handle_common_args(args):
"""
handle the global config specified by `install_common_args`,
such as configuring logging/coloring/etc.
the following fields will be overwritten when present:
- rules: file system path to rule files.
- signatures: file system path to signature files.
args:
args (argparse.Namespace): parsed arguments that included at least `install_common_args` args.
"""
if args.quiet:
logging.basicConfig(level=logging.WARNING)
logging.getLogger().setLevel(logging.WARNING)
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
# disable vivisect-related logging, it's verbose and not relevant for capa users
set_vivisect_log_level(logging.CRITICAL)
# Since Python 3.8 cp65001 is an alias to utf_8, but not for Python < 3.8
# TODO: remove this code when only supporting Python 3.8+
# https://stackoverflow.com/a/3259271/87207
import codecs
codecs.register(lambda name: codecs.lookup("utf-8") if name == "cp65001" else None)
if args.color == "always":
colorama.init(strip=False)
elif args.color == "auto":
# colorama will detect:
# - when on Windows console, and fixup coloring, and
# - when not an interactive session, and disable coloring
# renderers should use coloring and assume it will be stripped out if necessary.
colorama.init()
elif args.color == "never":
colorama.init(strip=True)
else:
raise RuntimeError("unexpected --color value: " + args.color)
if hasattr(args, "rules"):
rules_paths: List[str] = []
if args.rules == [RULES_PATH_DEFAULT_STRING]:
logger.debug("-" * 80)
logger.debug(" Using default embedded rules.")
logger.debug(" To provide your own rules, use the form `capa.exe -r ./path/to/rules/ /path/to/mal.exe`.")
logger.debug(" You can see the current default rule set here:")
logger.debug(" https://github.com/mandiant/capa-rules")
logger.debug("-" * 80)
default_rule_path = os.path.join(get_default_root(), "rules")
if not os.path.exists(default_rule_path):
            # when a user installs capa via pip,
# this pulls down just the source code - not the default rules.
# i'm not sure the default rules should even be written to the library directory,
# so in this case, we require the user to use -r to specify the rule directory.
logger.error("default embedded rules not found! (maybe you installed capa as a library?)")
logger.error("provide your own rule set via the `-r` option.")
return E_MISSING_RULES
rules_paths.append(default_rule_path)
else:
rules_paths = args.rules
if RULES_PATH_DEFAULT_STRING in rules_paths:
rules_paths.remove(RULES_PATH_DEFAULT_STRING)
for rule_path in rules_paths:
logger.debug("using rules path: %s", rule_path)
args.rules = rules_paths
if hasattr(args, "signatures"):
if args.signatures == SIGNATURES_PATH_DEFAULT_STRING:
logger.debug("-" * 80)
logger.debug(" Using default embedded signatures.")
logger.debug(
" To provide your own signatures, use the form `capa.exe --signature ./path/to/signatures/ /path/to/mal.exe`."
)
logger.debug("-" * 80)
sigs_path = os.path.join(get_default_root(), "sigs")
else:
sigs_path = args.signatures
logger.debug("using signatures path: %s", sigs_path)
args.signatures = sigs_path
def main(argv=None):
if sys.version_info < (3, 7):
raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.7+")
if argv is None:
argv = sys.argv[1:]
desc = "The FLARE team's open-source tool to identify capabilities in executable files."
epilog = textwrap.dedent(
"""
By default, capa uses a default set of embedded rules.
You can see the rule set here:
https://github.com/mandiant/capa-rules
To provide your own rule set, use the `-r` flag:
capa --rules /path/to/rules suspicious.exe
capa -r /path/to/rules suspicious.exe
examples:
identify capabilities in a binary
capa suspicious.exe
identify capabilities in 32-bit shellcode, see `-f` for all supported formats
capa -f sc32 shellcode.bin
report match locations
capa -v suspicious.exe
report all feature match details
capa -vv suspicious.exe
filter rules by meta fields, e.g. rule name or namespace
capa -t "create TCP socket" suspicious.exe
"""
)
parser = argparse.ArgumentParser(
description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter
)
install_common_args(parser, {"sample", "format", "backend", "signatures", "rules", "tag"})
parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
args = parser.parse_args(args=argv)
ret = handle_common_args(args)
if ret is not None and ret != 0:
return ret
try:
_ = get_file_taste(args.sample)
except IOError as e:
# per our research there's not a programmatic way to render the IOError with non-ASCII filename unless we
# handle the IOError separately and reach into the args
logger.error("%s", e.args[0])
return E_MISSING_FILE
format_ = args.format
if format_ == FORMAT_AUTO:
try:
format_ = get_auto_format(args.sample)
except UnsupportedFormatError:
log_unsupported_format_error()
return E_INVALID_FILE_TYPE
try:
rules = get_rules(args.rules, disable_progress=args.quiet)
rules = capa.rules.RuleSet(rules)
logger.debug(
"successfully loaded %s rules",
# during the load of the RuleSet, we extract subscope statements into their own rules
# that are subsequently `match`ed upon. this inflates the total rule count.
# so, filter out the subscope rules when reporting total number of loaded rules.
len([i for i in filter(lambda r: not r.is_subscope_rule(), rules.rules.values())]),
)
if args.tag:
rules = rules.filter_rules_by_meta(args.tag)
logger.debug("selected %d rules", len(rules))
for i, r in enumerate(rules.rules, 1):
# TODO don't display subscope rules?
logger.debug(" %d. %s", i, r)
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))
logger.error(
"Please ensure you're using the rules that correspond to your major version of capa (%s)",
capa.version.get_major_version(),
)
logger.error(
"You can check out these rules with the following command:\n %s",
capa.version.get_rules_checkout_command(),
)
logger.error(
"Or, for more details, see the rule set documentation here: %s",
"https://github.com/mandiant/capa/blob/master/doc/rules.md",
)
return E_INVALID_RULE
# file feature extractors are pretty lightweight: they don't do any code analysis.
# so we can fairly quickly determine if the given file has "pure" file-scope rules
# that indicate a limitation (like "file is packed based on section names")
# and avoid doing a full code analysis on difficult/impossible binaries.
#
# this pass can inspect multiple file extractors, e.g., dotnet and pe to identify
# various limitations
try:
file_extractors = get_file_extractors(args.sample, format_)
except PEFormatError as e:
logger.error("Input file '%s' is not a valid PE file: %s", args.sample, str(e))
return E_CORRUPT_FILE
except (ELFError, OverflowError) as e:
logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e))
return E_CORRUPT_FILE
for file_extractor in file_extractors:
try:
pure_file_capabilities, _ = find_file_capabilities(rules, file_extractor, {})
except PEFormatError as e:
logger.error("Input file '%s' is not a valid PE file: %s", args.sample, str(e))
return E_CORRUPT_FILE
except (ELFError, OverflowError) as e:
logger.error("Input file '%s' is not a valid ELF file: %s", args.sample, str(e))
return E_CORRUPT_FILE
if isinstance(file_extractor, capa.features.extractors.dnfile_.DnfileFeatureExtractor):
format_ = FORMAT_DOTNET
# file limitations that rely on non-file scope won't be detected here.
# nor on FunctionName features, because pefile doesn't support this.
if has_file_limitation(rules, pure_file_capabilities):
# bail if capa encountered file limitation e.g. a packed binary
# do show the output in verbose mode, though.
if not (args.verbose or args.vverbose or args.json):
logger.debug("file limitation short circuit, won't analyze fully.")
return E_FILE_LIMITATION
if format_ == FORMAT_FREEZE:
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())
else:
try:
if format_ == FORMAT_PE:
sig_paths = get_signatures(args.signatures)
else:
sig_paths = []
logger.debug("skipping library code matching: only have native PE signatures")
except IOError as e:
logger.error("%s", str(e))
return E_INVALID_SIG
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
try:
extractor = get_extractor(
args.sample, format_, args.backend, sig_paths, should_save_workspace, disable_progress=args.quiet
)
except UnsupportedFormatError:
log_unsupported_format_error()
return E_INVALID_FILE_TYPE
except UnsupportedArchError:
log_unsupported_arch_error()
return E_INVALID_FILE_ARCH
except UnsupportedOSError:
log_unsupported_os_error()
return E_INVALID_FILE_OS
meta = collect_metadata(argv, args.sample, args.rules, extractor)
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
meta["analysis"].update(counts)
meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)
if has_file_limitation(rules, capabilities):
# bail if capa encountered file limitation e.g. a packed binary
# do show the output in verbose mode, though.
if not (args.verbose or args.vverbose or args.json):
return E_FILE_LIMITATION
    # modified: return the rendered results instead of printing them; `capabilities` is a dict
if args.json:
results = capa.render.json.render(meta, rules, capabilities)
elif args.vverbose:
results = capa.render.vverbose.render(meta, rules, capabilities)
elif args.verbose:
results = capa.render.verbose.render(meta, rules, capabilities)
else:
results = capa.render.default.render(meta, rules, capabilities)
colorama.deinit()
logger.debug("done.")
    # modified: changed `return 0` to `return results, capabilities`
    # return 0
return results, capabilities
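# usage sketch (illustration only): with the modifications above, main() can be invoked
# programmatically and its rendered text plus raw match dict consumed directly:
#
#     results, capabilities = main(["-q", "/path/to/sample.exe"])
#     # note: on failure main() instead returns a single int exit code (e.g. E_MISSING_FILE)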
def ida_main():
import capa.rules
import capa.ida.helpers
import capa.render.default
import capa.features.extractors.ida.extractor
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
if not capa.ida.helpers.is_supported_ida_version():
return E_UNSUPPORTED_IDA_VERSION
if not capa.ida.helpers.is_supported_file_type():
return E_INVALID_FILE_TYPE
logger.debug("-" * 80)
logger.debug(" Using default embedded rules.")
logger.debug(" ")
logger.debug(" You can see the current default rule set here:")
logger.debug(" https://github.com/mandiant/capa-rules")
logger.debug("-" * 80)
rules_path = os.path.join(get_default_root(), "rules")
logger.debug("rule path: %s", rules_path)
    rules = get_rules([rules_path])
rules = capa.rules.RuleSet(rules)
meta = capa.ida.helpers.collect_metadata([rules_path])
capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor())
meta["analysis"].update(counts)
if has_file_limitation(rules, capabilities, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")
colorama.init(strip=True)
print(capa.render.default.render(meta, rules, capabilities))
def is_runtime_ida():
try:
import idc
except ImportError:
return False
else:
return True
if __name__ == "__main__":
if is_runtime_ida():
ida_main()
else:
sys.exit(main())